// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.apache.tapestry5.ioc.internal.services.cron;

import org.apache.tapestry5.ioc.Invokable;
import org.apache.tapestry5.ioc.annotations.PostInjection;
import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
import org.apache.tapestry5.ioc.services.ParallelExecutor;
import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
import org.apache.tapestry5.ioc.services.cron.Schedule;
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Implementation of {@link PeriodicExecutor} built around a single scheduling thread. The thread sleeps until
 * the earliest known execution time (or at most five minutes), then hands every job that is due to the
 * {@link ParallelExecutor}. The thread is interrupted to wake it early whenever a job is added, a job finishes
 * executing, or the registry shuts down.
 *
 * All mutable scheduling state (the job list and each job's flags and next execution time) is guarded by
 * {@link #jobLock}.
 */
public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
{
    private final ParallelExecutor parallelExecutor;

    private final Logger logger;

    // Synchronized by jobLock
    private final List<Job> jobs = CollectionFactory.newList();

    private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");

    // Written by the registry shutdown callback and read by the scheduling thread, hence volatile.
    private volatile boolean shutdown;

    private static final long FIVE_MINUTES = 5 * 60 * 1000;

    private final AtomicInteger jobIdAllocator = new AtomicInteger();

    private final Lock jobLock = new ReentrantLock();

    /**
     * A single scheduled job: it is the {@link PeriodicJob} handle returned to the caller of
     * {@link PeriodicExecutorImpl#addJob(Schedule, String, Runnable)} and also the {@link Invokable} that is
     * passed to the {@link ParallelExecutor} when the job is due.
     */
    private class Job implements PeriodicJob, Invokable<Void>
    {
        final int jobId = jobIdAllocator.incrementAndGet();

        private final Schedule schedule;

        private final String name;

        private final Runnable runnableJob;

        // Guarded by jobLock
        private boolean executing, canceled;

        // Guarded by jobLock
        private long nextExecution;

        public Job(Schedule schedule, String name, Runnable runnableJob)
        {
            this.schedule = schedule;
            this.name = name;
            this.runnableJob = runnableJob;

            nextExecution = schedule.firstExecution();
        }

        @Override
        public String getName()
        {
            return name;
        }

        /**
         * @return the time (in system clock milliseconds) at which this job should next be started
         */
        public long getNextExecution()
        {
            jobLock.lock();

            try
            {
                return nextExecution;
            } finally
            {
                jobLock.unlock();
            }
        }


        @Override
        public boolean isExecuting()
        {
            jobLock.lock();

            try
            {
                return executing;
            } finally
            {
                jobLock.unlock();
            }
        }

        @Override
        public boolean isCanceled()
        {
            jobLock.lock();

            try
            {
                return canceled;
            } finally
            {
                jobLock.unlock();
            }
        }

        /**
         * Marks the job as canceled. A job that is not currently executing is removed from the job list
         * immediately; an executing job is removed by {@link #cleanupAfterExecution()} once it finishes.
         */
        @Override
        public void cancel()
        {
            jobLock.lock();

            try
            {
                canceled = true;

                if (!executing)
                {
                    removeJob(this);
                }

                // Otherwise, it will be caught when the job finishes execution.
            } finally
            {
                jobLock.unlock();
            }
        }

        /**
         * Diagnostic description used in log messages: job id, name, and either the executing/canceled state
         * or the next execution time.
         */
        @Override
        public String toString()
        {
            StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);

            builder.append(", (").append(name).append(')');

            if (executing)
            {
                builder.append(", executing");
            }

            if (canceled)
            {
                builder.append(", canceled");
            } else
            {
                builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
            }

            return builder.append(']').toString();
        }

        /**
         * Starts execution of the job; this sets the executing flag, calculates the next execution time,
         * and uses the ParallelExecutor to run the job.
         */
        void start()
        {
            jobLock.lock();

            try
            {
                executing = true;

                // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
                // here, such as basing the next execution on the actual start time, or even actual completion time, or allowing
                // overlapping executions of the Job on a more rigid schedule.  Use Quartz.

                nextExecution = schedule.nextExecution(nextExecution);

                parallelExecutor.invoke(this);
            } finally
            {
                jobLock.unlock();
            }

            if (logger.isTraceEnabled())
            {
                logger.trace(this + " sent for execution");
            }
        }

        /**
         * Invoked after the job's Runnable has returned (or thrown): clears the executing flag, then either
         * removes the job (if it was canceled while running) or wakes the scheduling thread so that it can
         * take this job's next execution time into account.
         */
        void cleanupAfterExecution()
        {
            // The lock must be acquired here: this method runs on whichever thread executed the job, and
            // ReentrantLock.unlock() throws IllegalMonitorStateException when the caller is not the holder.
            jobLock.lock();

            try
            {
                if (logger.isTraceEnabled())
                {
                    logger.trace(this + " execution complete");
                }

                executing = false;

                if (canceled)
                {
                    removeJob(this);
                } else
                {
                    // Again, naive but necessary.
                    thread.interrupt();
                }
            } finally
            {
                jobLock.unlock();
            }
        }

        /**
         * Runs the wrapped Runnable; cleanup is performed even when the Runnable throws.
         *
         * @return always null
         */
        @Override
        public Void invoke()
        {
            logger.debug("Executing job #{} ({})", jobId, name);

            try
            {
                runnableJob.run();
            } finally
            {
                cleanupAfterExecution();
            }

            return null;
        }

    }

    public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
    {
        this.parallelExecutor = parallelExecutor;
        this.logger = logger;
    }

    /**
     * Registers a shutdown listener with the hub (so the scheduling thread is stopped when the registry
     * shuts down) and starts the scheduling thread.
     */
    @PostInjection
    public void start(RegistryShutdownHub hub)
    {
        hub.addRegistryShutdownListener(new Runnable()
        {
            @Override
            public void run()
            {
                registryDidShutdown();
            }
        });

        thread.start();
    }


    /**
     * Removes the job from the list of scheduled jobs.
     */
    void removeJob(Job job)
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Removing " + job);
        }

        jobLock.lock();

        try
        {
            jobs.remove(job);
        } finally
        {
            jobLock.unlock();
        }
    }


    /**
     * Creates a job from the schedule, name and Runnable, adds it to the job list, and wakes the scheduling
     * thread so the new job's first execution time is considered.
     *
     * @return the handle for the newly added job
     */
    @Override
    public PeriodicJob addJob(Schedule schedule, String name, Runnable job)
    {
        assert schedule != null;
        assert name != null;
        assert job != null;

        Job periodicJob = new Job(schedule, name, job);

        jobLock.lock();

        try
        {
            jobs.add(periodicJob);
        } finally
        {
            jobLock.unlock();
        }

        if (logger.isDebugEnabled())
        {
            logger.debug("Added " + periodicJob);
        }

        // Wake the thread so that it can start the job, if necessary.

        // Technically, this is only necessary if the new job is scheduled earlier
        // than any job currently in the list of jobs, but this naive implementation
        // is simpler.
        thread.interrupt();

        return periodicJob;
    }

    /**
     * Main loop of the scheduling thread: starts whatever is due, then sleeps until the next known execution
     * time, repeating until shutdown is flagged.
     */
    @Override
    public void run()
    {
        while (!shutdown)
        {
            long nextExecution = executeCurrentBatch();

            try
            {
                long delay = nextExecution - System.currentTimeMillis();

                if (logger.isTraceEnabled())
                {
                    logger.trace(String.format("Sleeping for %,d ms", delay));
                }

                if (delay > 0)
                {
                    Thread.sleep(delay);
                }
            } catch (InterruptedException ex)
            {
                // Ignored; the thread is interrupted() to shut it down,
                // or to have it execute a new batch.

                logger.trace("Interrupted");
            }
        }
    }

    private void registryDidShutdown()
    {
        shutdown = true;

        thread.interrupt();
    }

    /**
     * Finds jobs and executes jobs that are ready to be executed.
     *
     * @return the next execution time (from the non-executing job that is scheduled earliest for execution).
     */
    private long executeCurrentBatch()
    {
        long now = System.currentTimeMillis();
        long nextExecution = now + FIVE_MINUTES;

        jobLock.lock();

        try
        {
            // TAP5-2455
            // Jobs to cancel are collected first and canceled after the loop, because cancel() removes
            // the job from the list being iterated.
            Set<Job> jobsToCancel = null;

            for (Job job : jobs)
            {
                if (job.isExecuting())
                {
                    continue;
                }

                long jobNextExecution = job.getNextExecution();

                if (jobNextExecution <= 0)
                {
                    // A non-positive next execution time is treated as "never run again".
                    if (jobsToCancel == null)
                    {
                        jobsToCancel = new HashSet<PeriodicExecutorImpl.Job>();
                    }
                    jobsToCancel.add(job);
                } else if (jobNextExecution <= now)
                {
                    job.start();
                } else
                {
                    nextExecution = Math.min(nextExecution, jobNextExecution);
                }
            }

            if (jobsToCancel != null)
            {
                for (Job job : jobsToCancel)
                {
                    job.cancel();
                }
            }
        } finally
        {
            jobLock.unlock();
        }

        return nextExecution;
    }


}