View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.logic;
20  
21  import com.fasterxml.jackson.dataformat.csv.CsvSchema;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.lang.reflect.Method;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Optional;
28  import java.util.Set;
29  import java.util.stream.Stream;
30  import org.apache.commons.lang3.BooleanUtils;
31  import org.apache.commons.lang3.StringUtils;
32  import org.apache.commons.lang3.tuple.Pair;
33  import org.apache.commons.lang3.tuple.Triple;
34  import org.apache.syncope.common.lib.Attr;
35  import org.apache.syncope.common.lib.SyncopeClientException;
36  import org.apache.syncope.common.lib.SyncopeConstants;
37  import org.apache.syncope.common.lib.to.ConnObject;
38  import org.apache.syncope.common.lib.to.EntityTO;
39  import org.apache.syncope.common.lib.to.Item;
40  import org.apache.syncope.common.lib.to.Provision;
41  import org.apache.syncope.common.lib.to.ProvisioningReport;
42  import org.apache.syncope.common.lib.to.PullTaskTO;
43  import org.apache.syncope.common.lib.to.PushTaskTO;
44  import org.apache.syncope.common.lib.to.ReconStatus;
45  import org.apache.syncope.common.lib.types.AnyEntitlement;
46  import org.apache.syncope.common.lib.types.AnyTypeKind;
47  import org.apache.syncope.common.lib.types.ClientExceptionType;
48  import org.apache.syncope.common.lib.types.IdMEntitlement;
49  import org.apache.syncope.common.lib.types.IdRepoEntitlement;
50  import org.apache.syncope.common.lib.types.MatchType;
51  import org.apache.syncope.common.rest.api.beans.AbstractCSVSpec;
52  import org.apache.syncope.common.rest.api.beans.CSVPullSpec;
53  import org.apache.syncope.common.rest.api.beans.CSVPushSpec;
54  import org.apache.syncope.core.persistence.api.dao.AnyDAO;
55  import org.apache.syncope.core.persistence.api.dao.AnyObjectDAO;
56  import org.apache.syncope.core.persistence.api.dao.AnySearchDAO;
57  import org.apache.syncope.core.persistence.api.dao.AnyTypeDAO;
58  import org.apache.syncope.core.persistence.api.dao.DerSchemaDAO;
59  import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO;
60  import org.apache.syncope.core.persistence.api.dao.GroupDAO;
61  import org.apache.syncope.core.persistence.api.dao.NotFoundException;
62  import org.apache.syncope.core.persistence.api.dao.PlainSchemaDAO;
63  import org.apache.syncope.core.persistence.api.dao.RealmDAO;
64  import org.apache.syncope.core.persistence.api.dao.UserDAO;
65  import org.apache.syncope.core.persistence.api.dao.VirSchemaDAO;
66  import org.apache.syncope.core.persistence.api.dao.search.OrderByClause;
67  import org.apache.syncope.core.persistence.api.dao.search.SearchCond;
68  import org.apache.syncope.core.persistence.api.entity.Any;
69  import org.apache.syncope.core.persistence.api.entity.AnyType;
70  import org.apache.syncope.core.persistence.api.entity.AnyUtils;
71  import org.apache.syncope.core.persistence.api.entity.AnyUtilsFactory;
72  import org.apache.syncope.core.persistence.api.entity.ExternalResource;
73  import org.apache.syncope.core.persistence.api.entity.Realm;
74  import org.apache.syncope.core.persistence.api.entity.VirSchema;
75  import org.apache.syncope.core.persistence.api.entity.user.LinkedAccount;
76  import org.apache.syncope.core.persistence.api.entity.user.User;
77  import org.apache.syncope.core.provisioning.api.ConnectorManager;
78  import org.apache.syncope.core.provisioning.api.MappingManager;
79  import org.apache.syncope.core.provisioning.api.VirAttrHandler;
80  import org.apache.syncope.core.provisioning.api.pushpull.ConstantReconFilterBuilder;
81  import org.apache.syncope.core.provisioning.api.pushpull.KeyValueReconFilterBuilder;
82  import org.apache.syncope.core.provisioning.api.pushpull.ReconFilterBuilder;
83  import org.apache.syncope.core.provisioning.api.pushpull.SyncopeSinglePullExecutor;
84  import org.apache.syncope.core.provisioning.api.pushpull.SyncopeSinglePushExecutor;
85  import org.apache.syncope.core.provisioning.api.pushpull.stream.SyncopeStreamPullExecutor;
86  import org.apache.syncope.core.provisioning.api.pushpull.stream.SyncopeStreamPushExecutor;
87  import org.apache.syncope.core.provisioning.api.utils.RealmUtils;
88  import org.apache.syncope.core.provisioning.java.pushpull.InboundMatcher;
89  import org.apache.syncope.core.provisioning.java.pushpull.OutboundMatcher;
90  import org.apache.syncope.core.provisioning.java.pushpull.SinglePullJobDelegate;
91  import org.apache.syncope.core.provisioning.java.pushpull.SinglePushJobDelegate;
92  import org.apache.syncope.core.provisioning.java.pushpull.stream.CSVStreamConnector;
93  import org.apache.syncope.core.provisioning.java.pushpull.stream.StreamPullJobDelegate;
94  import org.apache.syncope.core.provisioning.java.pushpull.stream.StreamPushJobDelegate;
95  import org.apache.syncope.core.provisioning.java.utils.ConnObjectUtils;
96  import org.apache.syncope.core.provisioning.java.utils.MappingUtils;
97  import org.apache.syncope.core.spring.ApplicationContextProvider;
98  import org.apache.syncope.core.spring.security.AuthContextUtils;
99  import org.identityconnectors.framework.common.objects.Attribute;
100 import org.identityconnectors.framework.common.objects.ConnectorObject;
101 import org.identityconnectors.framework.common.objects.ObjectClass;
102 import org.identityconnectors.framework.common.objects.OperationOptions;
103 import org.identityconnectors.framework.common.objects.OperationalAttributes;
104 import org.identityconnectors.framework.common.objects.SearchResult;
105 import org.identityconnectors.framework.common.objects.SyncDeltaBuilder;
106 import org.identityconnectors.framework.common.objects.SyncDeltaType;
107 import org.identityconnectors.framework.common.objects.SyncToken;
108 import org.identityconnectors.framework.common.objects.Uid;
109 import org.identityconnectors.framework.common.objects.filter.Filter;
110 import org.identityconnectors.framework.spi.SearchResultsHandler;
111 import org.quartz.JobExecutionException;
112 import org.springframework.beans.factory.support.AbstractBeanDefinition;
113 import org.springframework.security.access.prepost.PreAuthorize;
114 import org.springframework.transaction.annotation.Transactional;
115 
116 public class ReconciliationLogic extends AbstractTransactionalLogic<EntityTO> {
117 
118     protected final AnyUtilsFactory anyUtilsFactory;
119 
120     protected final AnyTypeDAO anyTypeDAO;
121 
122     protected final ExternalResourceDAO resourceDAO;
123 
124     protected final RealmDAO realmDAO;
125 
126     protected final PlainSchemaDAO plainSchemaDAO;
127 
128     protected final DerSchemaDAO derSchemaDAO;
129 
130     protected final VirSchemaDAO virSchemaDAO;
131 
132     protected final AnySearchDAO anySearchDAO;
133 
134     protected final VirAttrHandler virAttrHandler;
135 
136     protected final MappingManager mappingManager;
137 
138     protected final InboundMatcher inboundMatcher;
139 
140     protected final OutboundMatcher outboundMatcher;
141 
142     protected final ConnectorManager connectorManager;
143 
144     public ReconciliationLogic(
145             final AnyUtilsFactory anyUtilsFactory,
146             final AnyTypeDAO anyTypeDAO,
147             final ExternalResourceDAO resourceDAO,
148             final RealmDAO realmDAO,
149             final PlainSchemaDAO plainSchemaDAO,
150             final DerSchemaDAO derSchemaDAO,
151             final VirSchemaDAO virSchemaDAO,
152             final AnySearchDAO anySearchDAO,
153             final VirAttrHandler virAttrHandler,
154             final MappingManager mappingManager,
155             final InboundMatcher inboundMatcher,
156             final OutboundMatcher outboundMatcher,
157             final ConnectorManager connectorManager) {
158 
159         this.anyUtilsFactory = anyUtilsFactory;
160         this.anyTypeDAO = anyTypeDAO;
161         this.resourceDAO = resourceDAO;
162         this.realmDAO = realmDAO;
163         this.plainSchemaDAO = plainSchemaDAO;
164         this.derSchemaDAO = derSchemaDAO;
165         this.virSchemaDAO = virSchemaDAO;
166         this.anySearchDAO = anySearchDAO;
167         this.virAttrHandler = virAttrHandler;
168         this.mappingManager = mappingManager;
169         this.inboundMatcher = inboundMatcher;
170         this.outboundMatcher = outboundMatcher;
171         this.connectorManager = connectorManager;
172     }
173 
174     protected Triple<AnyType, ExternalResource, Provision> getProvision(
175             final String anyTypeKey, final String resourceKey) {
176 
177         AnyType anyType = anyTypeDAO.find(anyTypeKey);
178         if (anyType == null) {
179             throw new NotFoundException("AnyType '" + anyTypeKey + "'");
180         }
181 
182         ExternalResource resource = resourceDAO.find(resourceKey);
183         if (resource == null) {
184             throw new NotFoundException("Resource '" + resourceKey + "'");
185         }
186         Provision provision = resource.getProvisionByAnyType(anyType.getKey()).
187                 orElseThrow(() -> new NotFoundException(
188                 "Provision for " + anyType + " on Resource '" + resourceKey + "'"));
189         if (provision.getMapping() == null) {
190             throw new NotFoundException("Mapping for " + anyType + " on Resource '" + resourceKey + "'");
191         }
192 
193         return Triple.of(anyType, resource, provision);
194     }
195 
196     protected ConnObject getOnSyncope(
197             final Item connObjectKeyItem,
198             final String connObjectKeyValue,
199             final Boolean suspended,
200             final Set<Attribute> attrs) {
201 
202         ConnObject connObjectTO = ConnObjectUtils.getConnObjectTO(null, attrs);
203         connObjectTO.getAttrs().add(new Attr.Builder(connObjectKeyItem.getExtAttrName()).
204                 value(connObjectKeyValue).build());
205         connObjectTO.getAttrs().add(new Attr.Builder(Uid.NAME).
206                 value(connObjectKeyValue).build());
207         Optional.ofNullable(suspended).ifPresent(s -> {
208             connObjectTO.getAttrs().removeIf(a -> OperationalAttributes.ENABLE_NAME.equals(a.getSchema()));
209             connObjectTO.getAttrs().add(new Attr.Builder(OperationalAttributes.ENABLE_NAME).
210                     value(BooleanUtils.negate(s).toString()).build());
211         });
212 
213         return connObjectTO;
214     }
215 
216     protected ConnObject getOnSyncope(
217             final Any<?> any,
218             final Item connObjectKeyItem,
219             final ExternalResource resource,
220             final Provision provision) {
221 
222         Pair<String, Set<Attribute>> prepared = mappingManager.prepareAttrsFromAny(
223                 any, null, false, true, resource, provision);
224         return getOnSyncope(
225                 connObjectKeyItem,
226                 prepared.getLeft(),
227                 any instanceof User ? ((User) any).isSuspended() : null,
228                 prepared.getRight());
229     }
230 
231     protected ConnObject getOnSyncope(
232             final LinkedAccount account,
233             final Item connObjectKeyItem,
234             final Provision provision) {
235 
236         Set<Attribute> attrs = mappingManager.prepareAttrsFromLinkedAccount(
237                 account.getOwner(), account, null, false, provision);
238         return getOnSyncope(
239                 connObjectKeyItem,
240                 account.getConnObjectKeyValue(),
241                 account.isSuspended(),
242                 attrs);
243     }
244 
245     protected Any<?> getAny(final Provision provision, final AnyTypeKind anyTypeKind, final String anyKey) {
246         AnyDAO<?> dao = anyUtilsFactory.getInstance(anyTypeKind).dao();
247 
248         String actualKey = anyKey;
249         if (!SyncopeConstants.UUID_PATTERN.matcher(anyKey).matches()) {
250             actualKey = dao instanceof UserDAO
251                     ? ((UserDAO) dao).findKey(anyKey)
252                     : dao instanceof GroupDAO
253                             ? ((GroupDAO) dao).findKey(anyKey)
254                             : ((AnyObjectDAO) dao).findKey(provision.getAnyType(), anyKey);
255         }
256 
257         return Optional.ofNullable(dao.authFind(actualKey)).
258                 orElseThrow(() -> new NotFoundException(provision.getAnyType() + " '" + anyKey + "'"));
259     }
260 
261     @PreAuthorize("hasRole('" + IdMEntitlement.RESOURCE_GET_CONNOBJECT + "')")
262     public ReconStatus status(
263             final String anyTypeKey,
264             final String resourceKey,
265             final String anyKey,
266             final Set<String> moreAttrsToGet) {
267 
268         Triple<AnyType, ExternalResource, Provision> triple = getProvision(anyTypeKey, resourceKey);
269 
270         Item connObjectKeyItem = MappingUtils.getConnObjectKeyItem(triple.getRight()).
271                 orElseThrow(() -> new NotFoundException(
272                 "ConnObjectKey for " + triple.getLeft().getKey()
273                 + " on resource '" + triple.getMiddle().getKey() + "'"));
274 
275         Any<?> any = getAny(triple.getRight(), triple.getLeft().getKind(), anyKey);
276 
277         ReconStatus status = new ReconStatus();
278         status.setMatchType(MatchType.ANY);
279         status.setAnyTypeKind(any.getType().getKind());
280         status.setAnyKey(any.getKey());
281         status.setRealm(any.getRealm().getFullPath());
282         status.setOnSyncope(getOnSyncope(any, connObjectKeyItem, triple.getMiddle(), triple.getRight()));
283 
284         List<ConnectorObject> connObjs = outboundMatcher.match(
285                 connectorManager.getConnector(triple.getMiddle()),
286                 any,
287                 triple.getMiddle(),
288                 triple.getRight(),
289                 Optional.of(moreAttrsToGet.toArray(String[]::new)));
290         if (!connObjs.isEmpty()) {
291             status.setOnResource(ConnObjectUtils.getConnObjectTO(
292                     outboundMatcher.getFIQL(connObjs.get(0), triple.getMiddle(), triple.getRight()),
293                     connObjs.get(0).getAttributes()));
294 
295             if (connObjs.size() > 1) {
296                 LOG.warn("Expected single match, found {}", connObjs);
297             } else {
298                 virAttrHandler.setValues(any, connObjs.get(0));
299             }
300         }
301 
302         return status;
303     }
304 
305     protected SyncDeltaBuilder syncDeltaBuilder(
306             final AnyType anyType,
307             final ExternalResource resource,
308             final Provision provision,
309             final Filter filter,
310             final Set<String> moreAttrsToGet) {
311 
312         Stream<Item> mapItems = Stream.concat(
313                 provision.getMapping().getItems().stream(),
314                 virSchemaDAO.find(resource.getKey(), anyType.getKey()).stream().map(VirSchema::asLinkingMappingItem));
315         OperationOptions options = MappingUtils.buildOperationOptions(mapItems, moreAttrsToGet.toArray(String[]::new));
316 
317         SyncDeltaBuilder syncDeltaBuilder = new SyncDeltaBuilder().
318                 setToken(new SyncToken("")).
319                 setDeltaType(SyncDeltaType.CREATE_OR_UPDATE).
320                 setObjectClass(new ObjectClass(provision.getObjectClass()));
321         connectorManager.getConnector(resource).
322                 search(syncDeltaBuilder.getObjectClass(), filter, new SearchResultsHandler() {
323 
324                     @Override
325                     public boolean handle(final ConnectorObject connObj) {
326                         syncDeltaBuilder.setObject(connObj);
327                         return false;
328                     }
329 
330                     @Override
331                     public void handleResult(final SearchResult sr) {
332                         // do nothing
333                     }
334                 }, 1, null, List.of(), options);
335 
336         return syncDeltaBuilder;
337     }
338 
339     @PreAuthorize("hasRole('" + IdMEntitlement.RESOURCE_GET_CONNOBJECT + "')")
340     public ReconStatus status(
341             final String anyTypeKey,
342             final String resourceKey,
343             final Filter filter,
344             final Set<String> moreAttrsToGet) {
345 
346         Triple<AnyType, ExternalResource, Provision> triple = getProvision(anyTypeKey, resourceKey);
347 
348         SyncDeltaBuilder syncDeltaBuilder = syncDeltaBuilder(
349                 triple.getLeft(), triple.getMiddle(), triple.getRight(), filter, moreAttrsToGet);
350 
351         ReconStatus status = new ReconStatus();
352         if (syncDeltaBuilder.getObject() != null) {
353             Item connObjectKeyItem = MappingUtils.getConnObjectKeyItem(triple.getRight()).
354                     orElseThrow(() -> new NotFoundException(
355                     "ConnObjectKey for " + triple.getLeft().getKey()
356                     + " on resource '" + triple.getMiddle().getKey() + "'"));
357 
358             inboundMatcher.match(
359                     syncDeltaBuilder.build(), triple.getMiddle(), triple.getRight(), triple.getLeft().getKind()).
360                     stream().findFirst().ifPresent(match -> {
361 
362                         if (match.getAny() != null) {
363                             status.setMatchType(MatchType.ANY);
364                             status.setAnyTypeKind(match.getAny().getType().getKind());
365                             status.setAnyKey(match.getAny().getKey());
366                             status.setRealm(match.getAny().getRealm().getFullPath());
367                             status.setOnSyncope(getOnSyncope(
368                                     match.getAny(), connObjectKeyItem, triple.getMiddle(), triple.getRight()));
369                         } else if (match.getLinkedAccount() != null) {
370                             status.setMatchType(MatchType.LINKED_ACCOUNT);
371                             status.setAnyTypeKind(AnyTypeKind.USER);
372                             status.setAnyKey(match.getLinkedAccount().getOwner().getKey());
373                             status.setRealm(match.getLinkedAccount().getOwner().getRealm().getFullPath());
374                             status.setOnSyncope(getOnSyncope(
375                                     match.getLinkedAccount(), connObjectKeyItem, triple.getRight()));
376                         }
377                     });
378 
379             status.setOnResource(ConnObjectUtils.getConnObjectTO(
380                     outboundMatcher.getFIQL(syncDeltaBuilder.getObject(), triple.getMiddle(), triple.getRight()),
381                     syncDeltaBuilder.getObject().getAttributes()));
382 
383             if (status.getMatchType() == MatchType.ANY && StringUtils.isNotBlank(status.getAnyKey())) {
384                 virAttrHandler.setValues(
385                         getAny(triple.getRight(), triple.getLeft().getKind(), status.getAnyKey()),
386                         syncDeltaBuilder.getObject());
387             }
388         }
389 
390         return status;
391     }
392 
393     protected SyncopeSinglePushExecutor singlePushExecutor() {
394         return (SyncopeSinglePushExecutor) ApplicationContextProvider.getBeanFactory().
395                 createBean(SinglePushJobDelegate.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
396     }
397 
398     @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_EXECUTE + "')")
399     public List<ProvisioningReport> push(
400             final String anyTypeKey,
401             final String resourceKey,
402             final String anyKey,
403             final PushTaskTO pushTask) {
404 
405         Triple<AnyType, ExternalResource, Provision> triple = getProvision(anyTypeKey, resourceKey);
406 
407         SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Reconciliation);
408         List<ProvisioningReport> results = new ArrayList<>();
409         try {
410             results.addAll(singlePushExecutor().push(
411                     triple.getMiddle(),
412                     triple.getRight(),
413                     connectorManager.getConnector(triple.getMiddle()),
414                     getAny(triple.getRight(), triple.getLeft().getKind(), anyKey),
415                     pushTask,
416                     AuthContextUtils.getWho()));
417             if (!results.isEmpty() && results.get(0).getStatus() == ProvisioningReport.Status.FAILURE) {
418                 sce.getElements().add(results.get(0).getMessage());
419             }
420         } catch (JobExecutionException e) {
421             sce.getElements().add(e.getMessage());
422         }
423 
424         if (!sce.isEmpty()) {
425             throw sce;
426         }
427 
428         return results;
429     }
430 
431     @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_EXECUTE + "')")
432     public List<ProvisioningReport> push(
433             final String anyTypeKey,
434             final String resourceKey,
435             final Filter filter,
436             final Set<String> moreAttrsToGet,
437             final PushTaskTO pushTask) {
438 
439         Triple<AnyType, ExternalResource, Provision> triple = getProvision(anyTypeKey, resourceKey);
440 
441         SyncDeltaBuilder syncDeltaBuilder = syncDeltaBuilder(
442                 triple.getLeft(), triple.getMiddle(), triple.getRight(), filter, moreAttrsToGet);
443 
444         SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Reconciliation);
445         List<ProvisioningReport> results = new ArrayList<>();
446 
447         if (syncDeltaBuilder.getObject() != null) {
448             inboundMatcher.match(
449                     syncDeltaBuilder.build(), triple.getMiddle(), triple.getRight(), triple.getLeft().getKind()).
450                     stream().findFirst().ifPresent(match -> {
451 
452                         try {
453                             if (match.getMatchTarget() == MatchType.ANY) {
454                                 results.addAll(singlePushExecutor().push(
455                                         triple.getMiddle(),
456                                         triple.getRight(),
457                                         connectorManager.getConnector(triple.getMiddle()),
458                                         match.getAny(),
459                                         pushTask,
460                                         AuthContextUtils.getWho()));
461                                 if (!results.isEmpty()
462                                         && results.get(0).getStatus() == ProvisioningReport.Status.FAILURE) {
463 
464                                     sce.getElements().add(results.get(0).getMessage());
465                                 }
466                             } else {
467                                 ProvisioningReport result = singlePushExecutor().push(
468                                         triple.getMiddle(),
469                                         triple.getRight(),
470                                         connectorManager.getConnector(triple.getMiddle()),
471                                         match.getLinkedAccount(),
472                                         pushTask,
473                                         AuthContextUtils.getWho());
474                                 if (result.getStatus() == ProvisioningReport.Status.FAILURE) {
475                                     sce.getElements().add(result.getMessage());
476                                 } else {
477                                     results.add(result);
478                                 }
479                             }
480                         } catch (JobExecutionException e) {
481                             sce.getElements().add(e.getMessage());
482                         }
483                     });
484         }
485 
486         if (!sce.isEmpty()) {
487             throw sce;
488         }
489 
490         return results;
491     }
492 
493     protected List<ProvisioningReport> pull(
494             final ExternalResource resource,
495             final Provision provision,
496             final ReconFilterBuilder reconFilterBuilder,
497             final Set<String> moreAttrsToGet,
498             final PullTaskTO pullTask) {
499 
500         if (pullTask.getDestinationRealm() == null || realmDAO.findByFullPath(pullTask.getDestinationRealm()) == null) {
501             throw new NotFoundException("Realm " + pullTask.getDestinationRealm());
502         }
503 
504         SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Reconciliation);
505         List<ProvisioningReport> results = new ArrayList<>();
506         try {
507             SyncopeSinglePullExecutor executor =
508                     (SyncopeSinglePullExecutor) ApplicationContextProvider.getBeanFactory().
509                             createBean(SinglePullJobDelegate.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
510 
511             results.addAll(executor.pull(
512                     resource,
513                     provision,
514                     connectorManager.getConnector(resource),
515                     reconFilterBuilder,
516                     moreAttrsToGet,
517                     pullTask,
518                     AuthContextUtils.getWho()));
519             if (!results.isEmpty() && results.get(0).getStatus() == ProvisioningReport.Status.FAILURE) {
520                 sce.getElements().add(results.get(0).getMessage());
521             }
522         } catch (JobExecutionException e) {
523             sce.getElements().add(e.getMessage());
524         }
525 
526         if (!sce.isEmpty()) {
527             throw sce;
528         }
529 
530         return results;
531     }
532 
533     @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_EXECUTE + "')")
534     @Transactional(noRollbackFor = SyncopeClientException.class)
535     public List<ProvisioningReport> pull(
536             final String anyTypeKey,
537             final String resourceKey,
538             final String anyKey,
539             final Set<String> moreAttrsToGet,
540             final PullTaskTO pullTask) {
541 
542         Triple<AnyType, ExternalResource, Provision> triple = getProvision(anyTypeKey, resourceKey);
543 
544         if (triple.getRight().getMapping().getConnObjectKeyItem().isEmpty()) {
545             throw new NotFoundException(
546                     "ConnObjectKey cannot be determined for mapping " + anyTypeKey);
547         }
548 
549         Any<?> any = getAny(triple.getRight(), triple.getLeft().getKind(), anyKey);
550 
551         String connObjectKeyValue = mappingManager.getConnObjectKeyValue(any, triple.getMiddle(), triple.getRight()).
552                 orElseThrow(() -> new NotFoundException(
553                 "ConnObjectKey for " + triple.getLeft().getKey()
554                 + " on resource '" + triple.getMiddle().getKey() + "'"));
555 
556         return pull(
557                 triple.getMiddle(),
558                 triple.getRight(),
559                 new KeyValueReconFilterBuilder(
560                         triple.getRight().getMapping().getConnObjectKeyItem().get().getExtAttrName(),
561                         connObjectKeyValue),
562                 moreAttrsToGet,
563                 pullTask);
564     }
565 
566     @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_EXECUTE + "')")
567     @Transactional(noRollbackFor = SyncopeClientException.class)
568     public List<ProvisioningReport> pull(
569             final String anyTypeKey,
570             final String resourceKey,
571             final Filter filter,
572             final Set<String> moreAttrsToGet,
573             final PullTaskTO pullTask) {
574 
575         Triple<AnyType, ExternalResource, Provision> triple = getProvision(anyTypeKey, resourceKey);
576 
577         return pull(
578                 triple.getMiddle(),
579                 triple.getRight(),
580                 new ConstantReconFilterBuilder(filter),
581                 moreAttrsToGet,
582                 pullTask);
583     }
584 
585     protected CsvSchema.Builder csvSchema(final AbstractCSVSpec spec) {
586         CsvSchema.Builder schemaBuilder = new CsvSchema.Builder().setUseHeader(true).
587                 setColumnSeparator(spec.getColumnSeparator()).
588                 setArrayElementSeparator(spec.getArrayElementSeparator()).
589                 setQuoteChar(spec.getQuoteChar()).
590                 setLineSeparator(spec.getLineSeparator()).
591                 setNullValue(spec.getNullValue()).
592                 setAllowComments(spec.getAllowComments());
593         if (spec.getEscapeChar() != null) {
594             schemaBuilder.setEscapeChar(spec.getEscapeChar());
595         }
596         return schemaBuilder;
597     }
598 
599     @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_EXECUTE + "')")
600     public List<ProvisioningReport> push(
601             final SearchCond searchCond,
602             final int page,
603             final int size,
604             final List<OrderByClause> orderBy,
605             final String realm,
606             final CSVPushSpec spec,
607             final OutputStream os) {
608 
609         AnyType anyType = anyTypeDAO.find(spec.getAnyTypeKey());
610         if (anyType == null) {
611             throw new NotFoundException("AnyType '" + spec.getAnyTypeKey() + "'");
612         }
613 
614         AnyUtils anyUtils = anyUtilsFactory.getInstance(anyType.getKind());
615 
616         String entitlement;
617         switch (anyType.getKind()) {
618             case GROUP:
619                 entitlement = IdRepoEntitlement.GROUP_SEARCH;
620                 break;
621 
622             case ANY_OBJECT:
623                 entitlement = AnyEntitlement.SEARCH.getFor(anyType.getKey());
624                 break;
625 
626             case USER:
627             default:
628                 entitlement = IdRepoEntitlement.USER_SEARCH;
629         }
630 
631         Realm base = Optional.ofNullable(realmDAO.findByFullPath(realm)).
632                 orElseThrow(() -> new NotFoundException("Realm " + realm));
633 
634         Set<String> adminRealms = RealmUtils.getEffective(AuthContextUtils.getAuthorizations().get(entitlement), realm);
635         SearchCond effectiveCond = searchCond == null ? anyUtils.dao().getAllMatchingCond() : searchCond;
636 
637         List<Any<?>> matching;
638         if (spec.getIgnorePaging()) {
639             matching = new ArrayList<>();
640 
641             int count = anySearchDAO.count(base, true, adminRealms, effectiveCond, anyType.getKind());
642             int pages = (count / AnyDAO.DEFAULT_PAGE_SIZE) + 1;
643 
644             for (int p = 1; p <= pages; p++) {
645                 matching.addAll(anySearchDAO.search(base, true, adminRealms, effectiveCond,
646                         p, AnyDAO.DEFAULT_PAGE_SIZE, orderBy, anyType.getKind()));
647             }
648         } else {
649             matching = anySearchDAO.search(
650                     base, true, adminRealms, effectiveCond, page, size, orderBy, anyType.getKind());
651         }
652 
653         List<String> columns = new ArrayList<>();
654         spec.getFields().forEach(item -> {
655             if (anyUtils.getField(item) == null) {
656                 LOG.warn("Ignoring invalid field {}", item);
657             } else {
658                 columns.add(item);
659             }
660         });
661         spec.getPlainAttrs().forEach(item -> {
662             if (plainSchemaDAO.find(item) == null) {
663                 LOG.warn("Ignoring invalid plain schema {}", item);
664             } else {
665                 columns.add(item);
666             }
667         });
668         spec.getDerAttrs().forEach(item -> {
669             if (derSchemaDAO.find(item) == null) {
670                 LOG.warn("Ignoring invalid derived schema {}", item);
671             } else {
672                 columns.add(item);
673             }
674         });
675         spec.getVirAttrs().forEach(item -> {
676             if (virSchemaDAO.find(item) == null) {
677                 LOG.warn("Ignoring invalid virtual schema {}", item);
678             } else {
679                 columns.add(item);
680             }
681         });
682 
683         PushTaskTO pushTask = new PushTaskTO();
684         pushTask.setMatchingRule(spec.getMatchingRule());
685         pushTask.setUnmatchingRule(spec.getUnmatchingRule());
686         pushTask.getActions().addAll(spec.getProvisioningActions());
687 
688         try (CSVStreamConnector connector = new CSVStreamConnector(
689                 null,
690                 spec.getArrayElementSeparator(),
691                 csvSchema(spec),
692                 null,
693                 os,
694                 columns.toArray(String[]::new))) {
695 
696             SyncopeStreamPushExecutor executor =
697                     (SyncopeStreamPushExecutor) ApplicationContextProvider.getBeanFactory().
698                             createBean(StreamPushJobDelegate.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
699             return executor.push(
700                     anyType,
701                     matching,
702                     columns,
703                     connector,
704                     spec.getPropagationActions(),
705                     pushTask,
706                     AuthContextUtils.getWho());
707         } catch (Exception e) {
708             LOG.error("Could not push to stream", e);
709             SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Reconciliation);
710             sce.getElements().add(e.getMessage());
711             throw sce;
712         }
713     }
714 
715     @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_EXECUTE + "')")
716     @Transactional(noRollbackFor = SyncopeClientException.class)
717     public List<ProvisioningReport> pull(final CSVPullSpec spec, final InputStream csv) {
718         AnyType anyType = anyTypeDAO.find(spec.getAnyTypeKey());
719         if (anyType == null) {
720             throw new NotFoundException("AnyType '" + spec.getAnyTypeKey() + "'");
721         }
722 
723         if (realmDAO.findByFullPath(spec.getDestinationRealm()) == null) {
724             throw new NotFoundException("Realm " + spec.getDestinationRealm());
725         }
726 
727         PullTaskTO pullTask = new PullTaskTO();
728         pullTask.setDestinationRealm(spec.getDestinationRealm());
729         pullTask.setRemediation(spec.getRemediation());
730         pullTask.setMatchingRule(spec.getMatchingRule());
731         pullTask.setUnmatchingRule(spec.getUnmatchingRule());
732         pullTask.getActions().addAll(spec.getProvisioningActions());
733 
734         try (CSVStreamConnector connector = new CSVStreamConnector(
735                 spec.getKeyColumn(),
736                 spec.getArrayElementSeparator(),
737                 csvSchema(spec),
738                 csv,
739                 null)) {
740 
741             List<String> columns = connector.getColumns(spec);
742             if (!columns.contains(spec.getKeyColumn())) {
743                 throw new NotFoundException("Key column '" + spec.getKeyColumn() + "'");
744             }
745 
746             SyncopeStreamPullExecutor executor =
747                     (SyncopeStreamPullExecutor) ApplicationContextProvider.getBeanFactory().
748                             createBean(StreamPullJobDelegate.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
749             return executor.pull(anyType,
750                     spec.getKeyColumn(),
751                     columns,
752                     spec.getConflictResolutionAction(),
753                     spec.getPullCorrelationRule(),
754                     connector,
755                     pullTask,
756                     AuthContextUtils.getWho());
757         } catch (NotFoundException e) {
758             throw e;
759         } catch (Exception e) {
760             LOG.error("Could not pull from stream", e);
761             SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Reconciliation);
762             sce.getElements().add(e.getMessage());
763             throw sce;
764         }
765     }
766 
767     @Override
768     protected EntityTO resolveReference(final Method method, final Object... os)
769             throws UnresolvedReferenceException {
770 
771         throw new UnresolvedReferenceException();
772     }
773 }