1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.pushpull;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Optional;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.stream.Collectors;
28 import org.apache.commons.lang3.StringUtils;
29 import org.apache.commons.lang3.tuple.MutablePair;
30 import org.apache.syncope.common.lib.SyncopeConstants;
31 import org.apache.syncope.common.lib.to.Provision;
32 import org.apache.syncope.common.lib.types.ConflictResolutionAction;
33 import org.apache.syncope.core.persistence.api.dao.AnyDAO;
34 import org.apache.syncope.core.persistence.api.dao.AnySearchDAO;
35 import org.apache.syncope.core.persistence.api.dao.RealmDAO;
36 import org.apache.syncope.core.persistence.api.dao.search.SearchCond;
37 import org.apache.syncope.core.persistence.api.entity.Any;
38 import org.apache.syncope.core.persistence.api.entity.AnyType;
39 import org.apache.syncope.core.persistence.api.entity.AnyUtilsFactory;
40 import org.apache.syncope.core.persistence.api.entity.ExternalResource;
41 import org.apache.syncope.core.persistence.api.entity.Implementation;
42 import org.apache.syncope.core.persistence.api.entity.Realm;
43 import org.apache.syncope.core.persistence.api.entity.policy.PushPolicy;
44 import org.apache.syncope.core.persistence.api.entity.task.PushTask;
45 import org.apache.syncope.core.persistence.api.search.SearchCondConverter;
46 import org.apache.syncope.core.persistence.api.search.SearchCondVisitor;
47 import org.apache.syncope.core.provisioning.api.Connector;
48 import org.apache.syncope.core.provisioning.api.ProvisionSorter;
49 import org.apache.syncope.core.provisioning.api.pushpull.AnyObjectPushResultHandler;
50 import org.apache.syncope.core.provisioning.api.pushpull.GroupPushResultHandler;
51 import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
52 import org.apache.syncope.core.provisioning.api.pushpull.PushActions;
53 import org.apache.syncope.core.provisioning.api.pushpull.RealmPushResultHandler;
54 import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushExecutor;
55 import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushResultHandler;
56 import org.apache.syncope.core.provisioning.api.pushpull.UserPushResultHandler;
57 import org.apache.syncope.core.spring.ApplicationContextProvider;
58 import org.apache.syncope.core.spring.implementation.ImplementationManager;
59 import org.quartz.JobExecutionContext;
60 import org.quartz.JobExecutionException;
61 import org.springframework.beans.factory.annotation.Autowired;
62 import org.springframework.beans.factory.support.AbstractBeanDefinition;
63
64 public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> implements SyncopePushExecutor {
65
/** Search DAO used during push execution to count and page through the entities matching the push filter. */
@Autowired
protected AnySearchDAO searchDAO;

/** Realm DAO used during push execution to list the descendants of the task's source realm. */
@Autowired
protected RealmDAO realmDAO;

/** Factory queried with an any type kind to obtain the matching {@link AnyDAO}. */
@Autowired
protected AnyUtilsFactory anyUtilsFactory;

/** Visitor handed to {@link SearchCondConverter} when a textual push filter is converted to a search condition. */
@Autowired
protected SearchCondVisitor searchCondVisitor;

/** Profile of the push being executed; (re)assigned at the beginning of every provisioning execution. */
protected ProvisioningProfile<PushTask, PushActions> profile;

/** Progress bookkeeping: any type -> (number of reported entities, key of the latest reported entity). */
protected final Map<String, MutablePair<Integer, String>> handled = new ConcurrentHashMap<>();

/** {@link PushActions} instances kept for reuse, indexed by the key of the implementation they were built from. */
protected final Map<String, PushActions> perContextActions = new ConcurrentHashMap<>();
83
84 @Override
85 public void reportHandled(final String anyType, final String key) {
86 MutablePair<Integer, String> pair = handled.get(anyType);
87 if (pair == null) {
88 pair = MutablePair.of(0, null);
89 handled.put(anyType, pair);
90 }
91 pair.setLeft(pair.getLeft() + 1);
92 pair.setRight(key);
93
94 if (!handled.isEmpty()) {
95 StringBuilder builder = new StringBuilder("Processed:\n");
96 handled.forEach((k, v) -> builder.append(' ').append(v.getLeft()).append('\t').
97 append(k).
98 append(" / latest: ").append(v.getRight()).
99 append('\n'));
100 setStatus(builder.toString());
101 }
102 }
103
104 @Override
105 public boolean wasInterruptRequested() {
106 return interrupt;
107 }
108
109 @Override
110 public void setInterrupted() {
111 this.interrupted = true;
112 }
113
114 protected boolean doHandle(
115 final List<? extends Any<?>> anys,
116 final PushResultHandlerDispatcher dispatcher,
117 final ExternalResource resource)
118 throws JobExecutionException {
119
120 boolean result = true;
121 for (int i = 0; i < anys.size() && result; i++) {
122 try {
123 result = dispatcher.handle(anys.get(i).getType().getKey(), anys.get(i).getKey());
124 } catch (Exception e) {
125 LOG.warn("Failure pushing '{}' on '{}'", anys.get(i), resource, e);
126 throw new JobExecutionException("While pushing " + anys.get(i) + " on " + resource, e);
127 }
128 }
129 return result;
130 }
131
132 protected RealmPushResultHandler buildRealmHandler() {
133 return (RealmPushResultHandler) ApplicationContextProvider.getBeanFactory().
134 createBean(DefaultRealmPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
135 }
136
137 protected AnyObjectPushResultHandler buildAnyObjectHandler() {
138 return (AnyObjectPushResultHandler) ApplicationContextProvider.getBeanFactory().
139 createBean(DefaultAnyObjectPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
140 }
141
142 protected UserPushResultHandler buildUserHandler() {
143 return (UserPushResultHandler) ApplicationContextProvider.getBeanFactory().
144 createBean(DefaultUserPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
145 }
146
147 protected GroupPushResultHandler buildGroupHandler() {
148 return (GroupPushResultHandler) ApplicationContextProvider.getBeanFactory().
149 createBean(DefaultGroupPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
150 }
151
152 protected List<PushActions> getPushActions(final List<? extends Implementation> impls) {
153 List<PushActions> result = new ArrayList<>();
154
155 impls.forEach(impl -> {
156 try {
157 result.add(ImplementationManager.build(
158 impl,
159 () -> perContextActions.get(impl.getKey()),
160 instance -> perContextActions.put(impl.getKey(), instance)));
161 } catch (Exception e) {
162 LOG.warn("While building {}", impl, e);
163 }
164 });
165
166 return result;
167 }
168
169 @Override
170 protected String doExecuteProvisioning(
171 final PushTask pushTask,
172 final Connector connector,
173 final boolean dryRun,
174 final String executor,
175 final JobExecutionContext context) throws JobExecutionException {
176
177 LOG.debug("Executing push on {}", pushTask.getResource());
178
179 profile = new ProvisioningProfile<>(connector, pushTask);
180 profile.getActions().addAll(getPushActions(pushTask.getActions()));
181 profile.setDryRun(dryRun);
182 profile.setConflictResolutionAction(
183 Optional.ofNullable(pushTask.getResource().getPushPolicy()).
184 map(PushPolicy::getConflictResolutionAction).
185 orElse(ConflictResolutionAction.IGNORE));
186 profile.setExecutor(executor);
187
188 PushResultHandlerDispatcher dispatcher = new PushResultHandlerDispatcher(profile, this);
189
190 if (!profile.isDryRun()) {
191 for (PushActions action : profile.getActions()) {
192 action.beforeAll(profile);
193 }
194 }
195
196 setStatus("Initialization completed");
197
198
199 if (pushTask.getResource().getOrgUnit() != null) {
200 setStatus("Pushing realms");
201
202 dispatcher.addHandlerSupplier(SyncopeConstants.REALM_ANYTYPE, () -> {
203 RealmPushResultHandler handler = buildRealmHandler();
204 handler.setProfile(profile);
205 return handler;
206 });
207
208
209 List<Realm> realms = realmDAO.findDescendants(
210 profile.getTask().getSourceRealm().getFullPath(), null, -1, -1).stream().
211 filter(realm -> realm.getParent() != null).collect(Collectors.toList());
212 boolean result = true;
213 for (int i = 0; i < realms.size() && result; i++) {
214 try {
215 result = dispatcher.handle(SyncopeConstants.REALM_ANYTYPE, realms.get(i).getKey());
216 } catch (Exception e) {
217 LOG.warn("Failure pushing '{}' on '{}'", realms.get(i), pushTask.getResource(), e);
218 throw new JobExecutionException(
219 "While pushing " + realms.get(i) + " on " + pushTask.getResource(), e);
220 }
221 }
222 }
223
224
225 ProvisionSorter provisionSorter = getProvisionSorter(pushTask);
226
227 for (Provision provision : pushTask.getResource().getProvisions().stream().
228 filter(provision -> provision.getMapping() != null).sorted(provisionSorter).
229 collect(Collectors.toList())) {
230
231 setStatus("Pushing " + provision.getAnyType());
232
233 AnyType anyType = anyTypeDAO.find(provision.getAnyType());
234
235 AnyDAO<?> anyDAO = anyUtilsFactory.getInstance(anyType.getKind()).dao();
236
237 dispatcher.addHandlerSupplier(provision.getAnyType(), () -> {
238 SyncopePushResultHandler handler;
239 switch (anyType.getKind()) {
240 case USER:
241 handler = buildUserHandler();
242 break;
243
244 case GROUP:
245 handler = buildGroupHandler();
246 break;
247
248 case ANY_OBJECT:
249 default:
250 handler = buildAnyObjectHandler();
251 }
252 handler.setProfile(profile);
253 return handler;
254 });
255
256 String filter = pushTask.getFilter(anyType.getKey()).orElse(null);
257 SearchCond cond = StringUtils.isBlank(filter)
258 ? anyDAO.getAllMatchingCond()
259 : SearchCondConverter.convert(searchCondVisitor, filter);
260 int count = searchDAO.count(
261 profile.getTask().getSourceRealm(),
262 true,
263 Set.of(profile.getTask().getSourceRealm().getFullPath()),
264 cond,
265 anyType.getKind());
266 boolean result = true;
267 for (int page = 1; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE) + 1 && result; page++) {
268 List<? extends Any<?>> anys = searchDAO.search(
269 profile.getTask().getSourceRealm(),
270 true,
271 Set.of(profile.getTask().getSourceRealm().getFullPath()),
272 cond,
273 page,
274 AnyDAO.DEFAULT_PAGE_SIZE,
275 List.of(),
276 anyType.getKind());
277 result = doHandle(anys, dispatcher, pushTask.getResource());
278 }
279 }
280
281 dispatcher.shutdown();
282
283 if (!profile.isDryRun()) {
284 for (PushActions action : profile.getActions()) {
285 action.afterAll(profile);
286 }
287 }
288
289 setStatus("Push done");
290
291 String result = createReport(profile.getResults(), pushTask.getResource(), dryRun);
292 LOG.debug("Push result: {}", result);
293 return result;
294 }
295 }