View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.provisioning.java.pushpull;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Optional;
25  import java.util.Set;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.stream.Collectors;
28  import org.apache.commons.lang3.StringUtils;
29  import org.apache.commons.lang3.tuple.MutablePair;
30  import org.apache.syncope.common.lib.SyncopeConstants;
31  import org.apache.syncope.common.lib.to.Provision;
32  import org.apache.syncope.common.lib.types.ConflictResolutionAction;
33  import org.apache.syncope.core.persistence.api.dao.AnyDAO;
34  import org.apache.syncope.core.persistence.api.dao.AnySearchDAO;
35  import org.apache.syncope.core.persistence.api.dao.RealmDAO;
36  import org.apache.syncope.core.persistence.api.dao.search.SearchCond;
37  import org.apache.syncope.core.persistence.api.entity.Any;
38  import org.apache.syncope.core.persistence.api.entity.AnyType;
39  import org.apache.syncope.core.persistence.api.entity.AnyUtilsFactory;
40  import org.apache.syncope.core.persistence.api.entity.ExternalResource;
41  import org.apache.syncope.core.persistence.api.entity.Implementation;
42  import org.apache.syncope.core.persistence.api.entity.Realm;
43  import org.apache.syncope.core.persistence.api.entity.policy.PushPolicy;
44  import org.apache.syncope.core.persistence.api.entity.task.PushTask;
45  import org.apache.syncope.core.persistence.api.search.SearchCondConverter;
46  import org.apache.syncope.core.persistence.api.search.SearchCondVisitor;
47  import org.apache.syncope.core.provisioning.api.Connector;
48  import org.apache.syncope.core.provisioning.api.ProvisionSorter;
49  import org.apache.syncope.core.provisioning.api.pushpull.AnyObjectPushResultHandler;
50  import org.apache.syncope.core.provisioning.api.pushpull.GroupPushResultHandler;
51  import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
52  import org.apache.syncope.core.provisioning.api.pushpull.PushActions;
53  import org.apache.syncope.core.provisioning.api.pushpull.RealmPushResultHandler;
54  import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushExecutor;
55  import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushResultHandler;
56  import org.apache.syncope.core.provisioning.api.pushpull.UserPushResultHandler;
57  import org.apache.syncope.core.spring.ApplicationContextProvider;
58  import org.apache.syncope.core.spring.implementation.ImplementationManager;
59  import org.quartz.JobExecutionContext;
60  import org.quartz.JobExecutionException;
61  import org.springframework.beans.factory.annotation.Autowired;
62  import org.springframework.beans.factory.support.AbstractBeanDefinition;
63  
64  public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> implements SyncopePushExecutor {
65  
66      @Autowired
67      protected AnySearchDAO searchDAO;
68  
69      @Autowired
70      protected RealmDAO realmDAO;
71  
72      @Autowired
73      protected AnyUtilsFactory anyUtilsFactory;
74  
75      @Autowired
76      protected SearchCondVisitor searchCondVisitor;
77  
78      protected ProvisioningProfile<PushTask, PushActions> profile;
79  
80      protected final Map<String, MutablePair<Integer, String>> handled = new ConcurrentHashMap<>();
81  
82      protected final Map<String, PushActions> perContextActions = new ConcurrentHashMap<>();
83  
84      @Override
85      public void reportHandled(final String anyType, final String key) {
86          MutablePair<Integer, String> pair = handled.get(anyType);
87          if (pair == null) {
88              pair = MutablePair.of(0, null);
89              handled.put(anyType, pair);
90          }
91          pair.setLeft(pair.getLeft() + 1);
92          pair.setRight(key);
93  
94          if (!handled.isEmpty()) {
95              StringBuilder builder = new StringBuilder("Processed:\n");
96              handled.forEach((k, v) -> builder.append(' ').append(v.getLeft()).append('\t').
97                      append(k).
98                      append(" / latest: ").append(v.getRight()).
99                      append('\n'));
100             setStatus(builder.toString());
101         }
102     }
103 
104     @Override
105     public boolean wasInterruptRequested() {
106         return interrupt;
107     }
108 
109     @Override
110     public void setInterrupted() {
111         this.interrupted = true;
112     }
113 
114     protected boolean doHandle(
115             final List<? extends Any<?>> anys,
116             final PushResultHandlerDispatcher dispatcher,
117             final ExternalResource resource)
118             throws JobExecutionException {
119 
120         boolean result = true;
121         for (int i = 0; i < anys.size() && result; i++) {
122             try {
123                 result = dispatcher.handle(anys.get(i).getType().getKey(), anys.get(i).getKey());
124             } catch (Exception e) {
125                 LOG.warn("Failure pushing '{}' on '{}'", anys.get(i), resource, e);
126                 throw new JobExecutionException("While pushing " + anys.get(i) + " on " + resource, e);
127             }
128         }
129         return result;
130     }
131 
132     protected RealmPushResultHandler buildRealmHandler() {
133         return (RealmPushResultHandler) ApplicationContextProvider.getBeanFactory().
134                 createBean(DefaultRealmPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
135     }
136 
137     protected AnyObjectPushResultHandler buildAnyObjectHandler() {
138         return (AnyObjectPushResultHandler) ApplicationContextProvider.getBeanFactory().
139                 createBean(DefaultAnyObjectPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
140     }
141 
142     protected UserPushResultHandler buildUserHandler() {
143         return (UserPushResultHandler) ApplicationContextProvider.getBeanFactory().
144                 createBean(DefaultUserPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
145     }
146 
147     protected GroupPushResultHandler buildGroupHandler() {
148         return (GroupPushResultHandler) ApplicationContextProvider.getBeanFactory().
149                 createBean(DefaultGroupPushResultHandler.class, AbstractBeanDefinition.AUTOWIRE_BY_NAME, false);
150     }
151 
152     protected List<PushActions> getPushActions(final List<? extends Implementation> impls) {
153         List<PushActions> result = new ArrayList<>();
154 
155         impls.forEach(impl -> {
156             try {
157                 result.add(ImplementationManager.build(
158                         impl,
159                         () -> perContextActions.get(impl.getKey()),
160                         instance -> perContextActions.put(impl.getKey(), instance)));
161             } catch (Exception e) {
162                 LOG.warn("While building {}", impl, e);
163             }
164         });
165 
166         return result;
167     }
168 
169     @Override
170     protected String doExecuteProvisioning(
171             final PushTask pushTask,
172             final Connector connector,
173             final boolean dryRun,
174             final String executor,
175             final JobExecutionContext context) throws JobExecutionException {
176 
177         LOG.debug("Executing push on {}", pushTask.getResource());
178 
179         profile = new ProvisioningProfile<>(connector, pushTask);
180         profile.getActions().addAll(getPushActions(pushTask.getActions()));
181         profile.setDryRun(dryRun);
182         profile.setConflictResolutionAction(
183                 Optional.ofNullable(pushTask.getResource().getPushPolicy()).
184                         map(PushPolicy::getConflictResolutionAction).
185                         orElse(ConflictResolutionAction.IGNORE));
186         profile.setExecutor(executor);
187 
188         PushResultHandlerDispatcher dispatcher = new PushResultHandlerDispatcher(profile, this);
189 
190         if (!profile.isDryRun()) {
191             for (PushActions action : profile.getActions()) {
192                 action.beforeAll(profile);
193             }
194         }
195 
196         setStatus("Initialization completed");
197 
198         // First realms...
199         if (pushTask.getResource().getOrgUnit() != null) {
200             setStatus("Pushing realms");
201 
202             dispatcher.addHandlerSupplier(SyncopeConstants.REALM_ANYTYPE, () -> {
203                 RealmPushResultHandler handler = buildRealmHandler();
204                 handler.setProfile(profile);
205                 return handler;
206             });
207 
208             // Never push the root realm
209             List<Realm> realms = realmDAO.findDescendants(
210                     profile.getTask().getSourceRealm().getFullPath(), null, -1, -1).stream().
211                     filter(realm -> realm.getParent() != null).collect(Collectors.toList());
212             boolean result = true;
213             for (int i = 0; i < realms.size() && result; i++) {
214                 try {
215                     result = dispatcher.handle(SyncopeConstants.REALM_ANYTYPE, realms.get(i).getKey());
216                 } catch (Exception e) {
217                     LOG.warn("Failure pushing '{}' on '{}'", realms.get(i), pushTask.getResource(), e);
218                     throw new JobExecutionException(
219                             "While pushing " + realms.get(i) + " on " + pushTask.getResource(), e);
220                 }
221             }
222         }
223 
224         // ...then provisions for any types
225         ProvisionSorter provisionSorter = getProvisionSorter(pushTask);
226 
227         for (Provision provision : pushTask.getResource().getProvisions().stream().
228                 filter(provision -> provision.getMapping() != null).sorted(provisionSorter).
229                 collect(Collectors.toList())) {
230 
231             setStatus("Pushing " + provision.getAnyType());
232 
233             AnyType anyType = anyTypeDAO.find(provision.getAnyType());
234 
235             AnyDAO<?> anyDAO = anyUtilsFactory.getInstance(anyType.getKind()).dao();
236 
237             dispatcher.addHandlerSupplier(provision.getAnyType(), () -> {
238                 SyncopePushResultHandler handler;
239                 switch (anyType.getKind()) {
240                     case USER:
241                         handler = buildUserHandler();
242                         break;
243 
244                     case GROUP:
245                         handler = buildGroupHandler();
246                         break;
247 
248                     case ANY_OBJECT:
249                     default:
250                         handler = buildAnyObjectHandler();
251                 }
252                 handler.setProfile(profile);
253                 return handler;
254             });
255 
256             String filter = pushTask.getFilter(anyType.getKey()).orElse(null);
257             SearchCond cond = StringUtils.isBlank(filter)
258                     ? anyDAO.getAllMatchingCond()
259                     : SearchCondConverter.convert(searchCondVisitor, filter);
260             int count = searchDAO.count(
261                     profile.getTask().getSourceRealm(),
262                     true,
263                     Set.of(profile.getTask().getSourceRealm().getFullPath()),
264                     cond,
265                     anyType.getKind());
266             boolean result = true;
267             for (int page = 1; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE) + 1 && result; page++) {
268                 List<? extends Any<?>> anys = searchDAO.search(
269                         profile.getTask().getSourceRealm(),
270                         true,
271                         Set.of(profile.getTask().getSourceRealm().getFullPath()),
272                         cond,
273                         page,
274                         AnyDAO.DEFAULT_PAGE_SIZE,
275                         List.of(),
276                         anyType.getKind());
277                 result = doHandle(anys, dispatcher, pushTask.getResource());
278             }
279         }
280 
281         dispatcher.shutdown();
282 
283         if (!profile.isDryRun()) {
284             for (PushActions action : profile.getActions()) {
285                 action.afterAll(profile);
286             }
287         }
288 
289         setStatus("Push done");
290 
291         String result = createReport(profile.getResults(), pushTask.getResource(), dryRun);
292         LOG.debug("Push result: {}", result);
293         return result;
294     }
295 }