1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.pushpull;
20
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Optional;
28 import java.util.Set;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.stream.Collectors;
31 import java.util.stream.Stream;
32 import org.apache.commons.lang3.StringUtils;
33 import org.apache.commons.lang3.tuple.MutablePair;
34 import org.apache.syncope.common.lib.to.Item;
35 import org.apache.syncope.common.lib.to.OrgUnit;
36 import org.apache.syncope.common.lib.to.Provision;
37 import org.apache.syncope.common.lib.types.ConflictResolutionAction;
38 import org.apache.syncope.common.lib.types.ResourceOperation;
39 import org.apache.syncope.core.persistence.api.attrvalue.validation.PlainAttrValidationManager;
40 import org.apache.syncope.core.persistence.api.dao.GroupDAO;
41 import org.apache.syncope.core.persistence.api.dao.NotFoundException;
42 import org.apache.syncope.core.persistence.api.dao.PlainSchemaDAO;
43 import org.apache.syncope.core.persistence.api.dao.VirSchemaDAO;
44 import org.apache.syncope.core.persistence.api.entity.AnyType;
45 import org.apache.syncope.core.persistence.api.entity.AnyUtils;
46 import org.apache.syncope.core.persistence.api.entity.AnyUtilsFactory;
47 import org.apache.syncope.core.persistence.api.entity.Implementation;
48 import org.apache.syncope.core.persistence.api.entity.VirSchema;
49 import org.apache.syncope.core.persistence.api.entity.group.Group;
50 import org.apache.syncope.core.persistence.api.entity.policy.PullPolicy;
51 import org.apache.syncope.core.persistence.api.entity.task.PullTask;
52 import org.apache.syncope.core.persistence.api.entity.user.User;
53 import org.apache.syncope.core.provisioning.api.Connector;
54 import org.apache.syncope.core.provisioning.api.ProvisionSorter;
55 import org.apache.syncope.core.provisioning.api.pushpull.AnyObjectPullResultHandler;
56 import org.apache.syncope.core.provisioning.api.pushpull.GroupPullResultHandler;
57 import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
58 import org.apache.syncope.core.provisioning.api.pushpull.PullActions;
59 import org.apache.syncope.core.provisioning.api.pushpull.RealmPullResultHandler;
60 import org.apache.syncope.core.provisioning.api.pushpull.ReconFilterBuilder;
61 import org.apache.syncope.core.provisioning.api.pushpull.SyncopePullExecutor;
62 import org.apache.syncope.core.provisioning.api.pushpull.SyncopePullResultHandler;
63 import org.apache.syncope.core.provisioning.api.pushpull.UserPullResultHandler;
64 import org.apache.syncope.core.provisioning.api.rules.PullMatch;
65 import org.apache.syncope.core.provisioning.java.utils.ConnObjectUtils;
66 import org.apache.syncope.core.provisioning.java.utils.MappingUtils;
67 import org.apache.syncope.core.spring.ApplicationContextProvider;
68 import org.apache.syncope.core.spring.implementation.ImplementationManager;
69 import org.identityconnectors.framework.common.objects.Name;
70 import org.identityconnectors.framework.common.objects.ObjectClass;
71 import org.identityconnectors.framework.common.objects.OperationOptions;
72 import org.identityconnectors.framework.common.objects.SyncToken;
73 import org.quartz.JobExecutionContext;
74 import org.quartz.JobExecutionException;
75 import org.springframework.beans.factory.annotation.Autowired;
76 import org.springframework.beans.factory.support.AbstractBeanDefinition;
77
78 public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> implements SyncopePullExecutor {
79
80 @Autowired
81 protected GroupDAO groupDAO;
82
83 @Autowired
84 protected PlainSchemaDAO plainSchemaDAO;
85
86 @Autowired
87 protected VirSchemaDAO virSchemaDAO;
88
89 @Autowired
90 protected InboundMatcher inboundMatcher;
91
92 @Autowired
93 protected AnyUtilsFactory anyUtilsFactory;
94
95 @Autowired
96 protected PlainAttrValidationManager validator;
97
98 protected final Map<String, SyncToken> latestSyncTokens = Collections.synchronizedMap(new HashMap<>());
99
100 protected ProvisioningProfile<PullTask, PullActions> profile;
101
102 protected final Map<String, MutablePair<Integer, String>> handled = new HashMap<>();
103
104 protected final Map<String, PullActions> perContextActions = new ConcurrentHashMap<>();
105
106 protected Optional<ReconFilterBuilder> perContextReconFilterBuilder = Optional.empty();
107
108 @Override
109 public void setLatestSyncToken(final String objectClass, final SyncToken latestSyncToken) {
110 latestSyncTokens.put(objectClass, latestSyncToken);
111 }
112
113 @Override
114 public void reportHandled(final String objectClass, final Name name) {
115 synchronized (handled) {
116 MutablePair<Integer, String> pair = Optional.ofNullable(handled.get(objectClass)).orElseGet(() -> {
117 MutablePair<Integer, String> p = MutablePair.of(0, null);
118 handled.put(objectClass, p);
119 return p;
120 });
121 pair.setLeft(pair.getLeft() + 1);
122 pair.setRight(name.getNameValue());
123
124 if (!handled.isEmpty()) {
125 StringBuilder builder = new StringBuilder("Processed:\n");
126 handled.forEach((k, v) -> builder.append(' ').append(v.getLeft()).append('\t').
127 append(k).
128 append(" / latest: ").append(v.getRight()).
129 append('\n'));
130 setStatus(builder.toString());
131 }
132 }
133 }
134
135 @Override
136 public boolean wasInterruptRequested() {
137 return interrupt;
138 }
139
140 @Override
141 public void setInterrupted() {
142 this.interrupted = true;
143 }
144
145 protected void setGroupOwners(final GroupPullResultHandler ghandler) {
146 ghandler.getGroupOwnerMap().forEach((groupKey, ownerKey) -> {
147 Group group = groupDAO.find(groupKey);
148 if (group == null) {
149 throw new NotFoundException("Group " + groupKey);
150 }
151 if (StringUtils.isBlank(ownerKey)) {
152 group.setGroupOwner(null);
153 group.setUserOwner(null);
154 } else {
155 Optional<PullMatch> match = inboundMatcher.match(
156 anyTypeDAO.findUser(),
157 ownerKey,
158 profile.getTask().getResource(),
159 profile.getConnector());
160 if (match.isPresent()) {
161 group.setUserOwner((User) match.get().getAny());
162 } else {
163 inboundMatcher.match(
164 anyTypeDAO.findGroup(),
165 ownerKey,
166 profile.getTask().getResource(),
167 profile.getConnector()).
168 ifPresent(groupMatch -> group.setGroupOwner((Group) groupMatch.getAny()));
169 }
170 }
171
172 groupDAO.save(group);
173 });
174 }
175
176 protected List<PullActions> getPullActions(final List<? extends Implementation> impls) {
177 List<PullActions> result = new ArrayList<>();
178
179 impls.forEach(impl -> {
180 try {
181 result.add(ImplementationManager.build(
182 impl,
183 () -> perContextActions.get(impl.getKey()),
184 instance -> perContextActions.put(impl.getKey(), instance)));
185 } catch (Exception e) {
186 LOG.warn("While building {}", impl, e);
187 }
188 });
189
190 return result;
191 }
192
193 protected ReconFilterBuilder getReconFilterBuilder(final PullTask pullTask) throws ClassNotFoundException {
194 return ImplementationManager.build(
195 pullTask.getReconFilterBuilder(),
196 () -> perContextReconFilterBuilder.orElse(null),
197 instance -> perContextReconFilterBuilder = Optional.of(instance));
198 }
199
200 protected RealmPullResultHandler buildRealmHandler() {
201 return (RealmPullResultHandler) ApplicationContextProvider.getBeanFactory().
202 createBean(DefaultRealmPullResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
203 }
204
205 protected AnyObjectPullResultHandler buildAnyObjectHandler() {
206 return (AnyObjectPullResultHandler) ApplicationContextProvider.getBeanFactory().
207 createBean(DefaultAnyObjectPullResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
208 }
209
210 protected UserPullResultHandler buildUserHandler() {
211 return (UserPullResultHandler) ApplicationContextProvider.getBeanFactory().
212 createBean(DefaultUserPullResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
213 }
214
215 protected GroupPullResultHandler buildGroupHandler() {
216 return (GroupPullResultHandler) ApplicationContextProvider.getBeanFactory().
217 createBean(DefaultGroupPullResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
218 }
219
220 @Override
221 protected String doExecuteProvisioning(
222 final PullTask pullTask,
223 final Connector connector,
224 final boolean dryRun,
225 final String executor,
226 final JobExecutionContext context) throws JobExecutionException {
227
228 LOG.debug("Executing pull on {}", pullTask.getResource());
229
230 profile = new ProvisioningProfile<>(connector, pullTask);
231 profile.getActions().addAll(getPullActions(pullTask.getActions()));
232 profile.setDryRun(dryRun);
233 profile.setConflictResolutionAction(
234 Optional.ofNullable(pullTask.getResource().getPullPolicy()).
235 map(PullPolicy::getConflictResolutionAction).
236 orElse(ConflictResolutionAction.IGNORE));
237 profile.setExecutor(executor);
238
239 PullResultHandlerDispatcher dispatcher = new PullResultHandlerDispatcher(profile, this);
240
241 latestSyncTokens.clear();
242
243 if (!profile.isDryRun()) {
244 for (PullActions action : profile.getActions()) {
245 action.beforeAll(profile);
246 }
247 }
248
249 setStatus("Initialization completed");
250
251
252 if (pullTask.getResource().getOrgUnit() != null) {
253 setStatus("Pulling " + pullTask.getResource().getOrgUnit().getObjectClass());
254
255 OrgUnit orgUnit = pullTask.getResource().getOrgUnit();
256
257 Set<String> moreAttrsToGet = new HashSet<>();
258 profile.getActions().forEach(a -> moreAttrsToGet.addAll(a.moreAttrsToGet(profile, orgUnit)));
259 OperationOptions options = MappingUtils.buildOperationOptions(
260 MappingUtils.getPullItems(orgUnit.getItems().stream()), moreAttrsToGet.toArray(String[]::new));
261
262 dispatcher.addHandlerSupplier(orgUnit.getObjectClass(), () -> {
263 RealmPullResultHandler handler = buildRealmHandler();
264 handler.setProfile(profile);
265 return handler;
266 });
267
268 try {
269 switch (pullTask.getPullMode()) {
270 case INCREMENTAL:
271 if (!dryRun) {
272 latestSyncTokens.put(
273 orgUnit.getObjectClass(),
274 ConnObjectUtils.toSyncToken(orgUnit.getSyncToken()));
275 }
276
277 connector.sync(new ObjectClass(orgUnit.getObjectClass()),
278 ConnObjectUtils.toSyncToken(orgUnit.getSyncToken()),
279 dispatcher,
280 options);
281
282 if (!dryRun) {
283 orgUnit.setSyncToken(
284 ConnObjectUtils.toString(latestSyncTokens.get(orgUnit.getObjectClass())));
285 resourceDAO.save(pullTask.getResource());
286 }
287 break;
288
289 case FILTERED_RECONCILIATION:
290 connector.filteredReconciliation(new ObjectClass(orgUnit.getObjectClass()),
291 getReconFilterBuilder(pullTask),
292 dispatcher,
293 options);
294 break;
295
296 case FULL_RECONCILIATION:
297 default:
298 connector.fullReconciliation(
299 new ObjectClass(orgUnit.getObjectClass()),
300 dispatcher,
301 options);
302 break;
303 }
304 } catch (Throwable t) {
305 throw new JobExecutionException("While pulling from connector", t);
306 }
307 }
308
309
310 ProvisionSorter provisionSorter = getProvisionSorter(pullTask);
311
312 GroupPullResultHandler ghandler = buildGroupHandler();
313 for (Provision provision : pullTask.getResource().getProvisions().stream().
314 filter(provision -> provision.getMapping() != null).sorted(provisionSorter).
315 collect(Collectors.toList())) {
316
317 setStatus("Pulling " + provision.getObjectClass());
318
319 AnyType anyType = anyTypeDAO.find(provision.getAnyType());
320
321 dispatcher.addHandlerSupplier(provision.getObjectClass(), () -> {
322 SyncopePullResultHandler handler;
323 switch (anyType.getKind()) {
324 case USER:
325 handler = buildUserHandler();
326 break;
327
328 case GROUP:
329 handler = ghandler;
330 break;
331
332 case ANY_OBJECT:
333 default:
334 handler = buildAnyObjectHandler();
335 }
336 handler.setProfile(profile);
337 return handler;
338 });
339
340 boolean setSyncTokens = false;
341 try {
342 Set<String> moreAttrsToGet = new HashSet<>();
343 profile.getActions().forEach(a -> moreAttrsToGet.addAll(a.moreAttrsToGet(profile, provision)));
344 Stream<Item> mapItems = Stream.concat(
345 MappingUtils.getPullItems(provision.getMapping().getItems().stream()),
346 virSchemaDAO.find(pullTask.getResource().getKey(), anyType.getKey()).stream().
347 map(VirSchema::asLinkingMappingItem));
348 OperationOptions options = MappingUtils.buildOperationOptions(
349 mapItems, moreAttrsToGet.toArray(String[]::new));
350
351 switch (pullTask.getPullMode()) {
352 case INCREMENTAL:
353 if (!dryRun) {
354 latestSyncTokens.put(
355 provision.getObjectClass(),
356 ConnObjectUtils.toSyncToken(provision.getSyncToken()));
357 }
358
359 connector.sync(
360 new ObjectClass(provision.getObjectClass()),
361 ConnObjectUtils.toSyncToken(provision.getSyncToken()),
362 dispatcher,
363 options);
364
365 if (!dryRun) {
366 setSyncTokens = true;
367 }
368 break;
369
370 case FILTERED_RECONCILIATION:
371 connector.filteredReconciliation(new ObjectClass(provision.getObjectClass()),
372 getReconFilterBuilder(pullTask),
373 dispatcher,
374 options);
375 break;
376
377 case FULL_RECONCILIATION:
378 default:
379 connector.fullReconciliation(
380 new ObjectClass(provision.getObjectClass()),
381 dispatcher,
382 options);
383 }
384 } catch (Throwable t) {
385 throw new JobExecutionException("While pulling from connector", t);
386 } finally {
387 if (setSyncTokens) {
388 latestSyncTokens.forEach((objectClass, syncToken) -> pullTask.getResource().
389 getProvisionByObjectClass(objectClass).
390 ifPresent(p -> p.setSyncToken(ConnObjectUtils.toString(syncToken))));
391 resourceDAO.save(pullTask.getResource());
392 }
393 }
394 }
395
396 dispatcher.shutdown();
397
398 for (Provision provision : pullTask.getResource().getProvisions().stream().
399 filter(provision -> provision.getMapping() != null && provision.getUidOnCreate() != null).
400 sorted(provisionSorter).collect(Collectors.toList())) {
401
402 try {
403 AnyUtils anyUtils = anyUtilsFactory.getInstance(anyTypeDAO.find(provision.getAnyType()).getKind());
404 profile.getResults().stream().
405 filter(result -> result.getUidValue() != null && result.getKey() != null
406 && result.getOperation() == ResourceOperation.CREATE
407 && result.getAnyType().equals(provision.getAnyType())).
408 forEach(result -> anyUtils.addAttr(
409 validator,
410 result.getKey(),
411 plainSchemaDAO.find(provision.getUidOnCreate()), result.getUidValue()));
412 } catch (Throwable t) {
413 LOG.error("While setting UID on create", t);
414 }
415 }
416
417 try {
418 setGroupOwners(ghandler);
419 } catch (Exception e) {
420 LOG.error("While setting group owners", e);
421 }
422
423 if (!profile.isDryRun()) {
424 for (PullActions action : profile.getActions()) {
425 action.afterAll(profile);
426 }
427 }
428
429 setStatus("Pull done");
430
431 String result = createReport(profile.getResults(), pullTask.getResource(), dryRun);
432 LOG.debug("Pull result: {}", result);
433 return result;
434 }
435 }