1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.syncope.core.provisioning.java.job;
20
21 import java.sql.Connection;
22 import java.sql.PreparedStatement;
23 import java.sql.ResultSet;
24 import java.sql.SQLException;
25 import java.time.OffsetDateTime;
26 import java.util.Date;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.Optional;
32 import java.util.Set;
33 import javax.sql.DataSource;
34 import org.apache.commons.lang3.StringUtils;
35 import org.apache.syncope.common.keymaster.client.api.ConfParamOps;
36 import org.apache.syncope.common.lib.SyncopeConstants;
37 import org.apache.syncope.common.lib.types.IdRepoImplementationType;
38 import org.apache.syncope.common.lib.types.TaskType;
39 import org.apache.syncope.core.persistence.api.DomainHolder;
40 import org.apache.syncope.core.persistence.api.SyncopeCoreLoader;
41 import org.apache.syncope.core.persistence.api.dao.ImplementationDAO;
42 import org.apache.syncope.core.persistence.api.dao.ReportDAO;
43 import org.apache.syncope.core.persistence.api.dao.TaskDAO;
44 import org.apache.syncope.core.persistence.api.entity.Implementation;
45 import org.apache.syncope.core.persistence.api.entity.Report;
46 import org.apache.syncope.core.persistence.api.entity.task.PullTask;
47 import org.apache.syncope.core.persistence.api.entity.task.PushTask;
48 import org.apache.syncope.core.persistence.api.entity.task.SchedTask;
49 import org.apache.syncope.core.persistence.api.entity.task.Task;
50 import org.apache.syncope.core.persistence.api.entity.task.TaskUtilsFactory;
51 import org.apache.syncope.core.provisioning.api.job.JobManager;
52 import org.apache.syncope.core.provisioning.api.job.JobNamer;
53 import org.apache.syncope.core.provisioning.api.job.SchedTaskJobDelegate;
54 import org.apache.syncope.core.provisioning.java.job.notification.NotificationJob;
55 import org.apache.syncope.core.provisioning.java.job.report.ReportJob;
56 import org.apache.syncope.core.provisioning.java.pushpull.PullJobDelegate;
57 import org.apache.syncope.core.provisioning.java.pushpull.PushJobDelegate;
58 import org.apache.syncope.core.spring.security.AuthContextUtils;
59 import org.apache.syncope.core.spring.security.SecurityProperties;
60 import org.identityconnectors.common.IOUtil;
61 import org.quartz.CronScheduleBuilder;
62 import org.quartz.Job;
63 import org.quartz.JobBuilder;
64 import org.quartz.JobDataMap;
65 import org.quartz.JobKey;
66 import org.quartz.Scheduler;
67 import org.quartz.SchedulerException;
68 import org.quartz.Trigger;
69 import org.quartz.TriggerBuilder;
70 import org.quartz.TriggerKey;
71 import org.quartz.impl.jdbcjobstore.Constants;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
74 import org.springframework.jdbc.datasource.DataSourceUtils;
75 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
76 import org.springframework.transaction.annotation.Transactional;
77
78 public class DefaultJobManager implements JobManager, SyncopeCoreLoader {
79
80 protected static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
81
82 protected final DomainHolder domainHolder;
83
84 protected final SchedulerFactoryBean scheduler;
85
86 protected final TaskDAO taskDAO;
87
88 protected final ReportDAO reportDAO;
89
90 protected final ImplementationDAO implementationDAO;
91
92 protected final TaskUtilsFactory taskUtilsFactory;
93
94 protected final ConfParamOps confParamOps;
95
96 protected final SecurityProperties securityProperties;
97
98 protected boolean disableQuartzInstance;
99
100 public DefaultJobManager(
101 final DomainHolder domainHolder,
102 final SchedulerFactoryBean scheduler,
103 final TaskDAO taskDAO,
104 final ReportDAO reportDAO,
105 final ImplementationDAO implementationDAO,
106 final TaskUtilsFactory taskUtilsFactory,
107 final ConfParamOps confParamOps,
108 final SecurityProperties securityProperties) {
109
110 this.domainHolder = domainHolder;
111 this.scheduler = scheduler;
112 this.taskDAO = taskDAO;
113 this.reportDAO = reportDAO;
114 this.implementationDAO = implementationDAO;
115 this.taskUtilsFactory = taskUtilsFactory;
116 this.confParamOps = confParamOps;
117 this.securityProperties = securityProperties;
118 }
119
120 public void setDisableQuartzInstance(final boolean disableQuartzInstance) {
121 this.disableQuartzInstance = disableQuartzInstance;
122 }
123
124 protected boolean isRunningHere(final JobKey jobKey) throws SchedulerException {
125 return scheduler.getScheduler().getCurrentlyExecutingJobs().stream().
126 anyMatch(jec -> jobKey.equals(jec.getJobDetail().getKey()));
127 }
128
129 protected boolean isRunningElsewhere(final JobKey jobKey) throws SchedulerException {
130 if (!scheduler.getScheduler().getMetaData().isJobStoreClustered()) {
131 return false;
132 }
133
134 DataSource dataSource = domainHolder.getDomains().get(SyncopeConstants.MASTER_DOMAIN);
135 Connection conn = DataSourceUtils.getConnection(dataSource);
136 PreparedStatement stmt = null;
137 ResultSet resultSet = null;
138 try {
139 stmt = conn.prepareStatement(
140 "SELECT 1 FROM " + Constants.DEFAULT_TABLE_PREFIX + "FIRED_TRIGGERS "
141 + "WHERE JOB_NAME = ? AND JOB_GROUP = ?");
142 stmt.setString(1, jobKey.getName());
143 stmt.setString(2, jobKey.getGroup());
144
145 resultSet = stmt.executeQuery();
146 return resultSet.next();
147 } catch (SQLException e) {
148 throw new SchedulerException(e);
149 } finally {
150 IOUtil.quietClose(resultSet);
151 IOUtil.quietClose(stmt);
152 DataSourceUtils.releaseConnection(conn, dataSource);
153 }
154 }
155
156 @Override
157 public boolean isRunning(final JobKey jobKey) throws SchedulerException {
158 return isRunningHere(jobKey) || isRunningElsewhere(jobKey);
159 }
160
161 protected void registerJob(
162 final String jobName,
163 final Class<? extends Job> jobClass,
164 final String cronExpression,
165 final Date startAt,
166 final Map<String, Object> jobMap)
167 throws SchedulerException {
168
169 if (isRunning(new JobKey(jobName, Scheduler.DEFAULT_GROUP))) {
170 LOG.debug("Job {} already running, cancel", jobName);
171 return;
172 }
173
174
175 unregisterJob(jobName);
176
177
178 JobBuilder jobDetailBuilder = JobBuilder.newJob(jobClass).
179 withIdentity(jobName).
180 usingJobData(new JobDataMap(jobMap));
181
182
183 if (cronExpression == null && startAt == null) {
184
185 scheduler.getScheduler().addJob(jobDetailBuilder.storeDurably().build(), true);
186 } else {
187 TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger().
188 withIdentity(JobNamer.getTriggerName(jobName));
189
190 if (cronExpression == null) {
191 triggerBuilder.startAt(startAt);
192 } else {
193 triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression));
194
195 if (startAt == null) {
196 triggerBuilder.startNow();
197 } else {
198 triggerBuilder.startAt(startAt);
199 }
200 }
201
202 scheduler.getScheduler().scheduleJob(jobDetailBuilder.build(), triggerBuilder.build());
203 }
204 }
205
206 @Override
207 public Map<String, Object> register(
208 final SchedTask task,
209 final OffsetDateTime startAt,
210 final String executor) throws SchedulerException {
211
212 Implementation jobDelegate = task.getJobDelegate() == null
213 ? task instanceof PullTask
214 ? implementationDAO.findByType(IdRepoImplementationType.TASKJOB_DELEGATE).stream().
215 filter(impl -> PullJobDelegate.class.getName().equals(impl.getBody())).
216 findFirst().orElse(null)
217 : task instanceof PushTask
218 ? implementationDAO.findByType(IdRepoImplementationType.TASKJOB_DELEGATE).stream().
219 filter(impl -> PushJobDelegate.class.getName().equals(impl.getBody())).
220 findFirst().orElse(null)
221 : null
222 : task.getJobDelegate();
223 if (jobDelegate == null) {
224 throw new IllegalArgumentException("Task " + task
225 + " does not provide any " + SchedTaskJobDelegate.class.getSimpleName());
226 }
227
228 Map<String, Object> jobMap = createJobMapForExecutionContext(executor);
229 jobMap.put(JobManager.TASK_TYPE, taskUtilsFactory.getInstance(task).getType());
230 jobMap.put(JobManager.TASK_KEY, task.getKey());
231 jobMap.put(JobManager.DELEGATE_IMPLEMENTATION, jobDelegate.getKey());
232
233 registerJob(
234 JobNamer.getJobKey(task).getName(),
235 TaskJob.class,
236 task.getCronExpression(),
237 Optional.ofNullable(startAt).map(s -> new Date(s.toInstant().toEpochMilli())).orElse(null),
238 jobMap);
239 return jobMap;
240 }
241
242 @Override
243 public Map<String, Object> register(
244 final Report report,
245 final OffsetDateTime startAt,
246 final String executor) throws SchedulerException {
247
248 Map<String, Object> jobMap = createJobMapForExecutionContext(executor);
249 jobMap.put(JobManager.REPORT_KEY, report.getKey());
250 jobMap.put(JobManager.DELEGATE_IMPLEMENTATION, report.getJobDelegate().getKey());
251
252 registerJob(
253 JobNamer.getJobKey(report).getName(),
254 ReportJob.class,
255 report.getCronExpression(),
256 Optional.ofNullable(startAt).map(s -> new Date(s.toInstant().toEpochMilli())).orElse(null),
257 jobMap);
258 return jobMap;
259 }
260
261 protected Map<String, Object> createJobMapForExecutionContext(final String executor) {
262 Map<String, Object> jobMap = new HashMap<>();
263 jobMap.put(JobManager.DOMAIN_KEY, AuthContextUtils.getDomain());
264 jobMap.put(JobManager.EXECUTOR_KEY, executor);
265 return jobMap;
266 }
267
268 protected void unregisterJob(final String jobName) {
269 try {
270 scheduler.getScheduler().unscheduleJob(new TriggerKey(jobName, Scheduler.DEFAULT_GROUP));
271 scheduler.getScheduler().deleteJob(new JobKey(jobName, Scheduler.DEFAULT_GROUP));
272 } catch (SchedulerException e) {
273 LOG.error("Could not remove job " + jobName, e);
274 }
275 }
276
277 @Override
278 public void unregister(final Task<?> task) {
279 unregisterJob(JobNamer.getJobKey(task).getName());
280 }
281
282 @Override
283 public void unregister(final Report report) {
284 unregisterJob(JobNamer.getJobKey(report).getName());
285 }
286
287 @Override
288 public int getOrder() {
289 return 500;
290 }
291
292 @Transactional
293 @Override
294 public void load(final String domain, final DataSource datasource) {
295 if (disableQuartzInstance) {
296 String instanceId = "AUTO";
297 try {
298 instanceId = scheduler.getScheduler().getSchedulerInstanceId();
299 scheduler.getScheduler().standby();
300
301 LOG.info("Successfully put Quartz instance {} in standby", instanceId);
302 } catch (SchedulerException e) {
303 LOG.error("Could not put Quartz instance {} in standby", instanceId, e);
304 }
305
306 return;
307 }
308
309 String notificationJobCronExp = AuthContextUtils.callAsAdmin(SyncopeConstants.MASTER_DOMAIN, () -> {
310 String result = StringUtils.EMPTY;
311
312 String conf = confParamOps.get(
313 SyncopeConstants.MASTER_DOMAIN, "notificationjob.cronExpression", null, String.class);
314 if (conf == null) {
315 result = NotificationJob.DEFAULT_CRON_EXP;
316 } else if (!StringUtils.EMPTY.equals(conf)) {
317 result = conf;
318 }
319 return result;
320 });
321
322 AuthContextUtils.callAsAdmin(domain, () -> {
323
324 Set<SchedTask> tasks = new HashSet<>(taskDAO.<SchedTask>findAll(TaskType.SCHEDULED));
325 tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PULL));
326 tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PUSH));
327
328 boolean loadException = false;
329 for (Iterator<SchedTask> it = tasks.iterator(); it.hasNext() && !loadException;) {
330 SchedTask task = it.next();
331 try {
332 register(task, task.getStartAt(), securityProperties.getAdminUser());
333 } catch (Exception e) {
334 LOG.error("While loading job instance for task " + task.getKey(), e);
335 loadException = true;
336 }
337 }
338
339 if (loadException) {
340 LOG.debug("Errors while loading job instances for tasks, aborting");
341 } else {
342
343 for (Iterator<Report> it = reportDAO.findAll().iterator(); it.hasNext() && !loadException;) {
344 Report report = it.next();
345 try {
346 register(report, null, securityProperties.getAdminUser());
347 } catch (Exception e) {
348 LOG.error("While loading job instance for report " + report.getName(), e);
349 loadException = true;
350 }
351 }
352
353 if (loadException) {
354 LOG.debug("Errors while loading job instances for reports, aborting");
355 }
356 }
357
358 return null;
359 });
360
361 if (SyncopeConstants.MASTER_DOMAIN.equals(domain)) {
362
363 if (StringUtils.isBlank(notificationJobCronExp)) {
364 LOG.debug("Empty value provided for {}'s cron, not registering anything on Quartz",
365 NotificationJob.class.getSimpleName());
366 } else {
367 LOG.debug("{}'s cron expression: {} - registering Quartz job and trigger",
368 NotificationJob.class.getSimpleName(), notificationJobCronExp);
369
370 try {
371 Map<String, Object> jobData = createJobMapForExecutionContext(securityProperties.getAdminUser());
372 registerJob(
373 NOTIFICATION_JOB.getName(),
374 NotificationJob.class,
375 notificationJobCronExp,
376 null,
377 jobData);
378 } catch (Exception e) {
379 LOG.error("While loading {} instance", NotificationJob.class.getSimpleName(), e);
380 }
381 }
382
383
384 LOG.debug("Registering {}", SystemLoadReporterJob.class);
385 try {
386 Map<String, Object> jobData = createJobMapForExecutionContext(securityProperties.getAdminUser());
387 registerJob(
388 StringUtils.uncapitalize(SystemLoadReporterJob.class.getSimpleName()),
389 SystemLoadReporterJob.class,
390 "0 * * * * ?",
391 null,
392 jobData);
393 } catch (Exception e) {
394 LOG.error("While loading {} instance", SystemLoadReporterJob.class.getSimpleName(), e);
395 }
396 }
397 }
398
399 @Override
400 public void unload(final String domain) {
401 AuthContextUtils.callAsAdmin(domain, () -> {
402
403 Set<SchedTask> tasks = new HashSet<>(taskDAO.<SchedTask>findAll(TaskType.SCHEDULED));
404 tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PULL));
405 tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PUSH));
406
407 tasks.forEach(task -> {
408 try {
409 unregister(task);
410 } catch (Exception e) {
411 LOG.error("While unloading job instance for task " + task.getKey(), e);
412 }
413 });
414
415
416 reportDAO.findAll().forEach(report -> {
417 try {
418 unregister(report);
419 } catch (Exception e) {
420 LOG.error("While unloading job instance for report " + report.getName(), e);
421 }
422 });
423
424 return null;
425 });
426 }
427 }