View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.provisioning.java.pushpull;
20  
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Optional;
28  import java.util.Set;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.stream.Collectors;
31  import java.util.stream.Stream;
32  import org.apache.commons.lang3.StringUtils;
33  import org.apache.commons.lang3.tuple.MutablePair;
34  import org.apache.syncope.common.lib.to.Item;
35  import org.apache.syncope.common.lib.to.OrgUnit;
36  import org.apache.syncope.common.lib.to.Provision;
37  import org.apache.syncope.common.lib.types.ConflictResolutionAction;
38  import org.apache.syncope.common.lib.types.ResourceOperation;
39  import org.apache.syncope.core.persistence.api.attrvalue.validation.PlainAttrValidationManager;
40  import org.apache.syncope.core.persistence.api.dao.GroupDAO;
41  import org.apache.syncope.core.persistence.api.dao.NotFoundException;
42  import org.apache.syncope.core.persistence.api.dao.PlainSchemaDAO;
43  import org.apache.syncope.core.persistence.api.dao.VirSchemaDAO;
44  import org.apache.syncope.core.persistence.api.entity.AnyType;
45  import org.apache.syncope.core.persistence.api.entity.AnyUtils;
46  import org.apache.syncope.core.persistence.api.entity.AnyUtilsFactory;
47  import org.apache.syncope.core.persistence.api.entity.Implementation;
48  import org.apache.syncope.core.persistence.api.entity.VirSchema;
49  import org.apache.syncope.core.persistence.api.entity.group.Group;
50  import org.apache.syncope.core.persistence.api.entity.policy.PullPolicy;
51  import org.apache.syncope.core.persistence.api.entity.task.PullTask;
52  import org.apache.syncope.core.persistence.api.entity.user.User;
53  import org.apache.syncope.core.provisioning.api.Connector;
54  import org.apache.syncope.core.provisioning.api.ProvisionSorter;
55  import org.apache.syncope.core.provisioning.api.pushpull.AnyObjectPullResultHandler;
56  import org.apache.syncope.core.provisioning.api.pushpull.GroupPullResultHandler;
57  import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
58  import org.apache.syncope.core.provisioning.api.pushpull.PullActions;
59  import org.apache.syncope.core.provisioning.api.pushpull.RealmPullResultHandler;
60  import org.apache.syncope.core.provisioning.api.pushpull.ReconFilterBuilder;
61  import org.apache.syncope.core.provisioning.api.pushpull.SyncopePullExecutor;
62  import org.apache.syncope.core.provisioning.api.pushpull.SyncopePullResultHandler;
63  import org.apache.syncope.core.provisioning.api.pushpull.UserPullResultHandler;
64  import org.apache.syncope.core.provisioning.api.rules.PullMatch;
65  import org.apache.syncope.core.provisioning.java.utils.ConnObjectUtils;
66  import org.apache.syncope.core.provisioning.java.utils.MappingUtils;
67  import org.apache.syncope.core.spring.ApplicationContextProvider;
68  import org.apache.syncope.core.spring.implementation.ImplementationManager;
69  import org.identityconnectors.framework.common.objects.Name;
70  import org.identityconnectors.framework.common.objects.ObjectClass;
71  import org.identityconnectors.framework.common.objects.OperationOptions;
72  import org.identityconnectors.framework.common.objects.SyncToken;
73  import org.quartz.JobExecutionContext;
74  import org.quartz.JobExecutionException;
75  import org.springframework.beans.factory.annotation.Autowired;
76  import org.springframework.beans.factory.support.AbstractBeanDefinition;
77  
78  public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> implements SyncopePullExecutor {
79  
80      @Autowired
81      protected GroupDAO groupDAO;
82  
83      @Autowired
84      protected PlainSchemaDAO plainSchemaDAO;
85  
86      @Autowired
87      protected VirSchemaDAO virSchemaDAO;
88  
89      @Autowired
90      protected InboundMatcher inboundMatcher;
91  
92      @Autowired
93      protected AnyUtilsFactory anyUtilsFactory;
94  
95      @Autowired
96      protected PlainAttrValidationManager validator;
97  
98      protected final Map<String, SyncToken> latestSyncTokens = Collections.synchronizedMap(new HashMap<>());
99  
100     protected ProvisioningProfile<PullTask, PullActions> profile;
101 
102     protected final Map<String, MutablePair<Integer, String>> handled = new HashMap<>();
103 
104     protected final Map<String, PullActions> perContextActions = new ConcurrentHashMap<>();
105 
106     protected Optional<ReconFilterBuilder> perContextReconFilterBuilder = Optional.empty();
107 
108     @Override
109     public void setLatestSyncToken(final String objectClass, final SyncToken latestSyncToken) {
110         latestSyncTokens.put(objectClass, latestSyncToken);
111     }
112 
113     @Override
114     public void reportHandled(final String objectClass, final Name name) {
115         synchronized (handled) {
116             MutablePair<Integer, String> pair = Optional.ofNullable(handled.get(objectClass)).orElseGet(() -> {
117                 MutablePair<Integer, String> p = MutablePair.of(0, null);
118                 handled.put(objectClass, p);
119                 return p;
120             });
121             pair.setLeft(pair.getLeft() + 1);
122             pair.setRight(name.getNameValue());
123 
124             if (!handled.isEmpty()) {
125                 StringBuilder builder = new StringBuilder("Processed:\n");
126                 handled.forEach((k, v) -> builder.append(' ').append(v.getLeft()).append('\t').
127                         append(k).
128                         append(" / latest: ").append(v.getRight()).
129                         append('\n'));
130                 setStatus(builder.toString());
131             }
132         }
133     }
134 
135     @Override
136     public boolean wasInterruptRequested() {
137         return interrupt;
138     }
139 
140     @Override
141     public void setInterrupted() {
142         this.interrupted = true;
143     }
144 
145     protected void setGroupOwners(final GroupPullResultHandler ghandler) {
146         ghandler.getGroupOwnerMap().forEach((groupKey, ownerKey) -> {
147             Group group = groupDAO.find(groupKey);
148             if (group == null) {
149                 throw new NotFoundException("Group " + groupKey);
150             }
151             if (StringUtils.isBlank(ownerKey)) {
152                 group.setGroupOwner(null);
153                 group.setUserOwner(null);
154             } else {
155                 Optional<PullMatch> match = inboundMatcher.match(
156                         anyTypeDAO.findUser(),
157                         ownerKey,
158                         profile.getTask().getResource(),
159                         profile.getConnector());
160                 if (match.isPresent()) {
161                     group.setUserOwner((User) match.get().getAny());
162                 } else {
163                     inboundMatcher.match(
164                             anyTypeDAO.findGroup(),
165                             ownerKey,
166                             profile.getTask().getResource(),
167                             profile.getConnector()).
168                             ifPresent(groupMatch -> group.setGroupOwner((Group) groupMatch.getAny()));
169                 }
170             }
171 
172             groupDAO.save(group);
173         });
174     }
175 
176     protected List<PullActions> getPullActions(final List<? extends Implementation> impls) {
177         List<PullActions> result = new ArrayList<>();
178 
179         impls.forEach(impl -> {
180             try {
181                 result.add(ImplementationManager.build(
182                         impl,
183                         () -> perContextActions.get(impl.getKey()),
184                         instance -> perContextActions.put(impl.getKey(), instance)));
185             } catch (Exception e) {
186                 LOG.warn("While building {}", impl, e);
187             }
188         });
189 
190         return result;
191     }
192 
193     protected ReconFilterBuilder getReconFilterBuilder(final PullTask pullTask) throws ClassNotFoundException {
194         return ImplementationManager.build(
195                 pullTask.getReconFilterBuilder(),
196                 () -> perContextReconFilterBuilder.orElse(null),
197                 instance -> perContextReconFilterBuilder = Optional.of(instance));
198     }
199 
200     protected RealmPullResultHandler buildRealmHandler() {
201         return (RealmPullResultHandler) ApplicationContextProvider.getBeanFactory().
202                 createBean(DefaultRealmPullResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
203     }
204 
205     protected AnyObjectPullResultHandler buildAnyObjectHandler() {
206         return (AnyObjectPullResultHandler) ApplicationContextProvider.getBeanFactory().
207                 createBean(DefaultAnyObjectPullResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
208     }
209 
210     protected UserPullResultHandler buildUserHandler() {
211         return (UserPullResultHandler) ApplicationContextProvider.getBeanFactory().
212                 createBean(DefaultUserPullResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
213     }
214 
215     protected GroupPullResultHandler buildGroupHandler() {
216         return (GroupPullResultHandler) ApplicationContextProvider.getBeanFactory().
217                 createBean(DefaultGroupPullResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
218     }
219 
220     @Override
221     protected String doExecuteProvisioning(
222             final PullTask pullTask,
223             final Connector connector,
224             final boolean dryRun,
225             final String executor,
226             final JobExecutionContext context) throws JobExecutionException {
227 
228         LOG.debug("Executing pull on {}", pullTask.getResource());
229 
230         profile = new ProvisioningProfile<>(connector, pullTask);
231         profile.getActions().addAll(getPullActions(pullTask.getActions()));
232         profile.setDryRun(dryRun);
233         profile.setConflictResolutionAction(
234                 Optional.ofNullable(pullTask.getResource().getPullPolicy()).
235                         map(PullPolicy::getConflictResolutionAction).
236                         orElse(ConflictResolutionAction.IGNORE));
237         profile.setExecutor(executor);
238 
239         PullResultHandlerDispatcher dispatcher = new PullResultHandlerDispatcher(profile, this);
240 
241         latestSyncTokens.clear();
242 
243         if (!profile.isDryRun()) {
244             for (PullActions action : profile.getActions()) {
245                 action.beforeAll(profile);
246             }
247         }
248 
249         setStatus("Initialization completed");
250 
251         // First realms...
252         if (pullTask.getResource().getOrgUnit() != null) {
253             setStatus("Pulling " + pullTask.getResource().getOrgUnit().getObjectClass());
254 
255             OrgUnit orgUnit = pullTask.getResource().getOrgUnit();
256 
257             Set<String> moreAttrsToGet = new HashSet<>();
258             profile.getActions().forEach(a -> moreAttrsToGet.addAll(a.moreAttrsToGet(profile, orgUnit)));
259             OperationOptions options = MappingUtils.buildOperationOptions(
260                     MappingUtils.getPullItems(orgUnit.getItems().stream()), moreAttrsToGet.toArray(String[]::new));
261 
262             dispatcher.addHandlerSupplier(orgUnit.getObjectClass(), () -> {
263                 RealmPullResultHandler handler = buildRealmHandler();
264                 handler.setProfile(profile);
265                 return handler;
266             });
267 
268             try {
269                 switch (pullTask.getPullMode()) {
270                     case INCREMENTAL:
271                         if (!dryRun) {
272                             latestSyncTokens.put(
273                                     orgUnit.getObjectClass(),
274                                     ConnObjectUtils.toSyncToken(orgUnit.getSyncToken()));
275                         }
276 
277                         connector.sync(new ObjectClass(orgUnit.getObjectClass()),
278                                 ConnObjectUtils.toSyncToken(orgUnit.getSyncToken()),
279                                 dispatcher,
280                                 options);
281 
282                         if (!dryRun) {
283                             orgUnit.setSyncToken(
284                                     ConnObjectUtils.toString(latestSyncTokens.get(orgUnit.getObjectClass())));
285                             resourceDAO.save(pullTask.getResource());
286                         }
287                         break;
288 
289                     case FILTERED_RECONCILIATION:
290                         connector.filteredReconciliation(new ObjectClass(orgUnit.getObjectClass()),
291                                 getReconFilterBuilder(pullTask),
292                                 dispatcher,
293                                 options);
294                         break;
295 
296                     case FULL_RECONCILIATION:
297                     default:
298                         connector.fullReconciliation(
299                                 new ObjectClass(orgUnit.getObjectClass()),
300                                 dispatcher,
301                                 options);
302                         break;
303                 }
304             } catch (Throwable t) {
305                 throw new JobExecutionException("While pulling from connector", t);
306             }
307         }
308 
309         // ...then provisions for any types
310         ProvisionSorter provisionSorter = getProvisionSorter(pullTask);
311 
312         GroupPullResultHandler ghandler = buildGroupHandler();
313         for (Provision provision : pullTask.getResource().getProvisions().stream().
314                 filter(provision -> provision.getMapping() != null).sorted(provisionSorter).
315                 collect(Collectors.toList())) {
316 
317             setStatus("Pulling " + provision.getObjectClass());
318 
319             AnyType anyType = anyTypeDAO.find(provision.getAnyType());
320 
321             dispatcher.addHandlerSupplier(provision.getObjectClass(), () -> {
322                 SyncopePullResultHandler handler;
323                 switch (anyType.getKind()) {
324                     case USER:
325                         handler = buildUserHandler();
326                         break;
327 
328                     case GROUP:
329                         handler = ghandler;
330                         break;
331 
332                     case ANY_OBJECT:
333                     default:
334                         handler = buildAnyObjectHandler();
335                 }
336                 handler.setProfile(profile);
337                 return handler;
338             });
339 
340             boolean setSyncTokens = false;
341             try {
342                 Set<String> moreAttrsToGet = new HashSet<>();
343                 profile.getActions().forEach(a -> moreAttrsToGet.addAll(a.moreAttrsToGet(profile, provision)));
344                 Stream<Item> mapItems = Stream.concat(
345                         MappingUtils.getPullItems(provision.getMapping().getItems().stream()),
346                         virSchemaDAO.find(pullTask.getResource().getKey(), anyType.getKey()).stream().
347                                 map(VirSchema::asLinkingMappingItem));
348                 OperationOptions options = MappingUtils.buildOperationOptions(
349                         mapItems, moreAttrsToGet.toArray(String[]::new));
350 
351                 switch (pullTask.getPullMode()) {
352                     case INCREMENTAL:
353                         if (!dryRun) {
354                             latestSyncTokens.put(
355                                     provision.getObjectClass(),
356                                     ConnObjectUtils.toSyncToken(provision.getSyncToken()));
357                         }
358 
359                         connector.sync(
360                                 new ObjectClass(provision.getObjectClass()),
361                                 ConnObjectUtils.toSyncToken(provision.getSyncToken()),
362                                 dispatcher,
363                                 options);
364 
365                         if (!dryRun) {
366                             setSyncTokens = true;
367                         }
368                         break;
369 
370                     case FILTERED_RECONCILIATION:
371                         connector.filteredReconciliation(new ObjectClass(provision.getObjectClass()),
372                                 getReconFilterBuilder(pullTask),
373                                 dispatcher,
374                                 options);
375                         break;
376 
377                     case FULL_RECONCILIATION:
378                     default:
379                         connector.fullReconciliation(
380                                 new ObjectClass(provision.getObjectClass()),
381                                 dispatcher,
382                                 options);
383                 }
384             } catch (Throwable t) {
385                 throw new JobExecutionException("While pulling from connector", t);
386             } finally {
387                 if (setSyncTokens) {
388                     latestSyncTokens.forEach((objectClass, syncToken) -> pullTask.getResource().
389                             getProvisionByObjectClass(objectClass).
390                             ifPresent(p -> p.setSyncToken(ConnObjectUtils.toString(syncToken))));
391                     resourceDAO.save(pullTask.getResource());
392                 }
393             }
394         }
395 
396         dispatcher.shutdown();
397 
398         for (Provision provision : pullTask.getResource().getProvisions().stream().
399                 filter(provision -> provision.getMapping() != null && provision.getUidOnCreate() != null).
400                 sorted(provisionSorter).collect(Collectors.toList())) {
401 
402             try {
403                 AnyUtils anyUtils = anyUtilsFactory.getInstance(anyTypeDAO.find(provision.getAnyType()).getKind());
404                 profile.getResults().stream().
405                         filter(result -> result.getUidValue() != null && result.getKey() != null
406                         && result.getOperation() == ResourceOperation.CREATE
407                         && result.getAnyType().equals(provision.getAnyType())).
408                         forEach(result -> anyUtils.addAttr(
409                         validator,
410                         result.getKey(),
411                         plainSchemaDAO.find(provision.getUidOnCreate()), result.getUidValue()));
412             } catch (Throwable t) {
413                 LOG.error("While setting UID on create", t);
414             }
415         }
416 
417         try {
418             setGroupOwners(ghandler);
419         } catch (Exception e) {
420             LOG.error("While setting group owners", e);
421         }
422 
423         if (!profile.isDryRun()) {
424             for (PullActions action : profile.getActions()) {
425                 action.afterAll(profile);
426             }
427         }
428 
429         setStatus("Pull done");
430 
431         String result = createReport(profile.getResults(), pullTask.getResource(), dryRun);
432         LOG.debug("Pull result: {}", result);
433         return result;
434     }
435 }