View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.syncope.core.provisioning.java.propagation;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.Comparator;
24  import java.util.List;
25  import java.util.Optional;
26  import java.util.concurrent.CompletionService;
27  import java.util.concurrent.ExecutorCompletionService;
28  import java.util.concurrent.Future;
29  import java.util.stream.Collectors;
30  import org.apache.syncope.common.lib.types.ExecStatus;
31  import org.apache.syncope.core.persistence.api.attrvalue.validation.PlainAttrValidationManager;
32  import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO;
33  import org.apache.syncope.core.persistence.api.dao.PlainSchemaDAO;
34  import org.apache.syncope.core.persistence.api.dao.TaskDAO;
35  import org.apache.syncope.core.persistence.api.entity.AnyUtilsFactory;
36  import org.apache.syncope.core.persistence.api.entity.Exec;
37  import org.apache.syncope.core.persistence.api.entity.task.PropagationTask;
38  import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
39  import org.apache.syncope.core.persistence.api.entity.task.TaskUtilsFactory;
40  import org.apache.syncope.core.provisioning.api.AuditManager;
41  import org.apache.syncope.core.provisioning.api.ConnectorManager;
42  import org.apache.syncope.core.provisioning.api.data.TaskDataBinder;
43  import org.apache.syncope.core.provisioning.api.notification.NotificationManager;
44  import org.apache.syncope.core.provisioning.api.propagation.PropagationException;
45  import org.apache.syncope.core.provisioning.api.propagation.PropagationReporter;
46  import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskCallable;
47  import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo;
48  import org.apache.syncope.core.provisioning.java.pushpull.OutboundMatcher;
49  import org.apache.syncope.core.provisioning.java.utils.ConnObjectUtils;
50  import org.apache.syncope.core.spring.ApplicationContextProvider;
51  import org.springframework.beans.factory.support.AbstractBeanDefinition;
52  import org.springframework.context.ApplicationEventPublisher;
53  import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
54  
55  /**
56   * Sorts the tasks to be executed according to related
57   * {@link org.apache.syncope.core.persistence.api.entity.ExternalResource}'s priority, then execute.
58   * Tasks related to resources with NULL priority are executed after other tasks, concurrently.
59   * Failure during execution of a task related to resource with non-NULL priority are treated as fatal and will interrupt
60   * the whole process, resulting in a global failure.
61   */
62  public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExecutor {
63  
64      /**
65       * Creates new instances of {@link PropagationTaskCallable} for usage with
66       * {@link java.util.concurrent.CompletionService}.
67       *
68       * @param taskInfo to be executed
69       * @param reporter to report propagation execution status
70       * @param executor user that triggered the propagation execution
71       * @return new {@link PropagationTaskCallable} instance for usage with
72       * {@link java.util.concurrent.CompletionService}
73       */
74      protected PropagationTaskCallable newPropagationTaskCallable(
75              final PropagationTaskInfo taskInfo, final PropagationReporter reporter, final String executor) {
76  
77          PropagationTaskCallable callable = (PropagationTaskCallable) ApplicationContextProvider.getBeanFactory().
78                  createBean(DefaultPropagationTaskCallable.class, AbstractBeanDefinition.AUTOWIRE_BY_TYPE, false);
79          callable.setTaskInfo(taskInfo);
80          callable.setReporter(reporter);
81          callable.setExecutor(executor);
82  
83          return callable;
84      }
85  
86      protected final ThreadPoolTaskExecutor taskExecutor;
87  
88      public PriorityPropagationTaskExecutor(
89              final ConnectorManager connectorManager,
90              final ConnObjectUtils connObjectUtils,
91              final TaskDAO taskDAO,
92              final ExternalResourceDAO resourceDAO,
93              final PlainSchemaDAO plainSchemaDAO,
94              final NotificationManager notificationManager,
95              final AuditManager auditManager,
96              final TaskDataBinder taskDataBinder,
97              final AnyUtilsFactory anyUtilsFactory,
98              final TaskUtilsFactory taskUtilsFactory,
99              final OutboundMatcher outboundMatcher,
100             final PlainAttrValidationManager validator,
101             final ApplicationEventPublisher publisher,
102             final ThreadPoolTaskExecutor taskExecutor) {
103 
104         super(connectorManager,
105                 connObjectUtils,
106                 taskDAO,
107                 resourceDAO,
108                 plainSchemaDAO,
109                 notificationManager,
110                 auditManager,
111                 taskDataBinder,
112                 anyUtilsFactory,
113                 taskUtilsFactory,
114                 outboundMatcher,
115                 validator,
116                 publisher);
117         this.taskExecutor = taskExecutor;
118     }
119 
120     @Override
121     public PropagationReporter execute(
122             final Collection<PropagationTaskInfo> taskInfos,
123             final boolean nullPriorityAsync,
124             final String executor) {
125 
126         PropagationReporter reporter = new DefaultPropagationReporter();
127         try {
128             List<PropagationTaskInfo> prioritizedTasks = taskInfos.stream().
129                     filter(task -> task.getResource().getPropagationPriority() != null).
130                     sorted(Comparator.comparing(task -> task.getResource().getPropagationPriority())).
131                     collect(Collectors.toList());
132             LOG.debug("Propagation tasks sorted by priority, for serial execution: {}", prioritizedTasks);
133 
134             List<PropagationTaskInfo> concurrentTasks = taskInfos.stream().
135                     filter(task -> !prioritizedTasks.contains(task)).
136                     collect(Collectors.toList());
137             LOG.debug("Propagation tasks for concurrent execution: {}", concurrentTasks);
138 
139             // first process priority resources sequentially and fail as soon as any propagation failure is reported
140             prioritizedTasks.forEach(taskInfo -> {
141                 TaskExec<PropagationTask> exec = null;
142                 ExecStatus execStatus;
143                 String errorMessage = null;
144                 try {
145                     exec = newPropagationTaskCallable(taskInfo, reporter, executor).call();
146                     execStatus = ExecStatus.valueOf(exec.getStatus());
147                 } catch (Exception e) {
148                     LOG.error("Unexpected exception", e);
149                     execStatus = ExecStatus.FAILURE;
150                     errorMessage = e.getMessage();
151                 }
152                 if (execStatus != ExecStatus.SUCCESS) {
153                     throw new PropagationException(
154                             taskInfo.getResource().getKey(),
155                             Optional.ofNullable(exec).map(Exec::getMessage).orElse(errorMessage));
156                 }
157             });
158 
159             // then process non-priority resources concurrently...
160             if (!concurrentTasks.isEmpty()) {
161                 CompletionService<TaskExec<PropagationTask>> completionService =
162                         new ExecutorCompletionService<>(taskExecutor);
163                 List<Future<TaskExec<PropagationTask>>> futures = new ArrayList<>();
164 
165                 concurrentTasks.forEach(taskInfo -> {
166                     try {
167                         futures.add(completionService.submit(newPropagationTaskCallable(taskInfo, reporter, executor)));
168 
169                         if (nullPriorityAsync) {
170                             reporter.onSuccessOrNonPriorityResourceFailures(
171                                     taskInfo, ExecStatus.CREATED, null, null, null, null);
172                         }
173                     } catch (Exception e) {
174                         LOG.error("While submitting task for async execution: {}", taskInfo, e);
175                         rejected(taskInfo, e.getMessage(), reporter, executor);
176                     }
177                 });
178 
179                 // ...waiting for all callables to complete, if async processing was not required
180                 if (!nullPriorityAsync) {
181                     futures.forEach(future -> {
182                         try {
183                             future.get();
184                         } catch (Exception e) {
185                             LOG.error("Unexpected exception", e);
186                         }
187                     });
188                 }
189             }
190         } catch (PropagationException e) {
191             LOG.error("Error propagation priority resource", e);
192             reporter.onPriorityResourceFailure(e.getResourceName(), taskInfos);
193         }
194 
195         return reporter;
196     }
197 }