001// Licensed under the Apache License, Version 2.0 (the "License");
002// you may not use this file except in compliance with the License.
003// You may obtain a copy of the License at
004//
005// http://www.apache.org/licenses/LICENSE-2.0
006//
007// Unless required by applicable law or agreed to in writing, software
008// distributed under the License is distributed on an "AS IS" BASIS,
009// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
010// See the License for the specific language governing permissions and
011// limitations under the License.
012
013package org.apache.tapestry5.ioc.internal.services.cron;
014
015import org.apache.tapestry5.ioc.Invokable;
016import org.apache.tapestry5.ioc.annotations.PostInjection;
017import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
018import org.apache.tapestry5.ioc.services.ParallelExecutor;
019import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
020import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
021import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
022import org.apache.tapestry5.ioc.services.cron.Schedule;
023import org.slf4j.Logger;
024
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Set;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.concurrent.locks.Lock;
031import java.util.concurrent.locks.ReentrantLock;
032
033public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
034{
035    private final ParallelExecutor parallelExecutor;
036
037    private final Logger logger;
038
039    // Synchronized by jobLock
040    private final List<Job> jobs = CollectionFactory.newList();
041
042    private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");
043
044    private transient boolean shutdown;
045
046    private static final long FIVE_MINUTES = 5 * 60 * 1000;
047
048    private final AtomicInteger jobIdAllocator = new AtomicInteger();
049
050    private final Lock jobLock = new ReentrantLock();
051
052    private class Job implements PeriodicJob, Invokable<Void>
053    {
054        final int jobId = jobIdAllocator.incrementAndGet();
055
056        private final Schedule schedule;
057
058        private final String name;
059
060        private final Runnable runnableJob;
061
062        private boolean executing, canceled;
063
064        private long nextExecution;
065
066        public Job(Schedule schedule, String name, Runnable runnableJob)
067        {
068            this.schedule = schedule;
069            this.name = name;
070            this.runnableJob = runnableJob;
071
072            nextExecution = schedule.firstExecution();
073        }
074
075        @Override
076        public String getName()
077        {
078            return name;
079        }
080
081        public long getNextExecution()
082        {
083            try
084            {
085                jobLock.lock();
086                return nextExecution;
087            } finally
088            {
089                jobLock.unlock();
090            }
091        }
092
093
094        @Override
095        public boolean isExecuting()
096        {
097            try
098            {
099                jobLock.lock();
100                return executing;
101            } finally
102            {
103                jobLock.unlock();
104            }
105        }
106
107        @Override
108        public boolean isCanceled()
109        {
110            try
111            {
112                jobLock.lock();
113                return canceled;
114            } finally
115            {
116                jobLock.unlock();
117            }
118        }
119
120        @Override
121        public void cancel()
122        {
123            try
124            {
125                jobLock.lock();
126
127                canceled = true;
128
129                if (!executing)
130                {
131                    removeJob(this);
132                }
133
134                // Otherwise, it will be caught when the job finishes execution.
135            } finally
136            {
137                jobLock.unlock();
138            }
139        }
140
141        @Override
142        public String toString()
143        {
144            StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);
145
146            builder.append(", (").append(name).append(')');
147
148            if (executing)
149            {
150                builder.append(", executing");
151            }
152
153            if (canceled)
154            {
155                builder.append(", canceled");
156            } else
157            {
158                builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
159            }
160
161            return builder.append(']').toString();
162        }
163
164        /**
165         * Starts execution of the job; this sets the executing flag, calculates the next execution time,
166         * and uses the ParallelExecutor to run the job.
167         */
168        void start()
169        {
170            try
171            {
172                jobLock.lock();
173                executing = true;
174
175                // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
176                // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing
177                // overlapping executions of the Job on a more rigid schedule.  Use Quartz.
178
179                nextExecution = schedule.nextExecution(nextExecution);
180
181                parallelExecutor.invoke(this);
182            } finally
183            {
184                jobLock.unlock();
185            }
186
187            if (logger.isTraceEnabled())
188            {
189                logger.trace(this + " sent for execution");
190            }
191        }
192
193        void cleanupAfterExecution()
194        {
195            try
196            {
197                if (logger.isTraceEnabled())
198                {
199                    logger.trace(this + " execution complete");
200                }
201
202                executing = false;
203
204                if (canceled)
205                {
206                    removeJob(this);
207                } else
208                {
209                    // Again, naive but necessary.
210                    thread.interrupt();
211                }
212            } finally
213            {
214                jobLock.unlock();
215            }
216        }
217
218        @Override
219        public Void invoke()
220        {
221            logger.debug("Executing job #{} ({})", jobId, name);
222
223            try
224            {
225                runnableJob.run();
226            } finally
227            {
228                cleanupAfterExecution();
229            }
230
231            return null;
232        }
233
234    }
235
236    public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
237    {
238        this.parallelExecutor = parallelExecutor;
239        this.logger = logger;
240    }
241
242    @PostInjection
243    public void start(RegistryShutdownHub hub)
244    {
245        hub.addRegistryShutdownListener(new Runnable()
246        {
247            @Override
248            public void run()
249            {
250                registryDidShutdown();
251            }
252        });
253
254        thread.start();
255    }
256
257
258    void removeJob(Job job)
259    {
260        if (logger.isDebugEnabled())
261        {
262            logger.debug("Removing " + job);
263        }
264
265        try
266        {
267            jobLock.lock();
268            jobs.remove(job);
269        } finally
270        {
271            jobLock.unlock();
272        }
273    }
274
275
276    @Override
277    public PeriodicJob addJob(Schedule schedule, String name, Runnable job)
278    {
279        assert schedule != null;
280        assert name != null;
281        assert job != null;
282
283        Job periodicJob = new Job(schedule, name, job);
284
285        try
286        {
287            jobLock.lock();
288
289            jobs.add(periodicJob);
290        } finally
291        {
292            jobLock.unlock();
293        }
294
295        if (logger.isDebugEnabled())
296        {
297            logger.debug("Added " + periodicJob);
298        }
299
300        // Wake the thread so that it can start the job, if necessary.
301
302        // Technically, this is only necessary if the new job is scheduled earlier
303        // than any job currently in the list of jobs, but this naive implementation
304        // is simpler.
305        thread.interrupt();
306
307        return periodicJob;
308    }
309
310    @Override
311    public void run()
312    {
313        while (!shutdown)
314        {
315            long nextExecution = executeCurrentBatch();
316
317            try
318            {
319                long delay = nextExecution - System.currentTimeMillis();
320
321                if (logger.isTraceEnabled())
322                {
323                    logger.trace(String.format("Sleeping for %,d ms", delay));
324                }
325
326                if (delay > 0)
327                {
328                    Thread.sleep(delay);
329                }
330            } catch (InterruptedException
331                    ex)
332            {
333                // Ignored; the thread is interrupted() to shut it down,
334                // or to have it execute a new batch.
335
336                logger.trace("Interrupted");
337            }
338        }
339    }
340
341    private void registryDidShutdown()
342    {
343        shutdown = true;
344
345        thread.interrupt();
346    }
347
348    /**
349     * Finds jobs and executes jobs that are ready to be executed.
350     *
351     * @return the next execution time (from the non-executing job that is scheduled earliest for execution).
352     */
353    private long executeCurrentBatch()
354    {
355        long now = System.currentTimeMillis();
356        long nextExecution = now + FIVE_MINUTES;
357
358        try
359        {
360            jobLock.lock();
361            // TAP5-2455
362            Set<Job> jobsToCancel = null;
363
364            for (Job job : jobs)
365            {
366                if (job.isExecuting())
367                {
368                    continue;
369                }
370
371                long jobNextExecution = job.getNextExecution();
372
373                if (jobNextExecution <= 0)
374                {
375                    if (jobsToCancel == null)
376                    {
377                        jobsToCancel = new HashSet<PeriodicExecutorImpl.Job>();
378                    }
379                    jobsToCancel.add(job);
380                } else if (jobNextExecution <= now)
381                {
382                    job.start();
383                } else
384                {
385                    nextExecution = Math.min(nextExecution, jobNextExecution);
386                }
387            }
388            if (jobsToCancel != null)
389            {
390                for (Job job : jobsToCancel)
391                {
392                    job.cancel();
393                }
394            }
395        } finally
396        {
397            jobLock.unlock();
398        }
399
400        return nextExecution;
401    }
402
403
404}