1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.common.keymaster.client.zookeeper;
20
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.stream.Collectors;
25 import org.apache.curator.framework.CuratorFramework;
26 import org.apache.curator.x.discovery.ServiceDiscovery;
27 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
28 import org.apache.curator.x.discovery.ServiceInstance;
29 import org.apache.curator.x.discovery.ServiceProvider;
30 import org.apache.syncope.common.keymaster.client.api.KeymasterException;
31 import org.apache.syncope.common.keymaster.client.api.ServiceOps;
32 import org.apache.syncope.common.keymaster.client.api.model.NetworkService;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.springframework.beans.factory.InitializingBean;
36 import org.springframework.beans.factory.annotation.Autowired;
37
38
39
40
41 public class ZookeeperServiceDiscoveryOps implements ServiceOps, InitializingBean {
42
43 private static final Logger LOG = LoggerFactory.getLogger(ServiceOps.class);
44
45 private static final String SERVICE_PATH = "/services";
46
47 private final Map<NetworkService.Type, ServiceProvider<Void>> providers = new ConcurrentHashMap<>();
48
49 @Autowired
50 private CuratorFramework client;
51
52 private ServiceDiscovery<Void> discovery;
53
54 @Override
55 public void afterPropertiesSet() throws Exception {
56 discovery = ServiceDiscoveryBuilder.builder(Void.class).
57 client(client).
58 basePath(SERVICE_PATH).
59 build();
60 discovery.start();
61 }
62
63 private ServiceProvider<Void> getProvider(final NetworkService.Type type) {
64 return providers.computeIfAbsent(type, t -> {
65 try {
66 ServiceProvider<Void> provider = discovery.
67 serviceProviderBuilder().
68 serviceName(t.name()).build();
69 provider.start();
70 return provider;
71 } catch (KeymasterException e) {
72 throw e;
73 } catch (Exception e) {
74 throw new KeymasterException("While preparing ServiceProvider for " + type, e);
75 }
76 });
77 }
78
79 @Override
80 public void register(final NetworkService service) {
81 try {
82 unregister(service);
83
84 ServiceInstance<Void> instance = ServiceInstance.<Void>builder().
85 name(service.getType().name()).
86 address(service.getAddress()).
87 build();
88 discovery.registerService(instance);
89 } catch (KeymasterException e) {
90 throw e;
91 } catch (Exception e) {
92 LOG.error("While registering {}", service, e);
93 throw new KeymasterException(e);
94 }
95 }
96
97 @Override
98 public void unregister(final NetworkService service) {
99 try {
100 discovery.queryForInstances(service.getType().name()).stream().
101 filter(instance -> instance.getName().equals(service.getType().name())
102 && instance.getAddress().equals(service.getAddress())).findFirst().
103 ifPresent(instance -> {
104 try {
105 discovery.unregisterService(instance);
106 } catch (Exception e) {
107 LOG.error("While deregistering {}", service, e);
108 throw new KeymasterException(e);
109 }
110 });
111 } catch (KeymasterException e) {
112 throw e;
113 } catch (Exception e) {
114 LOG.error("While registering {}", service, e);
115 throw new KeymasterException(e);
116 }
117 }
118
119 private static NetworkService toNetworkService(
120 final NetworkService.Type serviceType,
121 final ServiceInstance<Void> serviceInstance) {
122
123 NetworkService ns = new NetworkService();
124 ns.setType(serviceType);
125 ns.setAddress(serviceInstance.getAddress());
126 return ns;
127 }
128
129 @Override
130 public List<NetworkService> list(final NetworkService.Type serviceType) {
131 try {
132 return discovery.queryForInstances(serviceType.name()).stream().
133 map(serviceInstance -> toNetworkService(serviceType, serviceInstance)).
134 collect(Collectors.toList());
135 } catch (KeymasterException e) {
136 throw e;
137 } catch (Exception e) {
138 throw new KeymasterException(e);
139 }
140 }
141
142 @Override
143 public NetworkService get(final NetworkService.Type serviceType) {
144 ServiceInstance<Void> serviceInstance = null;
145 try {
146 if (!discovery.queryForInstances(serviceType.name()).isEmpty()) {
147 serviceInstance = getProvider(serviceType).getInstance();
148 }
149 } catch (KeymasterException e) {
150 throw e;
151 } catch (Exception e) {
152 throw new KeymasterException(e);
153 }
154
155 if (serviceInstance == null) {
156 throw new KeymasterException("No services found for " + serviceType);
157 }
158 return toNetworkService(serviceType, serviceInstance);
159 }
160 }