/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19  package org.apache.syncope.common.keymaster.client.zookeeper;
20  
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.syncope.common.keymaster.client.api.KeymasterException;
import org.apache.syncope.common.keymaster.client.api.ServiceOps;
import org.apache.syncope.common.keymaster.client.api.model.NetworkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
37  
38  /**
39   * Implements {@link ServiceOps} via Apache Curator / Zookeeper via Curator's {@link ServiceDiscovery}.
40   */
41  public class ZookeeperServiceDiscoveryOps implements ServiceOps, InitializingBean {
42  
43      private static final Logger LOG = LoggerFactory.getLogger(ServiceOps.class);
44  
45      private static final String SERVICE_PATH = "/services";
46  
47      private final Map<NetworkService.Type, ServiceProvider<Void>> providers = new ConcurrentHashMap<>();
48  
49      @Autowired
50      private CuratorFramework client;
51  
52      private ServiceDiscovery<Void> discovery;
53  
54      @Override
55      public void afterPropertiesSet() throws Exception {
56          discovery = ServiceDiscoveryBuilder.builder(Void.class).
57                  client(client).
58                  basePath(SERVICE_PATH).
59                  build();
60          discovery.start();
61      }
62  
63      private ServiceProvider<Void> getProvider(final NetworkService.Type type) {
64          return providers.computeIfAbsent(type, t -> {
65              try {
66                  ServiceProvider<Void> provider = discovery.
67                          serviceProviderBuilder().
68                          serviceName(t.name()).build();
69                  provider.start();
70                  return provider;
71              } catch (KeymasterException e) {
72                  throw e;
73              } catch (Exception e) {
74                  throw new KeymasterException("While preparing ServiceProvider for " + type, e);
75              }
76          });
77      }
78  
79      @Override
80      public void register(final NetworkService service) {
81          try {
82              unregister(service);
83  
84              ServiceInstance<Void> instance = ServiceInstance.<Void>builder().
85                      name(service.getType().name()).
86                      address(service.getAddress()).
87                      build();
88              discovery.registerService(instance);
89          } catch (KeymasterException e) {
90              throw e;
91          } catch (Exception e) {
92              LOG.error("While registering {}", service, e);
93              throw new KeymasterException(e);
94          }
95      }
96  
97      @Override
98      public void unregister(final NetworkService service) {
99          try {
100             discovery.queryForInstances(service.getType().name()).stream().
101                     filter(instance -> instance.getName().equals(service.getType().name())
102                     && instance.getAddress().equals(service.getAddress())).findFirst().
103                     ifPresent(instance -> {
104                         try {
105                             discovery.unregisterService(instance);
106                         } catch (Exception e) {
107                             LOG.error("While deregistering {}", service, e);
108                             throw new KeymasterException(e);
109                         }
110                     });
111         } catch (KeymasterException e) {
112             throw e;
113         } catch (Exception e) {
114             LOG.error("While registering {}", service, e);
115             throw new KeymasterException(e);
116         }
117     }
118 
119     private static NetworkService toNetworkService(
120         final NetworkService.Type serviceType,
121         final ServiceInstance<Void> serviceInstance) {
122 
123         NetworkService ns = new NetworkService();
124         ns.setType(serviceType);
125         ns.setAddress(serviceInstance.getAddress());
126         return ns;
127     }
128 
129     @Override
130     public List<NetworkService> list(final NetworkService.Type serviceType) {
131         try {
132             return discovery.queryForInstances(serviceType.name()).stream().
133                     map(serviceInstance -> toNetworkService(serviceType, serviceInstance)).
134                     collect(Collectors.toList());
135         } catch (KeymasterException e) {
136             throw e;
137         } catch (Exception e) {
138             throw new KeymasterException(e);
139         }
140     }
141 
142     @Override
143     public NetworkService get(final NetworkService.Type serviceType) {
144         ServiceInstance<Void> serviceInstance = null;
145         try {
146             if (!discovery.queryForInstances(serviceType.name()).isEmpty()) {
147                 serviceInstance = getProvider(serviceType).getInstance();
148             }
149         } catch (KeymasterException e) {
150             throw e;
151         } catch (Exception e) {
152             throw new KeymasterException(e);
153         }
154 
155         if (serviceInstance == null) {
156             throw new KeymasterException("No services found for " + serviceType);
157         }
158         return toNetworkService(serviceType, serviceInstance);
159     }
160 }