/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.syncope.core.provisioning.java.job;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.syncope.common.keymaster.client.api.ConfParamOps;
import org.apache.syncope.common.lib.SyncopeConstants;
import org.apache.syncope.common.lib.types.IdRepoImplementationType;
import org.apache.syncope.common.lib.types.TaskType;
import org.apache.syncope.core.persistence.api.DomainHolder;
import org.apache.syncope.core.persistence.api.SyncopeCoreLoader;
import org.apache.syncope.core.persistence.api.dao.ImplementationDAO;
import org.apache.syncope.core.persistence.api.dao.ReportDAO;
import org.apache.syncope.core.persistence.api.dao.TaskDAO;
import org.apache.syncope.core.persistence.api.entity.Implementation;
import org.apache.syncope.core.persistence.api.entity.Report;
import org.apache.syncope.core.persistence.api.entity.task.PullTask;
import org.apache.syncope.core.persistence.api.entity.task.PushTask;
import org.apache.syncope.core.persistence.api.entity.task.SchedTask;
import org.apache.syncope.core.persistence.api.entity.task.Task;
import org.apache.syncope.core.persistence.api.entity.task.TaskUtilsFactory;
import org.apache.syncope.core.provisioning.api.job.JobManager;
import org.apache.syncope.core.provisioning.api.job.JobNamer;
import org.apache.syncope.core.provisioning.api.job.SchedTaskJobDelegate;
import org.apache.syncope.core.provisioning.java.job.notification.NotificationJob;
import org.apache.syncope.core.provisioning.java.job.report.ReportJob;
import org.apache.syncope.core.provisioning.java.pushpull.PullJobDelegate;
import org.apache.syncope.core.provisioning.java.pushpull.PushJobDelegate;
import org.apache.syncope.core.spring.security.AuthContextUtils;
import org.apache.syncope.core.spring.security.SecurityProperties;
import org.identityconnectors.common.IOUtil;
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.jdbcjobstore.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.transaction.annotation.Transactional;

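/**
 * Quartz-based {@link JobManager} implementation: registers and unregisters Quartz jobs and triggers
 * for {@link SchedTask}s (including pull and push tasks), {@link Report}s, the {@link NotificationJob}
 * and the {@link SystemLoadReporterJob}, and (re)loads them per domain at startup.
 */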
public class DefaultJobManager implements JobManager, SyncopeCoreLoader {

    protected static final Logger LOG = LoggerFactory.getLogger(JobManager.class);

    protected final DomainHolder domainHolder;

    protected final SchedulerFactoryBean scheduler;

    protected final TaskDAO taskDAO;

    protected final ReportDAO reportDAO;

    protected final ImplementationDAO implementationDAO;

    protected final TaskUtilsFactory taskUtilsFactory;

    protected final ConfParamOps confParamOps;

    protected final SecurityProperties securityProperties;

    protected boolean disableQuartzInstance;

    public DefaultJobManager(
            final DomainHolder domainHolder,
            final SchedulerFactoryBean scheduler,
            final TaskDAO taskDAO,
            final ReportDAO reportDAO,
            final ImplementationDAO implementationDAO,
            final TaskUtilsFactory taskUtilsFactory,
            final ConfParamOps confParamOps,
            final SecurityProperties securityProperties) {

        this.domainHolder = domainHolder;
        this.scheduler = scheduler;
        this.taskDAO = taskDAO;
        this.reportDAO = reportDAO;
        this.implementationDAO = implementationDAO;
        this.taskUtilsFactory = taskUtilsFactory;
        this.confParamOps = confParamOps;
        this.securityProperties = securityProperties;
    }

    public void setDisableQuartzInstance(final boolean disableQuartzInstance) {
        this.disableQuartzInstance = disableQuartzInstance;
    }

    protected boolean isRunningHere(final JobKey jobKey) throws SchedulerException {
        return scheduler.getScheduler().getCurrentlyExecutingJobs().stream().
                anyMatch(jec -> jobKey.equals(jec.getJobDetail().getKey()));
    }

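    /**
     * When the Quartz job store is clustered, checks whether the given job is currently firing on any node
     * by querying the {@code FIRED_TRIGGERS} table (with the default Quartz table prefix) on the Master
     * domain's datasource; returns {@code false} right away for non-clustered job stores.
     */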
    protected boolean isRunningElsewhere(final JobKey jobKey) throws SchedulerException {
        if (!scheduler.getScheduler().getMetaData().isJobStoreClustered()) {
            return false;
        }

        DataSource dataSource = domainHolder.getDomains().get(SyncopeConstants.MASTER_DOMAIN);
        Connection conn = DataSourceUtils.getConnection(dataSource);
        PreparedStatement stmt = null;
        ResultSet resultSet = null;
        try {
            stmt = conn.prepareStatement(
                    "SELECT 1 FROM " + Constants.DEFAULT_TABLE_PREFIX + "FIRED_TRIGGERS "
                    + "WHERE JOB_NAME = ? AND JOB_GROUP = ?");
            stmt.setString(1, jobKey.getName());
            stmt.setString(2, jobKey.getGroup());

            resultSet = stmt.executeQuery();
            return resultSet.next();
        } catch (SQLException e) {
            throw new SchedulerException(e);
        } finally {
            IOUtil.quietClose(resultSet);
            IOUtil.quietClose(stmt);
            DataSourceUtils.releaseConnection(conn, dataSource);
        }
    }

    @Override
    public boolean isRunning(final JobKey jobKey) throws SchedulerException {
        return isRunningHere(jobKey) || isRunningElsewhere(jobKey);
    }

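    /**
     * Registers the given job class with Quartz; registration is skipped if the job is already running.
     * Any previously registered job with the same name is removed first, then either a durable job with no
     * trigger (neither cron expression nor start time given), a one-shot trigger (start time only) or a cron
     * trigger (optionally anchored at the given start time) is scheduled.
     */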
    protected void registerJob(
            final String jobName,
            final Class<? extends Job> jobClass,
            final String cronExpression,
            final Date startAt,
            final Map<String, Object> jobMap)
            throws SchedulerException {

        if (isRunning(new JobKey(jobName, Scheduler.DEFAULT_GROUP))) {
            LOG.debug("Job {} already running, cancel", jobName);
            return;
        }

        // 0. unregister job
        unregisterJob(jobName);

        // 1. JobDetail
        JobBuilder jobDetailBuilder = JobBuilder.newJob(jobClass).
                withIdentity(jobName).
                usingJobData(new JobDataMap(jobMap));

        // 2. Trigger
        if (cronExpression == null && startAt == null) {
            // Jobs added with no trigger must be durable
            scheduler.getScheduler().addJob(jobDetailBuilder.storeDurably().build(), true);
        } else {
            TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger().
                    withIdentity(JobNamer.getTriggerName(jobName));

            if (cronExpression == null) {
                triggerBuilder.startAt(startAt);
            } else {
                triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression));

                if (startAt == null) {
                    triggerBuilder.startNow();
                } else {
                    triggerBuilder.startAt(startAt);
                }
            }

            scheduler.getScheduler().scheduleJob(jobDetailBuilder.build(), triggerBuilder.build());
        }
    }

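    /**
     * Registers the Quartz job for the given scheduled task; if the task does not declare its own job delegate,
     * the {@link PullJobDelegate} or {@link PushJobDelegate} implementation is looked up for pull and push tasks
     * respectively. A task that ends up with no job delegate is rejected with an {@link IllegalArgumentException}.
     */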
    @Override
    public Map<String, Object> register(
            final SchedTask task,
            final OffsetDateTime startAt,
            final String executor) throws SchedulerException {

        Implementation jobDelegate = task.getJobDelegate() == null
                ? task instanceof PullTask
                        ? implementationDAO.findByType(IdRepoImplementationType.TASKJOB_DELEGATE).stream().
                                filter(impl -> PullJobDelegate.class.getName().equals(impl.getBody())).
                                findFirst().orElse(null)
                        : task instanceof PushTask
                                ? implementationDAO.findByType(IdRepoImplementationType.TASKJOB_DELEGATE).stream().
                                        filter(impl -> PushJobDelegate.class.getName().equals(impl.getBody())).
                                        findFirst().orElse(null)
                                : null
                : task.getJobDelegate();
        if (jobDelegate == null) {
            throw new IllegalArgumentException("Task " + task
                    + " does not provide any " + SchedTaskJobDelegate.class.getSimpleName());
        }

        Map<String, Object> jobMap = createJobMapForExecutionContext(executor);
        jobMap.put(JobManager.TASK_TYPE, taskUtilsFactory.getInstance(task).getType());
        jobMap.put(JobManager.TASK_KEY, task.getKey());
        jobMap.put(JobManager.DELEGATE_IMPLEMENTATION, jobDelegate.getKey());

        registerJob(
                JobNamer.getJobKey(task).getName(),
                TaskJob.class,
                task.getCronExpression(),
                Optional.ofNullable(startAt).map(s -> new Date(s.toInstant().toEpochMilli())).orElse(null),
                jobMap);
        return jobMap;
    }

    @Override
    public Map<String, Object> register(
            final Report report,
            final OffsetDateTime startAt,
            final String executor) throws SchedulerException {

        Map<String, Object> jobMap = createJobMapForExecutionContext(executor);
        jobMap.put(JobManager.REPORT_KEY, report.getKey());
        jobMap.put(JobManager.DELEGATE_IMPLEMENTATION, report.getJobDelegate().getKey());

        registerJob(
                JobNamer.getJobKey(report).getName(),
                ReportJob.class,
                report.getCronExpression(),
                Optional.ofNullable(startAt).map(s -> new Date(s.toInstant().toEpochMilli())).orElse(null),
                jobMap);
        return jobMap;
    }

    protected Map<String, Object> createJobMapForExecutionContext(final String executor) {
        Map<String, Object> jobMap = new HashMap<>();
        jobMap.put(JobManager.DOMAIN_KEY, AuthContextUtils.getDomain());
        jobMap.put(JobManager.EXECUTOR_KEY, executor);
        return jobMap;
    }

    protected void unregisterJob(final String jobName) {
        try {
            scheduler.getScheduler().unscheduleJob(new TriggerKey(jobName, Scheduler.DEFAULT_GROUP));
            scheduler.getScheduler().deleteJob(new JobKey(jobName, Scheduler.DEFAULT_GROUP));
        } catch (SchedulerException e) {
            LOG.error("Could not remove job " + jobName, e);
        }
    }

    @Override
    public void unregister(final Task<?> task) {
        unregisterJob(JobNamer.getJobKey(task).getName());
    }

    @Override
    public void unregister(final Report report) {
        unregisterJob(JobNamer.getJobKey(report).getName());
    }

    @Override
    public int getOrder() {
        return 500;
    }

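    /**
     * Invoked at startup for each domain: if Quartz is disabled, the scheduler instance is simply put in standby;
     * otherwise, jobs are (re)registered for all scheduled, pull and push tasks and for all reports of the domain
     * and, for the Master domain only, for {@link NotificationJob} (with the cron expression read from the
     * {@code notificationjob.cronExpression} configuration parameter) and {@link SystemLoadReporterJob}.
     */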
    @Transactional
    @Override
    public void load(final String domain, final DataSource datasource) {
        if (disableQuartzInstance) {
            String instanceId = "AUTO";
            try {
                instanceId = scheduler.getScheduler().getSchedulerInstanceId();
                scheduler.getScheduler().standby();

                LOG.info("Successfully put Quartz instance {} in standby", instanceId);
            } catch (SchedulerException e) {
                LOG.error("Could not put Quartz instance {} in standby", instanceId, e);
            }

            return;
        }

        String notificationJobCronExp = AuthContextUtils.callAsAdmin(SyncopeConstants.MASTER_DOMAIN, () -> {
            String result = StringUtils.EMPTY;

            String conf = confParamOps.get(
                    SyncopeConstants.MASTER_DOMAIN, "notificationjob.cronExpression", null, String.class);
            if (conf == null) {
                result = NotificationJob.DEFAULT_CRON_EXP;
            } else if (!StringUtils.EMPTY.equals(conf)) {
                result = conf;
            }
            return result;
        });

        AuthContextUtils.callAsAdmin(domain, () -> {
            // 1. jobs for SchedTasks
            Set<SchedTask> tasks = new HashSet<>(taskDAO.<SchedTask>findAll(TaskType.SCHEDULED));
            tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PULL));
            tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PUSH));

            boolean loadException = false;
            for (Iterator<SchedTask> it = tasks.iterator(); it.hasNext() && !loadException;) {
                SchedTask task = it.next();
                try {
                    register(task, task.getStartAt(), securityProperties.getAdminUser());
                } catch (Exception e) {
                    LOG.error("While loading job instance for task " + task.getKey(), e);
                    loadException = true;
                }
            }

            if (loadException) {
                LOG.debug("Errors while loading job instances for tasks, aborting");
            } else {
                // 2. jobs for Reports
                for (Iterator<Report> it = reportDAO.findAll().iterator(); it.hasNext() && !loadException;) {
                    Report report = it.next();
                    try {
                        register(report, null, securityProperties.getAdminUser());
                    } catch (Exception e) {
                        LOG.error("While loading job instance for report " + report.getName(), e);
                        loadException = true;
                    }
                }

                if (loadException) {
                    LOG.debug("Errors while loading job instances for reports, aborting");
                }
            }

            return null;
        });

        if (SyncopeConstants.MASTER_DOMAIN.equals(domain)) {
            // 3. NotificationJob
            if (StringUtils.isBlank(notificationJobCronExp)) {
                LOG.debug("Empty value provided for {}'s cron, not registering anything on Quartz",
                        NotificationJob.class.getSimpleName());
            } else {
                LOG.debug("{}'s cron expression: {} - registering Quartz job and trigger",
                        NotificationJob.class.getSimpleName(), notificationJobCronExp);

                try {
                    Map<String, Object> jobData = createJobMapForExecutionContext(securityProperties.getAdminUser());
                    registerJob(
                            NOTIFICATION_JOB.getName(),
                            NotificationJob.class,
                            notificationJobCronExp,
                            null,
                            jobData);
                } catch (Exception e) {
                    LOG.error("While loading {} instance", NotificationJob.class.getSimpleName(), e);
                }
            }

            // 4. SystemLoadReporterJob (fixed schedule, every minute)
            LOG.debug("Registering {}", SystemLoadReporterJob.class);
            try {
                Map<String, Object> jobData = createJobMapForExecutionContext(securityProperties.getAdminUser());
                registerJob(
                        StringUtils.uncapitalize(SystemLoadReporterJob.class.getSimpleName()),
                        SystemLoadReporterJob.class,
                        "0 * * * * ?",
                        null,
                        jobData);
            } catch (Exception e) {
                LOG.error("While loading {} instance", SystemLoadReporterJob.class.getSimpleName(), e);
            }
        }
    }

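    /**
     * Unregisters the Quartz jobs of all scheduled, pull and push tasks and of all reports of the given domain.
     */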
    @Override
    public void unload(final String domain) {
        AuthContextUtils.callAsAdmin(domain, () -> {
            // 1. jobs for SchedTasks
            Set<SchedTask> tasks = new HashSet<>(taskDAO.<SchedTask>findAll(TaskType.SCHEDULED));
            tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PULL));
            tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PUSH));

            tasks.forEach(task -> {
                try {
                    unregister(task);
                } catch (Exception e) {
                    LOG.error("While unloading job instance for task " + task.getKey(), e);
                }
            });

            // 2. jobs for Reports
            reportDAO.findAll().forEach(report -> {
                try {
                    unregister(report);
                } catch (Exception e) {
                    LOG.error("While unloading job instance for report " + report.getName(), e);
                }
            });

            return null;
        });
    }
}