1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.logic;
20
21 import java.lang.reflect.Method;
22 import java.time.OffsetDateTime;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.Optional;
28 import java.util.Set;
29 import java.util.stream.Collectors;
30 import javax.ws.rs.core.Response;
31 import org.apache.commons.lang3.ArrayUtils;
32 import org.apache.commons.lang3.tuple.Pair;
33 import org.apache.commons.lang3.tuple.Triple;
34 import org.apache.syncope.common.lib.SyncopeClientException;
35 import org.apache.syncope.common.lib.to.ExecTO;
36 import org.apache.syncope.common.lib.to.JobTO;
37 import org.apache.syncope.common.lib.to.MacroTaskTO;
38 import org.apache.syncope.common.lib.to.PropagationTaskTO;
39 import org.apache.syncope.common.lib.to.SchedTaskTO;
40 import org.apache.syncope.common.lib.to.TaskTO;
41 import org.apache.syncope.common.lib.types.AnyTypeKind;
42 import org.apache.syncope.common.lib.types.ClientExceptionType;
43 import org.apache.syncope.common.lib.types.ExecStatus;
44 import org.apache.syncope.common.lib.types.IdRepoEntitlement;
45 import org.apache.syncope.common.lib.types.JobAction;
46 import org.apache.syncope.common.lib.types.JobType;
47 import org.apache.syncope.common.lib.types.TaskType;
48 import org.apache.syncope.common.rest.api.RESTHeaders;
49 import org.apache.syncope.common.rest.api.batch.BatchResponseItem;
50 import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO;
51 import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
52 import org.apache.syncope.core.persistence.api.dao.NotFoundException;
53 import org.apache.syncope.core.persistence.api.dao.NotificationDAO;
54 import org.apache.syncope.core.persistence.api.dao.TaskDAO;
55 import org.apache.syncope.core.persistence.api.dao.TaskExecDAO;
56 import org.apache.syncope.core.persistence.api.dao.search.OrderByClause;
57 import org.apache.syncope.core.persistence.api.entity.ExternalResource;
58 import org.apache.syncope.core.persistence.api.entity.Notification;
59 import org.apache.syncope.core.persistence.api.entity.task.MacroTask;
60 import org.apache.syncope.core.persistence.api.entity.task.NotificationTask;
61 import org.apache.syncope.core.persistence.api.entity.task.PropagationTask;
62 import org.apache.syncope.core.persistence.api.entity.task.SchedTask;
63 import org.apache.syncope.core.persistence.api.entity.task.Task;
64 import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
65 import org.apache.syncope.core.persistence.api.entity.task.TaskUtils;
66 import org.apache.syncope.core.persistence.api.entity.task.TaskUtilsFactory;
67 import org.apache.syncope.core.provisioning.api.data.TaskDataBinder;
68 import org.apache.syncope.core.provisioning.api.job.JobManager;
69 import org.apache.syncope.core.provisioning.api.job.JobNamer;
70 import org.apache.syncope.core.provisioning.api.notification.NotificationJobDelegate;
71 import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskExecutor;
72 import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo;
73 import org.apache.syncope.core.provisioning.api.utils.ExceptionUtils2;
74 import org.apache.syncope.core.provisioning.java.propagation.DefaultPropagationReporter;
75 import org.apache.syncope.core.spring.security.AuthContextUtils;
76 import org.apache.syncope.core.spring.security.DelegatedAdministrationException;
77 import org.identityconnectors.framework.common.objects.ObjectClass;
78 import org.quartz.JobDataMap;
79 import org.quartz.JobKey;
80 import org.quartz.SchedulerException;
81 import org.springframework.dao.InvalidDataAccessApiUsageException;
82 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
83 import org.springframework.security.access.prepost.PreAuthorize;
84 import org.springframework.transaction.annotation.Transactional;
85
86 public class TaskLogic extends AbstractExecutableLogic<TaskTO> {
87
88 protected final TaskDAO taskDAO;
89
90 protected final TaskExecDAO taskExecDAO;
91
92 protected final ExternalResourceDAO resourceDAO;
93
94 protected final NotificationDAO notificationDAO;
95
96 protected final TaskDataBinder binder;
97
98 protected final PropagationTaskExecutor taskExecutor;
99
100 protected final NotificationJobDelegate notificationJobDelegate;
101
102 protected final TaskUtilsFactory taskUtilsFactory;
103
104 public TaskLogic(
105 final JobManager jobManager,
106 final SchedulerFactoryBean scheduler,
107 final JobStatusDAO jobStatusDAO,
108 final TaskDAO taskDAO,
109 final TaskExecDAO taskExecDAO,
110 final ExternalResourceDAO resourceDAO,
111 final NotificationDAO notificationDAO,
112 final TaskDataBinder binder,
113 final PropagationTaskExecutor taskExecutor,
114 final NotificationJobDelegate notificationJobDelegate,
115 final TaskUtilsFactory taskUtilsFactory) {
116
117 super(jobManager, scheduler, jobStatusDAO);
118
119 this.taskDAO = taskDAO;
120 this.taskExecDAO = taskExecDAO;
121 this.resourceDAO = resourceDAO;
122 this.notificationDAO = notificationDAO;
123 this.binder = binder;
124 this.taskExecutor = taskExecutor;
125 this.notificationJobDelegate = notificationJobDelegate;
126 this.taskUtilsFactory = taskUtilsFactory;
127 }
128
129 protected void securityChecks(final String entitlement, final String realm) {
130 Set<String> authRealms = AuthContextUtils.getAuthorizations().get(entitlement);
131 if (authRealms.stream().noneMatch(r -> realm.startsWith(r))) {
132 throw new DelegatedAdministrationException(realm, MacroTask.class.getSimpleName(), null);
133 }
134 }
135
136 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_CREATE + "')")
137 public <T extends SchedTaskTO> T createSchedTask(final TaskType type, final T taskTO) {
138 TaskUtils taskUtils = taskUtilsFactory.getInstance(taskTO);
139 if (taskUtils.getType() != type) {
140 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.InvalidRequest);
141 sce.getElements().add("Found " + type + ", expected " + taskUtils.getType());
142 throw sce;
143 }
144
145 if (taskUtils.getType() == TaskType.MACRO) {
146 securityChecks(IdRepoEntitlement.TASK_CREATE, ((MacroTaskTO) taskTO).getRealm());
147 }
148
149 SchedTask task = binder.createSchedTask(taskTO, taskUtils);
150 task = taskDAO.save(task);
151
152 try {
153 jobManager.register(
154 task,
155 task.getStartAt(),
156 AuthContextUtils.getUsername());
157 } catch (Exception e) {
158 LOG.error("While registering quartz job for task " + task.getKey(), e);
159
160 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Scheduling);
161 sce.getElements().add(e.getMessage());
162 throw sce;
163 }
164
165 return binder.getTaskTO(task, taskUtils, false);
166 }
167
168 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_UPDATE + "')")
169 public <T extends SchedTaskTO> T updateSchedTask(final TaskType type, final SchedTaskTO taskTO) {
170 SchedTask task = taskDAO.find(type, taskTO.getKey());
171 if (task == null) {
172 throw new NotFoundException("Task " + taskTO.getKey());
173 }
174
175 TaskUtils taskUtils = taskUtilsFactory.getInstance(task);
176 if (taskUtils.getType() != type) {
177 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.InvalidRequest);
178 sce.getElements().add("Found " + type + ", expected " + taskUtils.getType());
179 throw sce;
180 }
181
182 if (taskUtils.getType() == TaskType.MACRO) {
183 securityChecks(IdRepoEntitlement.TASK_UPDATE, ((MacroTask) task).getRealm().getFullPath());
184 securityChecks(IdRepoEntitlement.TASK_UPDATE, ((MacroTaskTO) taskTO).getRealm());
185 }
186
187 binder.updateSchedTask(task, taskTO, taskUtils);
188 task = taskDAO.save(task);
189 try {
190 jobManager.register(
191 task,
192 task.getStartAt(),
193 AuthContextUtils.getUsername());
194 } catch (Exception e) {
195 LOG.error("While registering quartz job for task " + task.getKey(), e);
196
197 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Scheduling);
198 sce.getElements().add(e.getMessage());
199 throw sce;
200 }
201
202 return binder.getTaskTO(task, taskUtils, false);
203 }
204
205 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_LIST + "')")
206 @Transactional(readOnly = true)
207 public <T extends TaskTO> Pair<Integer, List<T>> search(
208 final TaskType type,
209 final String resource,
210 final String notification,
211 final AnyTypeKind anyTypeKind,
212 final String entityKey,
213 final int page,
214 final int size,
215 final List<OrderByClause> orderByClauses,
216 final boolean details) {
217
218 try {
219 if (type == null) {
220 throw new IllegalArgumentException("type is required");
221 }
222
223 ExternalResource resourceObj = resourceDAO.find(resource);
224 if (resource != null && resourceObj == null) {
225 throw new IllegalArgumentException("Missing External Resource: " + resource);
226 }
227
228 Notification notificationObj = notificationDAO.find(notification);
229 if (notification != null && notificationObj == null) {
230 throw new IllegalArgumentException("Missing Notification: " + notification);
231 }
232
233 int count = taskDAO.count(
234 type,
235 resourceObj,
236 notificationObj,
237 anyTypeKind,
238 entityKey);
239
240 List<T> result = taskDAO.findAll(
241 type,
242 resourceObj,
243 notificationObj,
244 anyTypeKind,
245 entityKey,
246 page,
247 size,
248 orderByClauses).stream().
249 <T>map(task -> binder.getTaskTO(task, taskUtilsFactory.getInstance(type), details)).
250 collect(Collectors.toList());
251
252 return Pair.of(count, result);
253 } catch (IllegalArgumentException | InvalidDataAccessApiUsageException e) {
254 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.InvalidRequest);
255 sce.getElements().add(e.getMessage());
256 throw sce;
257 }
258 }
259
260 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_READ + "')")
261 @Transactional(readOnly = true)
262 public <T extends TaskTO> T read(final TaskType type, final String key, final boolean details) {
263 Task<?> task = taskDAO.find(type, key);
264 if (task == null) {
265 throw new NotFoundException("Task " + key);
266 }
267
268 TaskUtils taskUtils = taskUtilsFactory.getInstance(task);
269 if (type != null && taskUtils.getType() != type) {
270 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.InvalidRequest);
271 sce.getElements().add("Found " + type + ", expected " + taskUtils.getType());
272 throw sce;
273 }
274
275 if (taskUtils.getType() == TaskType.MACRO) {
276 securityChecks(IdRepoEntitlement.TASK_READ, ((MacroTask) task).getRealm().getFullPath());
277 }
278
279 return binder.getTaskTO(task, taskUtilsFactory.getInstance(task), details);
280 }
281
282 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_EXECUTE + "')")
283 @Override
284 public ExecTO execute(final String key, final OffsetDateTime startAt, final boolean dryRun) {
285 Task<?> task = taskDAO.find(key).orElseThrow(() -> new NotFoundException("Task " + key));
286 if (startAt != null && startAt.isBefore(OffsetDateTime.now())) {
287 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Scheduling);
288 sce.getElements().add("Cannot schedule in the past");
289 throw sce;
290 }
291
292 TaskUtils taskUtils = taskUtilsFactory.getInstance(task);
293 String executor = AuthContextUtils.getUsername();
294
295 ExecTO result = null;
296 switch (taskUtils.getType()) {
297 case PROPAGATION:
298 PropagationTask propagationTask = (PropagationTask) task;
299 PropagationTaskInfo taskInfo = new PropagationTaskInfo(
300 propagationTask.getResource(),
301 propagationTask.getOperation(),
302 new ObjectClass(propagationTask.getObjectClassName()),
303 propagationTask.getAnyTypeKind(),
304 propagationTask.getAnyType(),
305 propagationTask.getEntityKey(),
306 propagationTask.getConnObjectKey(),
307 propagationTask.getPropagationData());
308 taskInfo.setKey(propagationTask.getKey());
309 taskInfo.setOldConnObjectKey(propagationTask.getOldConnObjectKey());
310
311 TaskExec<PropagationTask> propExec = taskExecutor.execute(
312 taskInfo, new DefaultPropagationReporter(), executor);
313 result = binder.getExecTO(propExec);
314 break;
315
316 case NOTIFICATION:
317 TaskExec<NotificationTask> notExec = notificationJobDelegate.executeSingle(
318 (NotificationTask) task, executor);
319 result = binder.getExecTO(notExec);
320 break;
321
322 case SCHEDULED:
323 case PULL:
324 case PUSH:
325 case MACRO:
326 if (taskUtils.getType() == TaskType.MACRO) {
327 securityChecks(IdRepoEntitlement.TASK_EXECUTE, ((MacroTask) task).getRealm().getFullPath());
328 }
329
330 if (!((SchedTask) task).isActive()) {
331 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Scheduling);
332 sce.getElements().add("Task " + key + " is not active");
333 throw sce;
334 }
335
336 try {
337 Map<String, Object> jobDataMap = jobManager.register(
338 (SchedTask) task,
339 startAt,
340 executor);
341 jobDataMap.put(JobManager.DRY_RUN_JOBDETAIL_KEY, dryRun);
342
343 if (startAt == null) {
344 scheduler.getScheduler().triggerJob(JobNamer.getJobKey(task), new JobDataMap(jobDataMap));
345 }
346 } catch (Exception e) {
347 LOG.error("While executing task {}", task, e);
348
349 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Scheduling);
350 sce.getElements().add(e.getMessage());
351 throw sce;
352 }
353
354 result = new ExecTO();
355 result.setJobType(JobType.TASK);
356 result.setRefKey(task.getKey());
357 result.setRefDesc(binder.buildRefDesc(task));
358 result.setStart(OffsetDateTime.now());
359 result.setExecutor(executor);
360 result.setStatus("JOB_FIRED");
361 result.setMessage("Job fired; waiting for results...");
362 break;
363
364 default:
365 }
366
367 return result;
368 }
369
370 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_DELETE + "')")
371 public <T extends TaskTO> T delete(final TaskType type, final String key) {
372 Task<?> task = taskDAO.find(type, key);
373 if (task == null) {
374 throw new NotFoundException("Task " + key);
375 }
376
377 TaskUtils taskUtils = taskUtilsFactory.getInstance(task);
378 if (type != null && taskUtils.getType() != type) {
379 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.InvalidRequest);
380 sce.getElements().add("Found " + type + ", expected " + taskUtils.getType());
381 throw sce;
382 }
383
384 if (taskUtils.getType() == TaskType.MACRO) {
385 securityChecks(IdRepoEntitlement.TASK_DELETE, ((MacroTask) task).getRealm().getFullPath());
386 }
387
388 T taskToDelete = binder.getTaskTO(task, taskUtils, true);
389
390 if (TaskType.SCHEDULED == taskUtils.getType()
391 || TaskType.PULL == taskUtils.getType()
392 || TaskType.PUSH == taskUtils.getType()) {
393
394 jobManager.unregister(task);
395 }
396
397 taskDAO.delete(task);
398 return taskToDelete;
399 }
400
401 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_READ + "')")
402 @Override
403 public Pair<Integer, List<ExecTO>> listExecutions(
404 final String key,
405 final OffsetDateTime before,
406 final OffsetDateTime after,
407 final int page,
408 final int size,
409 final List<OrderByClause> orderByClauses) {
410
411 Task<?> task = taskDAO.find(key).orElseThrow(() -> new NotFoundException("Task " + key));
412
413 if (task instanceof MacroTask) {
414 securityChecks(IdRepoEntitlement.TASK_READ, ((MacroTask) task).getRealm().getFullPath());
415 }
416
417 Integer count = taskExecDAO.count(task, before, after);
418
419 List<ExecTO> result = taskExecDAO.findAll(task, before, after, page, size, orderByClauses).stream().
420 map(exec -> binder.getExecTO(exec)).collect(Collectors.toList());
421
422 return Pair.of(count, result);
423 }
424
425 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_LIST + "')")
426 @Override
427 public List<ExecTO> listRecentExecutions(final int max) {
428 return taskExecDAO.findRecent(max).stream().
429 map(exec -> {
430 try {
431 if (exec.getTask() instanceof MacroTask) {
432 securityChecks(IdRepoEntitlement.TASK_DELETE,
433 ((MacroTask) exec.getTask()).getRealm().getFullPath());
434 }
435
436 return binder.getExecTO(exec);
437 } catch (DelegatedAdministrationException e) {
438 LOG.error("Skip executions for command task", e);
439 return null;
440 }
441 }).
442 filter(Objects::nonNull).
443 collect(Collectors.toList());
444 }
445
446 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_DELETE + "')")
447 @Override
448 public ExecTO deleteExecution(final String execKey) {
449 TaskExec<?> exec = taskExecDAO.find(execKey).
450 orElseThrow(() -> new NotFoundException("Task execution " + execKey));
451
452 if (exec.getTask() instanceof MacroTask) {
453 securityChecks(IdRepoEntitlement.TASK_DELETE, ((MacroTask) exec.getTask()).getRealm().getFullPath());
454 }
455
456 ExecTO executionToDelete = binder.getExecTO(exec);
457 taskExecDAO.delete(exec);
458 return executionToDelete;
459 }
460
461 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_DELETE + "')")
462 @Override
463 public List<BatchResponseItem> deleteExecutions(
464 final String key,
465 final OffsetDateTime before,
466 final OffsetDateTime after) {
467
468 Task<?> task = taskDAO.find(key).orElseThrow(() -> new NotFoundException("Task " + key));
469
470 List<BatchResponseItem> batchResponseItems = new ArrayList<>();
471
472 taskExecDAO.findAll(task, before, after, -1, -1, List.of()).forEach(exec -> {
473 BatchResponseItem item = new BatchResponseItem();
474 item.getHeaders().put(RESTHeaders.RESOURCE_KEY, List.of(exec.getKey()));
475 batchResponseItems.add(item);
476
477 try {
478 if (exec.getTask() instanceof MacroTask) {
479 securityChecks(IdRepoEntitlement.TASK_DELETE,
480 ((MacroTask) exec.getTask()).getRealm().getFullPath());
481 }
482
483 taskExecDAO.delete(exec);
484 item.setStatus(Response.Status.OK.getStatusCode());
485 } catch (Exception e) {
486 LOG.error("Error deleting execution {} of task {}", exec.getKey(), key, e);
487 item.setStatus(Response.Status.BAD_REQUEST.getStatusCode());
488 item.setContent(ExceptionUtils2.getFullStackTrace(e));
489 }
490 });
491
492 return batchResponseItems;
493 }
494
495 @Override
496 protected Triple<JobType, String, String> getReference(final JobKey jobKey) {
497 String key = JobNamer.getTaskKeyFromJobName(jobKey.getName());
498
499 Task<?> task = taskDAO.find(key).orElse(null);
500 return task == null || !(task instanceof SchedTask)
501 ? null
502 : Triple.of(JobType.TASK, key, binder.buildRefDesc(task));
503 }
504
505 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_LIST + "')")
506 @Override
507 public List<JobTO> listJobs() {
508 return super.doListJobs(true);
509 }
510
511 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_READ + "')")
512 @Override
513 public JobTO getJob(final String key) {
514 Task<?> task = taskDAO.find(key).orElseThrow(() -> new NotFoundException("Task " + key));
515
516 if (task instanceof MacroTask) {
517 securityChecks(IdRepoEntitlement.TASK_READ, ((MacroTask) task).getRealm().getFullPath());
518 }
519
520 JobTO jobTO = null;
521 try {
522 jobTO = getJobTO(JobNamer.getJobKey(task), false);
523 } catch (SchedulerException e) {
524 LOG.error("Problems while retrieving scheduled job {}", JobNamer.getJobKey(task), e);
525
526 SyncopeClientException sce = SyncopeClientException.build(ClientExceptionType.Scheduling);
527 sce.getElements().add(e.getMessage());
528 throw sce;
529 }
530 if (jobTO == null) {
531 throw new NotFoundException("Job for task " + key);
532 }
533 return jobTO;
534 }
535
536 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_EXECUTE + "')")
537 @Override
538 public void actionJob(final String key, final JobAction action) {
539 Task<?> task = taskDAO.find(key).orElseThrow(() -> new NotFoundException("Task " + key));
540
541 if (task instanceof MacroTask) {
542 securityChecks(IdRepoEntitlement.TASK_EXECUTE, ((MacroTask) task).getRealm().getFullPath());
543 }
544
545 doActionJob(JobNamer.getJobKey(task), action);
546 }
547
548 @PreAuthorize("hasRole('" + IdRepoEntitlement.TASK_DELETE + "')")
549 public List<PropagationTaskTO> purgePropagations(
550 final OffsetDateTime since,
551 final List<ExecStatus> statuses,
552 final List<String> resources) {
553
554 return taskDAO.purgePropagations(since, statuses, Optional.ofNullable(resources).
555 map(r -> r.stream().map(resourceDAO::find).
556 filter(Objects::nonNull).collect(Collectors.toList())).
557 orElse(null));
558 }
559
560 @Override
561 protected TaskTO resolveReference(final Method method, final Object... args)
562 throws UnresolvedReferenceException {
563
564 String key = null;
565
566 if (ArrayUtils.isNotEmpty(args)
567 && !"deleteExecution".equals(method.getName()) && !"readExecution".equals(method.getName())) {
568
569 for (int i = 0; key == null && i < args.length; i++) {
570 if (args[i] instanceof String) {
571 key = (String) args[i];
572 } else if (args[i] instanceof TaskTO) {
573 key = ((TaskTO) args[i]).getKey();
574 }
575 }
576 }
577
578 if (key != null) {
579 String taskKey = key;
580 try {
581 Task<?> task = taskDAO.find(taskKey).orElseThrow(() -> new NotFoundException("Task " + taskKey));
582 return binder.getTaskTO(task, taskUtilsFactory.getInstance(task), false);
583 } catch (Throwable ignore) {
584 LOG.debug("Unresolved reference", ignore);
585 throw new UnresolvedReferenceException(ignore);
586 }
587 }
588
589 throw new UnresolvedReferenceException();
590 }
591 }