1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.client.console.topology;
20
21 import com.fasterxml.jackson.databind.JsonNode;
22 import com.fasterxml.jackson.databind.json.JsonMapper;
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.apache.syncope.client.console.SyncopeConsoleSession;
34 import org.apache.syncope.client.console.rest.ConnectorRestClient;
35 import org.apache.syncope.client.console.rest.ResourceRestClient;
36 import org.apache.syncope.common.keymaster.client.api.ConfParamOps;
37 import org.apache.syncope.common.keymaster.client.api.ServiceOps;
38 import org.apache.syncope.common.keymaster.client.api.model.NetworkService;
39 import org.apache.wicket.protocol.ws.api.WebSocketBehavior;
40 import org.apache.wicket.protocol.ws.api.WebSocketRequestHandler;
41 import org.apache.wicket.protocol.ws.api.message.TextMessage;
42 import org.apache.wicket.spring.injection.annot.SpringBean;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public class TopologyWebSocketBehavior extends WebSocketBehavior {
47
48 private static final long serialVersionUID = -1653665542635275551L;
49
50 protected static final Logger LOG = LoggerFactory.getLogger(TopologyWebSocketBehavior.class);
51
52 protected static final JsonMapper MAPPER = JsonMapper.builder().findAndAddModules().build();
53
54 protected static final String CONNECTOR_TEST_TIMEOUT_PARAMETER = "connector.test.timeout";
55
56 protected static final String RESOURCE_TEST_TIMEOUT_PARAMETER = "resource.test.timeout";
57
58 protected static void timeoutHandlingConnectionChecker(
59 final Checker checker,
60 final Integer timeout,
61 final Map<String, String> responses,
62 final Set<String> running) {
63
64 String response = null;
65 try {
66 if (timeout == null || timeout < 0) {
67 LOG.debug("No timeouts for resource connection checking ... ");
68 response = checker.call();
69 } else if (timeout > 0) {
70 LOG.debug("Timeouts provided for resource connection checking ... ");
71 response = SyncopeConsoleSession.get().execute(checker).get(timeout, TimeUnit.SECONDS);
72 }
73 } catch (InterruptedException | TimeoutException e) {
74 LOG.warn("Connection with {} timed out", checker.key);
75 response = String.format("{ \"status\": \"%s\", \"target\": \"%s\"}",
76 TopologyNode.Status.UNREACHABLE, checker.key);
77 } catch (Exception e) {
78 LOG.error("Unexpected exception conneting to {}", checker.key, e);
79 response = String.format("{ \"status\": \"%s\", \"target\": \"%s\"}",
80 TopologyNode.Status.FAILURE, checker.key);
81 }
82
83 if (response != null) {
84 responses.put(checker.key, response);
85 }
86
87 running.remove(checker.key);
88 }
89
90 @SpringBean
91 protected ServiceOps serviceOps;
92
93 @SpringBean
94 protected ConfParamOps confParamOps;
95
96 @SpringBean
97 protected ConnectorRestClient connectorRestClient;
98
99 @SpringBean
100 protected ResourceRestClient resourceRestClient;
101
102 protected final Map<String, String> connectors = Collections.synchronizedMap(new HashMap<>());
103
104 protected final Set<String> runningConnCheck = Collections.synchronizedSet(new HashSet<>());
105
106 protected final Map<String, String> resources = Collections.synchronizedMap(new HashMap<>());
107
108 protected final Set<String> runningResCheck = Collections.synchronizedSet(new HashSet<>());
109
110 protected String coreAddress;
111
112 protected String domain;
113
114 protected String jwt;
115
116 protected Integer connectorTestTimeout = null;
117
118 protected Integer resourceTestTimeout = null;
119
120 public TopologyWebSocketBehavior() {
121 coreAddress = serviceOps.get(NetworkService.Type.CORE).getAddress();
122 domain = SyncopeConsoleSession.get().getDomain();
123 jwt = SyncopeConsoleSession.get().getJWT();
124
125
126 try {
127 connectorTestTimeout = confParamOps.get(domain, CONNECTOR_TEST_TIMEOUT_PARAMETER, null, Integer.class);
128 resourceTestTimeout = confParamOps.get(domain, RESOURCE_TEST_TIMEOUT_PARAMETER, null, Integer.class);
129 } catch (Exception e) {
130 LOG.debug("No {} or {} conf parameters found",
131 CONNECTOR_TEST_TIMEOUT_PARAMETER, RESOURCE_TEST_TIMEOUT_PARAMETER, e);
132 }
133 }
134
135 @Override
136 protected void onMessage(final WebSocketRequestHandler handler, final TextMessage message) {
137 try {
138 JsonNode obj = MAPPER.readTree(message.getText());
139 switch (Topology.SupportedOperation.valueOf(obj.get("kind").asText())) {
140 case CHECK_CONNECTOR:
141 String ckey = obj.get("target").asText();
142
143 if (connectors.containsKey(ckey)) {
144 handler.push(connectors.get(ckey));
145 } else {
146 handler.push(String.format(
147 "{ \"status\": \"%s\", \"target\": \"%s\"}", TopologyNode.Status.UNKNOWN, ckey));
148 }
149
150 if (runningConnCheck.contains(ckey)) {
151 LOG.debug("Running connection check for connector {}", ckey);
152 } else {
153 try {
154 SyncopeConsoleSession.get().execute(() -> timeoutHandlingConnectionChecker(
155 new ConnectorChecker(ckey), connectorTestTimeout, connectors, runningConnCheck));
156
157 runningConnCheck.add(ckey);
158 } catch (Exception e) {
159 LOG.error("Unexpected error", e);
160 }
161 }
162 break;
163
164 case CHECK_RESOURCE:
165 String rkey = obj.get("target").asText();
166
167 if (resources.containsKey(rkey)) {
168 handler.push(resources.get(rkey));
169 } else {
170 handler.push(String.format(
171 "{ \"status\": \"%s\", \"target\": \"%s\"}", TopologyNode.Status.UNKNOWN, rkey));
172 }
173
174 if (runningResCheck.contains(rkey)) {
175 LOG.debug("Running connection check for resource {}", rkey);
176 } else {
177 try {
178 SyncopeConsoleSession.get().execute(() -> timeoutHandlingConnectionChecker(
179 new ResourceChecker(rkey), resourceTestTimeout, resources, runningResCheck));
180
181 runningResCheck.add(rkey);
182 } catch (Exception e) {
183 LOG.error("Unexpected error", e);
184 }
185 }
186 break;
187
188 case ADD_ENDPOINT:
189 handler.appendJavaScript(String.format("addEndpoint('%s', '%s', '%s');",
190 obj.get("source").asText(),
191 obj.get("target").asText(),
192 obj.get("scope").asText()));
193 break;
194
195 default:
196 }
197 } catch (IOException e) {
198 LOG.error("Eror managing websocket message", e);
199 }
200 }
201
202 public boolean connCheckDone(final Collection<String> connectors) {
203 return this.connectors.keySet().containsAll(connectors);
204 }
205
206 public boolean resCheckDone(final Collection<String> resources) {
207 return this.resources.keySet().containsAll(resources);
208 }
209
210 private abstract class Checker implements Callable<String> {
211
212 protected final String key;
213
214 Checker(final String key) {
215 this.key = key;
216 }
217 }
218
219 private class ConnectorChecker extends Checker {
220
221 ConnectorChecker(final String key) {
222 super(key);
223 }
224
225 @Override
226 public String call() throws Exception {
227 try {
228 return String.format("{ \"status\": \"%s\", \"target\": \"%s\"}",
229 connectorRestClient.check(coreAddress, domain, jwt, key)
230 ? TopologyNode.Status.REACHABLE : TopologyNode.Status.UNREACHABLE, key);
231 } catch (Exception e) {
232 LOG.warn("Error checking connection for {}", key, e);
233 return String.format("{ \"status\": \"%s\", \"target\": \"%s\"}",
234 TopologyNode.Status.FAILURE, key);
235 }
236 }
237 }
238
239 private class ResourceChecker extends Checker {
240
241 ResourceChecker(final String key) {
242 super(key);
243 }
244
245 @Override
246 public String call() throws Exception {
247 try {
248 return String.format("{ \"status\": \"%s\", \"target\": \"%s\"}",
249 resourceRestClient.check(coreAddress, domain, jwt, key)
250 ? TopologyNode.Status.REACHABLE : TopologyNode.Status.UNREACHABLE, key);
251 } catch (Exception e) {
252 LOG.warn("Error checking connection for {}", key, e);
253 return String.format("{ \"status\": \"%s\", \"target\": \"%s\"}",
254 TopologyNode.Status.FAILURE, key);
255 }
256 }
257 }
258 }