1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.propagation;
20
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Comparator;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.CompletionService;
27 import java.util.concurrent.ExecutorCompletionService;
28 import java.util.concurrent.Future;
29 import java.util.stream.Collectors;
30 import org.apache.syncope.common.lib.types.ExecStatus;
31 import org.apache.syncope.core.persistence.api.attrvalue.validation.PlainAttrValidationManager;
32 import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO;
33 import org.apache.syncope.core.persistence.api.dao.PlainSchemaDAO;
34 import org.apache.syncope.core.persistence.api.dao.TaskDAO;
35 import org.apache.syncope.core.persistence.api.entity.AnyUtilsFactory;
36 import org.apache.syncope.core.persistence.api.entity.Exec;
37 import org.apache.syncope.core.persistence.api.entity.task.PropagationTask;
38 import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
39 import org.apache.syncope.core.persistence.api.entity.task.TaskUtilsFactory;
40 import org.apache.syncope.core.provisioning.api.AuditManager;
41 import org.apache.syncope.core.provisioning.api.ConnectorManager;
42 import org.apache.syncope.core.provisioning.api.data.TaskDataBinder;
43 import org.apache.syncope.core.provisioning.api.notification.NotificationManager;
44 import org.apache.syncope.core.provisioning.api.propagation.PropagationException;
45 import org.apache.syncope.core.provisioning.api.propagation.PropagationReporter;
46 import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskCallable;
47 import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo;
48 import org.apache.syncope.core.provisioning.java.pushpull.OutboundMatcher;
49 import org.apache.syncope.core.provisioning.java.utils.ConnObjectUtils;
50 import org.apache.syncope.core.spring.ApplicationContextProvider;
51 import org.springframework.beans.factory.support.AbstractBeanDefinition;
52 import org.springframework.context.ApplicationEventPublisher;
53 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
54
55
56
57
58
59
60
61
62 public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExecutor {
63
64
65
66
67
68
69
70
71
72
73
74 protected PropagationTaskCallable newPropagationTaskCallable(
75 final PropagationTaskInfo taskInfo, final PropagationReporter reporter, final String executor) {
76
77 PropagationTaskCallable callable = (PropagationTaskCallable) ApplicationContextProvider.getBeanFactory().
78 createBean(DefaultPropagationTaskCallable.class, AbstractBeanDefinition.AUTOWIRE_BY_TYPE, false);
79 callable.setTaskInfo(taskInfo);
80 callable.setReporter(reporter);
81 callable.setExecutor(executor);
82
83 return callable;
84 }
85
86 protected final ThreadPoolTaskExecutor taskExecutor;
87
88 public PriorityPropagationTaskExecutor(
89 final ConnectorManager connectorManager,
90 final ConnObjectUtils connObjectUtils,
91 final TaskDAO taskDAO,
92 final ExternalResourceDAO resourceDAO,
93 final PlainSchemaDAO plainSchemaDAO,
94 final NotificationManager notificationManager,
95 final AuditManager auditManager,
96 final TaskDataBinder taskDataBinder,
97 final AnyUtilsFactory anyUtilsFactory,
98 final TaskUtilsFactory taskUtilsFactory,
99 final OutboundMatcher outboundMatcher,
100 final PlainAttrValidationManager validator,
101 final ApplicationEventPublisher publisher,
102 final ThreadPoolTaskExecutor taskExecutor) {
103
104 super(connectorManager,
105 connObjectUtils,
106 taskDAO,
107 resourceDAO,
108 plainSchemaDAO,
109 notificationManager,
110 auditManager,
111 taskDataBinder,
112 anyUtilsFactory,
113 taskUtilsFactory,
114 outboundMatcher,
115 validator,
116 publisher);
117 this.taskExecutor = taskExecutor;
118 }
119
120 @Override
121 public PropagationReporter execute(
122 final Collection<PropagationTaskInfo> taskInfos,
123 final boolean nullPriorityAsync,
124 final String executor) {
125
126 PropagationReporter reporter = new DefaultPropagationReporter();
127 try {
128 List<PropagationTaskInfo> prioritizedTasks = taskInfos.stream().
129 filter(task -> task.getResource().getPropagationPriority() != null).
130 sorted(Comparator.comparing(task -> task.getResource().getPropagationPriority())).
131 collect(Collectors.toList());
132 LOG.debug("Propagation tasks sorted by priority, for serial execution: {}", prioritizedTasks);
133
134 List<PropagationTaskInfo> concurrentTasks = taskInfos.stream().
135 filter(task -> !prioritizedTasks.contains(task)).
136 collect(Collectors.toList());
137 LOG.debug("Propagation tasks for concurrent execution: {}", concurrentTasks);
138
139
140 prioritizedTasks.forEach(taskInfo -> {
141 TaskExec<PropagationTask> exec = null;
142 ExecStatus execStatus;
143 String errorMessage = null;
144 try {
145 exec = newPropagationTaskCallable(taskInfo, reporter, executor).call();
146 execStatus = ExecStatus.valueOf(exec.getStatus());
147 } catch (Exception e) {
148 LOG.error("Unexpected exception", e);
149 execStatus = ExecStatus.FAILURE;
150 errorMessage = e.getMessage();
151 }
152 if (execStatus != ExecStatus.SUCCESS) {
153 throw new PropagationException(
154 taskInfo.getResource().getKey(),
155 Optional.ofNullable(exec).map(Exec::getMessage).orElse(errorMessage));
156 }
157 });
158
159
160 if (!concurrentTasks.isEmpty()) {
161 CompletionService<TaskExec<PropagationTask>> completionService =
162 new ExecutorCompletionService<>(taskExecutor);
163 List<Future<TaskExec<PropagationTask>>> futures = new ArrayList<>();
164
165 concurrentTasks.forEach(taskInfo -> {
166 try {
167 futures.add(completionService.submit(newPropagationTaskCallable(taskInfo, reporter, executor)));
168
169 if (nullPriorityAsync) {
170 reporter.onSuccessOrNonPriorityResourceFailures(
171 taskInfo, ExecStatus.CREATED, null, null, null, null);
172 }
173 } catch (Exception e) {
174 LOG.error("While submitting task for async execution: {}", taskInfo, e);
175 rejected(taskInfo, e.getMessage(), reporter, executor);
176 }
177 });
178
179
180 if (!nullPriorityAsync) {
181 futures.forEach(future -> {
182 try {
183 future.get();
184 } catch (Exception e) {
185 LOG.error("Unexpected exception", e);
186 }
187 });
188 }
189 }
190 } catch (PropagationException e) {
191 LOG.error("Error propagation priority resource", e);
192 reporter.onPriorityResourceFailure(e.getResourceName(), taskInfos);
193 }
194
195 return reporter;
196 }
197 }