View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.provisioning.java.pushpull.stream;
20  
21  import java.util.HashSet;
22  import java.util.List;
23  import java.util.Objects;
24  import java.util.Set;
25  import java.util.stream.Collectors;
26  import java.util.stream.Stream;
27  import org.apache.syncope.common.lib.to.Item;
28  import org.apache.syncope.common.lib.to.Mapping;
29  import org.apache.syncope.common.lib.to.Provision;
30  import org.apache.syncope.common.lib.to.ProvisioningReport;
31  import org.apache.syncope.common.lib.to.PullTaskTO;
32  import org.apache.syncope.common.lib.types.ConflictResolutionAction;
33  import org.apache.syncope.common.lib.types.IdMImplementationType;
34  import org.apache.syncope.common.lib.types.MappingPurpose;
35  import org.apache.syncope.common.lib.types.PullMode;
36  import org.apache.syncope.common.lib.types.TaskType;
37  import org.apache.syncope.core.persistence.api.dao.ImplementationDAO;
38  import org.apache.syncope.core.persistence.api.dao.RealmDAO;
39  import org.apache.syncope.core.persistence.api.entity.AnyType;
40  import org.apache.syncope.core.persistence.api.entity.AnyUtils;
41  import org.apache.syncope.core.persistence.api.entity.ExternalResource;
42  import org.apache.syncope.core.persistence.api.entity.Implementation;
43  import org.apache.syncope.core.persistence.api.entity.PlainSchema;
44  import org.apache.syncope.core.persistence.api.entity.VirSchema;
45  import org.apache.syncope.core.persistence.api.entity.policy.PullCorrelationRuleEntity;
46  import org.apache.syncope.core.persistence.api.entity.policy.PullPolicy;
47  import org.apache.syncope.core.persistence.api.entity.task.PullTask;
48  import org.apache.syncope.core.provisioning.api.Connector;
49  import org.apache.syncope.core.provisioning.api.pushpull.GroupPullResultHandler;
50  import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
51  import org.apache.syncope.core.provisioning.api.pushpull.PullActions;
52  import org.apache.syncope.core.provisioning.api.pushpull.SyncopePullResultHandler;
53  import org.apache.syncope.core.provisioning.api.pushpull.stream.SyncopeStreamPullExecutor;
54  import org.apache.syncope.core.provisioning.java.pushpull.PullJobDelegate;
55  import org.apache.syncope.core.provisioning.java.utils.MappingUtils;
56  import org.apache.syncope.core.spring.security.SecureRandomUtils;
57  import org.identityconnectors.framework.common.objects.ObjectClass;
58  import org.quartz.JobExecutionException;
59  import org.springframework.beans.factory.annotation.Autowired;
60  
61  public class StreamPullJobDelegate extends PullJobDelegate implements SyncopeStreamPullExecutor {
62  
63      @Autowired
64      private ImplementationDAO implementationDAO;
65  
66      @Autowired
67      private RealmDAO realmDAO;
68  
69      private PullPolicy pullPolicy(
70              final AnyType anyType,
71              final ConflictResolutionAction conflictResolutionAction,
72              final String pullCorrelationRule) {
73  
74          PullCorrelationRuleEntity pullCorrelationRuleEntity = null;
75          if (pullCorrelationRule != null) {
76              Implementation impl = implementationDAO.find(pullCorrelationRule);
77              if (impl == null || !IdMImplementationType.PULL_CORRELATION_RULE.equals(impl.getType())) {
78                  LOG.debug("Invalid " + Implementation.class.getSimpleName() + " {}, ignoring...", pullCorrelationRule);
79              } else {
80                  pullCorrelationRuleEntity = entityFactory.newEntity(PullCorrelationRuleEntity.class);
81                  pullCorrelationRuleEntity.setAnyType(anyType);
82                  pullCorrelationRuleEntity.setImplementation(impl);
83              }
84          }
85  
86          PullPolicy pullPolicy = entityFactory.newEntity(PullPolicy.class);
87          pullPolicy.setConflictResolutionAction(conflictResolutionAction);
88  
89          if (pullCorrelationRuleEntity != null) {
90              pullPolicy.add(pullCorrelationRuleEntity);
91              pullCorrelationRuleEntity.setPullPolicy(pullPolicy);
92          }
93  
94          return pullPolicy;
95      }
96  
97      private Provision provision(
98              final AnyType anyType,
99              final String keyColumn,
100             final List<String> columns) throws JobExecutionException {
101 
102         Provision provision = new Provision();
103         provision.setAnyType(anyType.getKey());
104         provision.setObjectClass(anyType.getKey());
105 
106         Mapping mapping = new Mapping();
107         provision.setMapping(mapping);
108 
109         AnyUtils anyUtils = anyUtilsFactory.getInstance(anyType.getKind());
110         if (anyUtils.getField(keyColumn) == null) {
111             PlainSchema keyColumnSchema = plainSchemaDAO.find(keyColumn);
112             if (keyColumnSchema == null) {
113                 throw new JobExecutionException("Plain Schema for key column not found: " + keyColumn);
114             }
115         }
116 
117         Item connObjectKeyItem = new Item();
118         connObjectKeyItem.setExtAttrName(keyColumn);
119         connObjectKeyItem.setIntAttrName(keyColumn);
120         connObjectKeyItem.setPurpose(MappingPurpose.PULL);
121         mapping.setConnObjectKeyItem(connObjectKeyItem);
122 
123         columns.stream().
124                 filter(column -> anyUtils.getField(column) != null
125                 || plainSchemaDAO.find(column) != null || virSchemaDAO.find(column) != null).
126                 map(column -> {
127                     Item item = new Item();
128                     item.setExtAttrName(column);
129                     item.setIntAttrName(column);
130                     item.setPurpose(MappingPurpose.PULL);
131                     mapping.add(item);
132                     return item;
133                 }).forEach(mapping::add);
134 
135         return provision;
136     }
137 
138     private ExternalResource externalResource(
139             final AnyType anyType,
140             final String keyColumn,
141             final List<String> columns,
142             final ConflictResolutionAction conflictResolutionAction,
143             final String pullCorrelationRule) throws JobExecutionException {
144 
145         Provision provision = provision(anyType, keyColumn, columns);
146 
147         ExternalResource resource = entityFactory.newEntity(ExternalResource.class);
148         resource.setKey("StreamPull_" + SecureRandomUtils.generateRandomUUID().toString());
149         resource.getProvisions().add(provision);
150 
151         resource.setPullPolicy(pullPolicy(anyType, conflictResolutionAction, pullCorrelationRule));
152 
153         return resource;
154     }
155 
156     @Override
157     public List<ProvisioningReport> pull(
158             final AnyType anyType,
159             final String keyColumn,
160             final List<String> columns,
161             final ConflictResolutionAction conflictResolutionAction,
162             final String pullCorrelationRule,
163             final Connector connector,
164             final PullTaskTO pullTaskTO,
165             final String executor) throws JobExecutionException {
166 
167         LOG.debug("Executing stream pull");
168 
169         taskType = TaskType.PULL;
170         try {
171             ExternalResource resource =
172                     externalResource(anyType, keyColumn, columns, conflictResolutionAction, pullCorrelationRule);
173             Provision provision = resource.getProvisions().get(0);
174 
175             task = entityFactory.newEntity(PullTask.class);
176             task.setResource(resource);
177             task.setMatchingRule(pullTaskTO.getMatchingRule());
178             task.setUnmatchingRule(pullTaskTO.getUnmatchingRule());
179             task.setPullMode(PullMode.FULL_RECONCILIATION);
180             task.setPerformCreate(true);
181             task.setPerformUpdate(true);
182             task.setPerformDelete(false);
183             task.setSyncStatus(false);
184             task.setDestinationRealm(realmDAO.findByFullPath(pullTaskTO.getDestinationRealm()));
185             task.setRemediation(pullTaskTO.isRemediation());
186 
187             profile = new ProvisioningProfile<>(connector, task);
188             profile.setDryRun(false);
189             profile.setConflictResolutionAction(conflictResolutionAction);
190             profile.getActions().addAll(getPullActions(pullTaskTO.getActions().stream().
191                     map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList())));
192             profile.setExecutor(executor);
193 
194             for (PullActions action : profile.getActions()) {
195                 action.beforeAll(profile);
196             }
197 
198             SyncopePullResultHandler handler;
199             GroupPullResultHandler ghandler = buildGroupHandler();
200             switch (anyType.getKind()) {
201                 case USER:
202                     handler = buildUserHandler();
203                     break;
204 
205                 case GROUP:
206                     handler = ghandler;
207                     break;
208 
209                 case ANY_OBJECT:
210                 default:
211                     handler = buildAnyObjectHandler();
212             }
213             handler.setProfile(profile);
214 
215             // execute filtered pull
216             Set<String> moreAttrsToGet = new HashSet<>();
217             profile.getActions().forEach(a -> moreAttrsToGet.addAll(a.moreAttrsToGet(profile, provision)));
218 
219             Stream<Item> mapItems = Stream.concat(
220                     MappingUtils.getPullItems(provision.getMapping().getItems().stream()),
221                     virSchemaDAO.find(resource.getKey(), anyType.getKey()).stream().
222                             map(VirSchema::asLinkingMappingItem));
223 
224             connector.fullReconciliation(
225                     new ObjectClass(provision.getObjectClass()),
226                     handler,
227                     MappingUtils.buildOperationOptions(mapItems, moreAttrsToGet.toArray(String[]::new)));
228 
229             try {
230                 setGroupOwners(ghandler);
231             } catch (Exception e) {
232                 LOG.error("While setting group owners", e);
233             }
234 
235             for (PullActions action : profile.getActions()) {
236                 action.afterAll(profile);
237             }
238 
239             return profile.getResults();
240         } catch (Exception e) {
241             throw e instanceof JobExecutionException
242                     ? (JobExecutionException) e
243                     : new JobExecutionException("While stream pulling", e);
244         } finally {
245             setStatus(null);
246         }
247     }
248 }