1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.pushpull.stream;
20
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Set;
25 import java.util.stream.Collectors;
26 import java.util.stream.Stream;
27 import org.apache.syncope.common.lib.to.Item;
28 import org.apache.syncope.common.lib.to.Mapping;
29 import org.apache.syncope.common.lib.to.Provision;
30 import org.apache.syncope.common.lib.to.ProvisioningReport;
31 import org.apache.syncope.common.lib.to.PullTaskTO;
32 import org.apache.syncope.common.lib.types.ConflictResolutionAction;
33 import org.apache.syncope.common.lib.types.IdMImplementationType;
34 import org.apache.syncope.common.lib.types.MappingPurpose;
35 import org.apache.syncope.common.lib.types.PullMode;
36 import org.apache.syncope.common.lib.types.TaskType;
37 import org.apache.syncope.core.persistence.api.dao.ImplementationDAO;
38 import org.apache.syncope.core.persistence.api.dao.RealmDAO;
39 import org.apache.syncope.core.persistence.api.entity.AnyType;
40 import org.apache.syncope.core.persistence.api.entity.AnyUtils;
41 import org.apache.syncope.core.persistence.api.entity.ExternalResource;
42 import org.apache.syncope.core.persistence.api.entity.Implementation;
43 import org.apache.syncope.core.persistence.api.entity.PlainSchema;
44 import org.apache.syncope.core.persistence.api.entity.VirSchema;
45 import org.apache.syncope.core.persistence.api.entity.policy.PullCorrelationRuleEntity;
46 import org.apache.syncope.core.persistence.api.entity.policy.PullPolicy;
47 import org.apache.syncope.core.persistence.api.entity.task.PullTask;
48 import org.apache.syncope.core.provisioning.api.Connector;
49 import org.apache.syncope.core.provisioning.api.pushpull.GroupPullResultHandler;
50 import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
51 import org.apache.syncope.core.provisioning.api.pushpull.PullActions;
52 import org.apache.syncope.core.provisioning.api.pushpull.SyncopePullResultHandler;
53 import org.apache.syncope.core.provisioning.api.pushpull.stream.SyncopeStreamPullExecutor;
54 import org.apache.syncope.core.provisioning.java.pushpull.PullJobDelegate;
55 import org.apache.syncope.core.provisioning.java.utils.MappingUtils;
56 import org.apache.syncope.core.spring.security.SecureRandomUtils;
57 import org.identityconnectors.framework.common.objects.ObjectClass;
58 import org.quartz.JobExecutionException;
59 import org.springframework.beans.factory.annotation.Autowired;
60
61 public class StreamPullJobDelegate extends PullJobDelegate implements SyncopeStreamPullExecutor {
62
63 @Autowired
64 private ImplementationDAO implementationDAO;
65
66 @Autowired
67 private RealmDAO realmDAO;
68
69 private PullPolicy pullPolicy(
70 final AnyType anyType,
71 final ConflictResolutionAction conflictResolutionAction,
72 final String pullCorrelationRule) {
73
74 PullCorrelationRuleEntity pullCorrelationRuleEntity = null;
75 if (pullCorrelationRule != null) {
76 Implementation impl = implementationDAO.find(pullCorrelationRule);
77 if (impl == null || !IdMImplementationType.PULL_CORRELATION_RULE.equals(impl.getType())) {
78 LOG.debug("Invalid " + Implementation.class.getSimpleName() + " {}, ignoring...", pullCorrelationRule);
79 } else {
80 pullCorrelationRuleEntity = entityFactory.newEntity(PullCorrelationRuleEntity.class);
81 pullCorrelationRuleEntity.setAnyType(anyType);
82 pullCorrelationRuleEntity.setImplementation(impl);
83 }
84 }
85
86 PullPolicy pullPolicy = entityFactory.newEntity(PullPolicy.class);
87 pullPolicy.setConflictResolutionAction(conflictResolutionAction);
88
89 if (pullCorrelationRuleEntity != null) {
90 pullPolicy.add(pullCorrelationRuleEntity);
91 pullCorrelationRuleEntity.setPullPolicy(pullPolicy);
92 }
93
94 return pullPolicy;
95 }
96
97 private Provision provision(
98 final AnyType anyType,
99 final String keyColumn,
100 final List<String> columns) throws JobExecutionException {
101
102 Provision provision = new Provision();
103 provision.setAnyType(anyType.getKey());
104 provision.setObjectClass(anyType.getKey());
105
106 Mapping mapping = new Mapping();
107 provision.setMapping(mapping);
108
109 AnyUtils anyUtils = anyUtilsFactory.getInstance(anyType.getKind());
110 if (anyUtils.getField(keyColumn) == null) {
111 PlainSchema keyColumnSchema = plainSchemaDAO.find(keyColumn);
112 if (keyColumnSchema == null) {
113 throw new JobExecutionException("Plain Schema for key column not found: " + keyColumn);
114 }
115 }
116
117 Item connObjectKeyItem = new Item();
118 connObjectKeyItem.setExtAttrName(keyColumn);
119 connObjectKeyItem.setIntAttrName(keyColumn);
120 connObjectKeyItem.setPurpose(MappingPurpose.PULL);
121 mapping.setConnObjectKeyItem(connObjectKeyItem);
122
123 columns.stream().
124 filter(column -> anyUtils.getField(column) != null
125 || plainSchemaDAO.find(column) != null || virSchemaDAO.find(column) != null).
126 map(column -> {
127 Item item = new Item();
128 item.setExtAttrName(column);
129 item.setIntAttrName(column);
130 item.setPurpose(MappingPurpose.PULL);
131 mapping.add(item);
132 return item;
133 }).forEach(mapping::add);
134
135 return provision;
136 }
137
138 private ExternalResource externalResource(
139 final AnyType anyType,
140 final String keyColumn,
141 final List<String> columns,
142 final ConflictResolutionAction conflictResolutionAction,
143 final String pullCorrelationRule) throws JobExecutionException {
144
145 Provision provision = provision(anyType, keyColumn, columns);
146
147 ExternalResource resource = entityFactory.newEntity(ExternalResource.class);
148 resource.setKey("StreamPull_" + SecureRandomUtils.generateRandomUUID().toString());
149 resource.getProvisions().add(provision);
150
151 resource.setPullPolicy(pullPolicy(anyType, conflictResolutionAction, pullCorrelationRule));
152
153 return resource;
154 }
155
156 @Override
157 public List<ProvisioningReport> pull(
158 final AnyType anyType,
159 final String keyColumn,
160 final List<String> columns,
161 final ConflictResolutionAction conflictResolutionAction,
162 final String pullCorrelationRule,
163 final Connector connector,
164 final PullTaskTO pullTaskTO,
165 final String executor) throws JobExecutionException {
166
167 LOG.debug("Executing stream pull");
168
169 taskType = TaskType.PULL;
170 try {
171 ExternalResource resource =
172 externalResource(anyType, keyColumn, columns, conflictResolutionAction, pullCorrelationRule);
173 Provision provision = resource.getProvisions().get(0);
174
175 task = entityFactory.newEntity(PullTask.class);
176 task.setResource(resource);
177 task.setMatchingRule(pullTaskTO.getMatchingRule());
178 task.setUnmatchingRule(pullTaskTO.getUnmatchingRule());
179 task.setPullMode(PullMode.FULL_RECONCILIATION);
180 task.setPerformCreate(true);
181 task.setPerformUpdate(true);
182 task.setPerformDelete(false);
183 task.setSyncStatus(false);
184 task.setDestinationRealm(realmDAO.findByFullPath(pullTaskTO.getDestinationRealm()));
185 task.setRemediation(pullTaskTO.isRemediation());
186
187 profile = new ProvisioningProfile<>(connector, task);
188 profile.setDryRun(false);
189 profile.setConflictResolutionAction(conflictResolutionAction);
190 profile.getActions().addAll(getPullActions(pullTaskTO.getActions().stream().
191 map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList())));
192 profile.setExecutor(executor);
193
194 for (PullActions action : profile.getActions()) {
195 action.beforeAll(profile);
196 }
197
198 SyncopePullResultHandler handler;
199 GroupPullResultHandler ghandler = buildGroupHandler();
200 switch (anyType.getKind()) {
201 case USER:
202 handler = buildUserHandler();
203 break;
204
205 case GROUP:
206 handler = ghandler;
207 break;
208
209 case ANY_OBJECT:
210 default:
211 handler = buildAnyObjectHandler();
212 }
213 handler.setProfile(profile);
214
215
216 Set<String> moreAttrsToGet = new HashSet<>();
217 profile.getActions().forEach(a -> moreAttrsToGet.addAll(a.moreAttrsToGet(profile, provision)));
218
219 Stream<Item> mapItems = Stream.concat(
220 MappingUtils.getPullItems(provision.getMapping().getItems().stream()),
221 virSchemaDAO.find(resource.getKey(), anyType.getKey()).stream().
222 map(VirSchema::asLinkingMappingItem));
223
224 connector.fullReconciliation(
225 new ObjectClass(provision.getObjectClass()),
226 handler,
227 MappingUtils.buildOperationOptions(mapItems, moreAttrsToGet.toArray(String[]::new)));
228
229 try {
230 setGroupOwners(ghandler);
231 } catch (Exception e) {
232 LOG.error("While setting group owners", e);
233 }
234
235 for (PullActions action : profile.getActions()) {
236 action.afterAll(profile);
237 }
238
239 return profile.getResults();
240 } catch (Exception e) {
241 throw e instanceof JobExecutionException
242 ? (JobExecutionException) e
243 : new JobExecutionException("While stream pulling", e);
244 } finally {
245 setStatus(null);
246 }
247 }
248 }