ScheduledThreadPoolExecutor.java revision 51b1b6997fd3f980076b8081f7f1165ccc2a4008
1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37import java.util.concurrent.atomic.*;
38import java.util.concurrent.locks.*;
39import java.util.*;
40
41/**
42 * A {@link ThreadPoolExecutor} that can additionally schedule
43 * commands to run after a given delay, or to execute
44 * periodically. This class is preferable to {@link java.util.Timer}
45 * when multiple worker threads are needed, or when the additional
46 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
47 * this class extends) are required.
48 *
49 * <p>Delayed tasks execute no sooner than they are enabled, but
50 * without any real-time guarantees about when, after they are
51 * enabled, they will commence. Tasks scheduled for exactly the same
52 * execution time are enabled in first-in-first-out (FIFO) order of
53 * submission.
54 *
55 * <p>When a submitted task is cancelled before it is run, execution
56 * is suppressed. By default, such a cancelled task is not
57 * automatically removed from the work queue until its delay
58 * elapses. While this enables further inspection and monitoring, it
59 * may also cause unbounded retention of cancelled tasks. To avoid
60 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
61 * causes tasks to be immediately removed from the work queue at
62 * time of cancellation.
63 *
64 * <p>Successive executions of a task scheduled via
65 * {@code scheduleAtFixedRate} or
66 * {@code scheduleWithFixedDelay} do not overlap. While different
67 * executions may be performed by different threads, the effects of
68 * prior executions <a
69 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
70 * those of subsequent ones.
71 *
72 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
73 * of the inherited tuning methods are not useful for it. In
74 * particular, because it acts as a fixed-sized pool using
75 * {@code corePoolSize} threads and an unbounded queue, adjustments
76 * to {@code maximumPoolSize} have no useful effect. Additionally, it
77 * is almost never a good idea to set {@code corePoolSize} to zero or
78 * use {@code allowCoreThreadTimeOut} because this may leave the pool
79 * without threads to handle tasks once they become eligible to run.
80 *
81 * <p><b>Extension notes:</b> This class overrides the
82 * {@link ThreadPoolExecutor#execute execute} and
83 * {@link AbstractExecutorService#submit(Runnable) submit}
84 * methods to generate internal {@link ScheduledFuture} objects to
85 * control per-task delays and scheduling.  To preserve
86 * functionality, any further overrides of these methods in
87 * subclasses must invoke superclass versions, which effectively
88 * disables additional task customization.  However, this class
89 * provides alternative protected extension method
90 * {@code decorateTask} (one version each for {@code Runnable} and
91 * {@code Callable}) that can be used to customize the concrete task
92 * types used to execute commands entered via {@code execute},
93 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
94 * and {@code scheduleWithFixedDelay}.  By default, a
95 * {@code ScheduledThreadPoolExecutor} uses a task type extending
96 * {@link FutureTask}. However, this may be modified or replaced using
97 * subclasses of the form:
98 *
99 *  <pre> {@code
100 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
101 *
102 *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
103 *
104 *   protected <V> RunnableScheduledFuture<V> decorateTask(
105 *                Runnable r, RunnableScheduledFuture<V> task) {
106 *       return new CustomTask<V>(r, task);
107 *   }
108 *
109 *   protected <V> RunnableScheduledFuture<V> decorateTask(
110 *                Callable<V> c, RunnableScheduledFuture<V> task) {
111 *       return new CustomTask<V>(c, task);
112 *   }
113 *   // ... add constructors, etc.
114 * }}</pre>
115 *
116 * @since 1.5
117 * @author Doug Lea
118 */
119public class ScheduledThreadPoolExecutor
120        extends ThreadPoolExecutor
121        implements ScheduledExecutorService {
122
123    /*
124     * This class specializes ThreadPoolExecutor implementation by
125     *
126     * 1. Using a custom task type, ScheduledFutureTask for
127     *    tasks, even those that don't require scheduling (i.e.,
128     *    those submitted using ExecutorService execute, not
129     *    ScheduledExecutorService methods) which are treated as
130     *    delayed tasks with a delay of zero.
131     *
132     * 2. Using a custom queue (DelayedWorkQueue), a variant of
133     *    unbounded DelayQueue. The lack of capacity constraint and
134     *    the fact that corePoolSize and maximumPoolSize are
135     *    effectively identical simplifies some execution mechanics
136     *    (see delayedExecute) compared to ThreadPoolExecutor.
137     *
138     * 3. Supporting optional run-after-shutdown parameters, which
139     *    leads to overrides of shutdown methods to remove and cancel
140     *    tasks that should NOT be run after shutdown, as well as
141     *    different recheck logic when task (re)submission overlaps
142     *    with a shutdown.
143     *
144     * 4. Task decoration methods to allow interception and
145     *    instrumentation, which are needed because subclasses cannot
146     *    otherwise override submit methods to get this effect. These
147     *    don't have any impact on pool control logic though.
148     */
149
150    /**
151     * False if should cancel/suppress periodic tasks on shutdown.
152     */
153    private volatile boolean continueExistingPeriodicTasksAfterShutdown;
154
155    /**
156     * False if should cancel non-periodic tasks on shutdown.
157     */
158    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
159
160    /**
161     * True if ScheduledFutureTask.cancel should remove from queue
162     */
163    private volatile boolean removeOnCancel = false;
164
165    /**
166     * Sequence number to break scheduling ties, and in turn to
167     * guarantee FIFO order among tied entries.
168     */
169    private static final AtomicLong sequencer = new AtomicLong(0);
170
171    /**
172     * Returns current nanosecond time.
173     */
174    final long now() {
175        return System.nanoTime();
176    }
177
178    private class ScheduledFutureTask<V>
179            extends FutureTask<V> implements RunnableScheduledFuture<V> {
180
181        /** Sequence number to break ties FIFO */
182        private final long sequenceNumber;
183
184        /** The time the task is enabled to execute in nanoTime units */
185        private long time;
186
187        /**
188         * Period in nanoseconds for repeating tasks.  A positive
189         * value indicates fixed-rate execution.  A negative value
190         * indicates fixed-delay execution.  A value of 0 indicates a
191         * non-repeating task.
192         */
193        private final long period;
194
195        /** The actual task to be re-enqueued by reExecutePeriodic */
196        RunnableScheduledFuture<V> outerTask = this;
197
198        /**
199         * Index into delay queue, to support faster cancellation.
200         */
201        int heapIndex;
202
203        /**
204         * Creates a one-shot action with given nanoTime-based trigger time.
205         */
206        ScheduledFutureTask(Runnable r, V result, long ns) {
207            super(r, result);
208            this.time = ns;
209            this.period = 0;
210            this.sequenceNumber = sequencer.getAndIncrement();
211        }
212
213        /**
214         * Creates a periodic action with given nano time and period.
215         */
216        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
217            super(r, result);
218            this.time = ns;
219            this.period = period;
220            this.sequenceNumber = sequencer.getAndIncrement();
221        }
222
223        /**
224         * Creates a one-shot action with given nanoTime-based trigger.
225         */
226        ScheduledFutureTask(Callable<V> callable, long ns) {
227            super(callable);
228            this.time = ns;
229            this.period = 0;
230            this.sequenceNumber = sequencer.getAndIncrement();
231        }
232
233        public long getDelay(TimeUnit unit) {
234            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
235        }
236
237        public int compareTo(Delayed other) {
238            if (other == this) // compare zero ONLY if same object
239                return 0;
240            if (other instanceof ScheduledFutureTask) {
241                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
242                long diff = time - x.time;
243                if (diff < 0)
244                    return -1;
245                else if (diff > 0)
246                    return 1;
247                else if (sequenceNumber < x.sequenceNumber)
248                    return -1;
249                else
250                    return 1;
251            }
252            long d = (getDelay(TimeUnit.NANOSECONDS) -
253                      other.getDelay(TimeUnit.NANOSECONDS));
254            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
255        }
256
257        /**
258         * Returns true if this is a periodic (not a one-shot) action.
259         *
260         * @return true if periodic
261         */
262        public boolean isPeriodic() {
263            return period != 0;
264        }
265
266        /**
267         * Sets the next time to run for a periodic task.
268         */
269        private void setNextRunTime() {
270            long p = period;
271            if (p > 0)
272                time += p;
273            else
274                time = triggerTime(-p);
275        }
276
277        public boolean cancel(boolean mayInterruptIfRunning) {
278            boolean cancelled = super.cancel(mayInterruptIfRunning);
279            if (cancelled && removeOnCancel && heapIndex >= 0)
280                remove(this);
281            return cancelled;
282        }
283
284        /**
285         * Overrides FutureTask version so as to reset/requeue if periodic.
286         */
287        public void run() {
288            boolean periodic = isPeriodic();
289            if (!canRunInCurrentRunState(periodic))
290                cancel(false);
291            else if (!periodic)
292                ScheduledFutureTask.super.run();
293            else if (ScheduledFutureTask.super.runAndReset()) {
294                setNextRunTime();
295                reExecutePeriodic(outerTask);
296            }
297        }
298    }
299
300    /**
301     * Returns true if can run a task given current run state
302     * and run-after-shutdown parameters.
303     *
304     * @param periodic true if this task periodic, false if delayed
305     */
306    boolean canRunInCurrentRunState(boolean periodic) {
307        return isRunningOrShutdown(periodic ?
308                                   continueExistingPeriodicTasksAfterShutdown :
309                                   executeExistingDelayedTasksAfterShutdown);
310    }
311
312    /**
313     * Main execution method for delayed or periodic tasks.  If pool
314     * is shut down, rejects the task. Otherwise adds task to queue
315     * and starts a thread, if necessary, to run it.  (We cannot
316     * prestart the thread to run the task because the task (probably)
317     * shouldn't be run yet,) If the pool is shut down while the task
318     * is being added, cancel and remove it if required by state and
319     * run-after-shutdown parameters.
320     *
321     * @param task the task
322     */
323    private void delayedExecute(RunnableScheduledFuture<?> task) {
324        if (isShutdown())
325            reject(task);
326        else {
327            super.getQueue().add(task);
328            if (isShutdown() &&
329                !canRunInCurrentRunState(task.isPeriodic()) &&
330                remove(task))
331                task.cancel(false);
332            else
333                ensurePrestart();
334        }
335    }
336
337    /**
338     * Requeues a periodic task unless current run state precludes it.
339     * Same idea as delayedExecute except drops task rather than rejecting.
340     *
341     * @param task the task
342     */
343    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
344        if (canRunInCurrentRunState(true)) {
345            super.getQueue().add(task);
346            if (!canRunInCurrentRunState(true) && remove(task))
347                task.cancel(false);
348            else
349                ensurePrestart();
350        }
351    }
352
353    /**
354     * Cancels and clears the queue of all tasks that should not be run
355     * due to shutdown policy.  Invoked within super.shutdown.
356     */
357    @Override void onShutdown() {
358        BlockingQueue<Runnable> q = super.getQueue();
359        boolean keepDelayed =
360            getExecuteExistingDelayedTasksAfterShutdownPolicy();
361        boolean keepPeriodic =
362            getContinueExistingPeriodicTasksAfterShutdownPolicy();
363        if (!keepDelayed && !keepPeriodic) {
364            for (Object e : q.toArray())
365                if (e instanceof RunnableScheduledFuture<?>)
366                    ((RunnableScheduledFuture<?>) e).cancel(false);
367            q.clear();
368        }
369        else {
370            // Traverse snapshot to avoid iterator exceptions
371            for (Object e : q.toArray()) {
372                if (e instanceof RunnableScheduledFuture) {
373                    RunnableScheduledFuture<?> t =
374                        (RunnableScheduledFuture<?>)e;
375                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
376                        t.isCancelled()) { // also remove if already cancelled
377                        if (q.remove(t))
378                            t.cancel(false);
379                    }
380                }
381            }
382        }
383        tryTerminate();
384    }
385
386    /**
387     * Modifies or replaces the task used to execute a runnable.
388     * This method can be used to override the concrete
389     * class used for managing internal tasks.
390     * The default implementation simply returns the given task.
391     *
392     * @param runnable the submitted Runnable
393     * @param task the task created to execute the runnable
394     * @return a task that can execute the runnable
395     * @since 1.6
396     */
397    protected <V> RunnableScheduledFuture<V> decorateTask(
398        Runnable runnable, RunnableScheduledFuture<V> task) {
399        return task;
400    }
401
402    /**
403     * Modifies or replaces the task used to execute a callable.
404     * This method can be used to override the concrete
405     * class used for managing internal tasks.
406     * The default implementation simply returns the given task.
407     *
408     * @param callable the submitted Callable
409     * @param task the task created to execute the callable
410     * @return a task that can execute the callable
411     * @since 1.6
412     */
413    protected <V> RunnableScheduledFuture<V> decorateTask(
414        Callable<V> callable, RunnableScheduledFuture<V> task) {
415        return task;
416    }
417
418    /**
419     * Creates a new {@code ScheduledThreadPoolExecutor} with the
420     * given core pool size.
421     *
422     * @param corePoolSize the number of threads to keep in the pool, even
423     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
424     * @throws IllegalArgumentException if {@code corePoolSize < 0}
425     */
426    public ScheduledThreadPoolExecutor(int corePoolSize) {
427        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
428              new DelayedWorkQueue());
429    }
430
431    /**
432     * Creates a new {@code ScheduledThreadPoolExecutor} with the
433     * given initial parameters.
434     *
435     * @param corePoolSize the number of threads to keep in the pool, even
436     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
437     * @param threadFactory the factory to use when the executor
438     *        creates a new thread
439     * @throws IllegalArgumentException if {@code corePoolSize < 0}
440     * @throws NullPointerException if {@code threadFactory} is null
441     */
442    public ScheduledThreadPoolExecutor(int corePoolSize,
443                                       ThreadFactory threadFactory) {
444        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
445              new DelayedWorkQueue(), threadFactory);
446    }
447
448    /**
449     * Creates a new ScheduledThreadPoolExecutor with the given
450     * initial parameters.
451     *
452     * @param corePoolSize the number of threads to keep in the pool, even
453     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
454     * @param handler the handler to use when execution is blocked
455     *        because the thread bounds and queue capacities are reached
456     * @throws IllegalArgumentException if {@code corePoolSize < 0}
457     * @throws NullPointerException if {@code handler} is null
458     */
459    public ScheduledThreadPoolExecutor(int corePoolSize,
460                                       RejectedExecutionHandler handler) {
461        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
462              new DelayedWorkQueue(), handler);
463    }
464
465    /**
466     * Creates a new ScheduledThreadPoolExecutor with the given
467     * initial parameters.
468     *
469     * @param corePoolSize the number of threads to keep in the pool, even
470     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
471     * @param threadFactory the factory to use when the executor
472     *        creates a new thread
473     * @param handler the handler to use when execution is blocked
474     *        because the thread bounds and queue capacities are reached
475     * @throws IllegalArgumentException if {@code corePoolSize < 0}
476     * @throws NullPointerException if {@code threadFactory} or
477     *         {@code handler} is null
478     */
479    public ScheduledThreadPoolExecutor(int corePoolSize,
480                                       ThreadFactory threadFactory,
481                                       RejectedExecutionHandler handler) {
482        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
483              new DelayedWorkQueue(), threadFactory, handler);
484    }
485
486    /**
487     * Returns the trigger time of a delayed action.
488     */
489    private long triggerTime(long delay, TimeUnit unit) {
490        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
491    }
492
493    /**
494     * Returns the trigger time of a delayed action.
495     */
496    long triggerTime(long delay) {
497        return now() +
498            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
499    }
500
501    /**
502     * Constrains the values of all delays in the queue to be within
503     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
504     * This may occur if a task is eligible to be dequeued, but has
505     * not yet been, while some other task is added with a delay of
506     * Long.MAX_VALUE.
507     */
508    private long overflowFree(long delay) {
509        Delayed head = (Delayed) super.getQueue().peek();
510        if (head != null) {
511            long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
512            if (headDelay < 0 && (delay - headDelay < 0))
513                delay = Long.MAX_VALUE + headDelay;
514        }
515        return delay;
516    }
517
518    /**
519     * @throws RejectedExecutionException {@inheritDoc}
520     * @throws NullPointerException       {@inheritDoc}
521     */
522    public ScheduledFuture<?> schedule(Runnable command,
523                                       long delay,
524                                       TimeUnit unit) {
525        if (command == null || unit == null)
526            throw new NullPointerException();
527        RunnableScheduledFuture<?> t = decorateTask(command,
528            new ScheduledFutureTask<Void>(command, null,
529                                          triggerTime(delay, unit)));
530        delayedExecute(t);
531        return t;
532    }
533
534    /**
535     * @throws RejectedExecutionException {@inheritDoc}
536     * @throws NullPointerException       {@inheritDoc}
537     */
538    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
539                                           long delay,
540                                           TimeUnit unit) {
541        if (callable == null || unit == null)
542            throw new NullPointerException();
543        RunnableScheduledFuture<V> t = decorateTask(callable,
544            new ScheduledFutureTask<V>(callable,
545                                       triggerTime(delay, unit)));
546        delayedExecute(t);
547        return t;
548    }
549
550    /**
551     * @throws RejectedExecutionException {@inheritDoc}
552     * @throws NullPointerException       {@inheritDoc}
553     * @throws IllegalArgumentException   {@inheritDoc}
554     */
555    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
556                                                  long initialDelay,
557                                                  long period,
558                                                  TimeUnit unit) {
559        if (command == null || unit == null)
560            throw new NullPointerException();
561        if (period <= 0)
562            throw new IllegalArgumentException();
563        ScheduledFutureTask<Void> sft =
564            new ScheduledFutureTask<Void>(command,
565                                          null,
566                                          triggerTime(initialDelay, unit),
567                                          unit.toNanos(period));
568        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
569        sft.outerTask = t;
570        delayedExecute(t);
571        return t;
572    }
573
574    /**
575     * @throws RejectedExecutionException {@inheritDoc}
576     * @throws NullPointerException       {@inheritDoc}
577     * @throws IllegalArgumentException   {@inheritDoc}
578     */
579    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
580                                                     long initialDelay,
581                                                     long delay,
582                                                     TimeUnit unit) {
583        if (command == null || unit == null)
584            throw new NullPointerException();
585        if (delay <= 0)
586            throw new IllegalArgumentException();
587        ScheduledFutureTask<Void> sft =
588            new ScheduledFutureTask<Void>(command,
589                                          null,
590                                          triggerTime(initialDelay, unit),
591                                          unit.toNanos(-delay));
592        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
593        sft.outerTask = t;
594        delayedExecute(t);
595        return t;
596    }
597
598    /**
599     * Executes {@code command} with zero required delay.
600     * This has effect equivalent to
601     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
602     * Note that inspections of the queue and of the list returned by
603     * {@code shutdownNow} will access the zero-delayed
604     * {@link ScheduledFuture}, not the {@code command} itself.
605     *
606     * <p>A consequence of the use of {@code ScheduledFuture} objects is
607     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
608     * called with a null second {@code Throwable} argument, even if the
609     * {@code command} terminated abruptly.  Instead, the {@code Throwable}
610     * thrown by such a task can be obtained via {@link Future#get}.
611     *
612     * @throws RejectedExecutionException at discretion of
613     *         {@code RejectedExecutionHandler}, if the task
614     *         cannot be accepted for execution because the
615     *         executor has been shut down
616     * @throws NullPointerException {@inheritDoc}
617     */
618    public void execute(Runnable command) {
619        schedule(command, 0, TimeUnit.NANOSECONDS);
620    }
621
622    // Override AbstractExecutorService methods
623
624    /**
625     * @throws RejectedExecutionException {@inheritDoc}
626     * @throws NullPointerException       {@inheritDoc}
627     */
628    public Future<?> submit(Runnable task) {
629        return schedule(task, 0, TimeUnit.NANOSECONDS);
630    }
631
632    /**
633     * @throws RejectedExecutionException {@inheritDoc}
634     * @throws NullPointerException       {@inheritDoc}
635     */
636    public <T> Future<T> submit(Runnable task, T result) {
637        return schedule(Executors.callable(task, result),
638                        0, TimeUnit.NANOSECONDS);
639    }
640
641    /**
642     * @throws RejectedExecutionException {@inheritDoc}
643     * @throws NullPointerException       {@inheritDoc}
644     */
645    public <T> Future<T> submit(Callable<T> task) {
646        return schedule(task, 0, TimeUnit.NANOSECONDS);
647    }
648
649    /**
650     * Sets the policy on whether to continue executing existing
651     * periodic tasks even when this executor has been {@code shutdown}.
652     * In this case, these tasks will only terminate upon
653     * {@code shutdownNow} or after setting the policy to
654     * {@code false} when already shutdown.
655     * This value is by default {@code false}.
656     *
657     * @param value if {@code true}, continue after shutdown, else don't.
658     * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
659     */
660    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
661        continueExistingPeriodicTasksAfterShutdown = value;
662        if (!value && isShutdown())
663            onShutdown();
664    }
665
666    /**
667     * Gets the policy on whether to continue executing existing
668     * periodic tasks even when this executor has been {@code shutdown}.
669     * In this case, these tasks will only terminate upon
670     * {@code shutdownNow} or after setting the policy to
671     * {@code false} when already shutdown.
672     * This value is by default {@code false}.
673     *
674     * @return {@code true} if will continue after shutdown
675     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
676     */
677    public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
678        return continueExistingPeriodicTasksAfterShutdown;
679    }
680
681    /**
682     * Sets the policy on whether to execute existing delayed
683     * tasks even when this executor has been {@code shutdown}.
684     * In this case, these tasks will only terminate upon
685     * {@code shutdownNow}, or after setting the policy to
686     * {@code false} when already shutdown.
687     * This value is by default {@code true}.
688     *
689     * @param value if {@code true}, execute after shutdown, else don't.
690     * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
691     */
692    public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
693        executeExistingDelayedTasksAfterShutdown = value;
694        if (!value && isShutdown())
695            onShutdown();
696    }
697
698    /**
699     * Gets the policy on whether to execute existing delayed
700     * tasks even when this executor has been {@code shutdown}.
701     * In this case, these tasks will only terminate upon
702     * {@code shutdownNow}, or after setting the policy to
703     * {@code false} when already shutdown.
704     * This value is by default {@code true}.
705     *
706     * @return {@code true} if will execute after shutdown
707     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
708     */
709    public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
710        return executeExistingDelayedTasksAfterShutdown;
711    }
712
713    /**
714     * Sets the policy on whether cancelled tasks should be immediately
715     * removed from the work queue at time of cancellation.  This value is
716     * by default {@code false}.
717     *
718     * @param value if {@code true}, remove on cancellation, else don't
719     * @see #getRemoveOnCancelPolicy
720     * @since 1.7
721     */
722    public void setRemoveOnCancelPolicy(boolean value) {
723        removeOnCancel = value;
724    }
725
726    /**
727     * Gets the policy on whether cancelled tasks should be immediately
728     * removed from the work queue at time of cancellation.  This value is
729     * by default {@code false}.
730     *
731     * @return {@code true} if cancelled tasks are immediately removed
732     *         from the queue
733     * @see #setRemoveOnCancelPolicy
734     * @since 1.7
735     */
736    public boolean getRemoveOnCancelPolicy() {
737        return removeOnCancel;
738    }
739
740    /**
741     * Initiates an orderly shutdown in which previously submitted
742     * tasks are executed, but no new tasks will be accepted.
743     * Invocation has no additional effect if already shut down.
744     *
745     * <p>This method does not wait for previously submitted tasks to
746     * complete execution.  Use {@link #awaitTermination awaitTermination}
747     * to do that.
748     *
749     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
750     * has been set {@code false}, existing delayed tasks whose delays
751     * have not yet elapsed are cancelled.  And unless the {@code
752     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
753     * {@code true}, future executions of existing periodic tasks will
754     * be cancelled.
755     *
756     * @throws SecurityException {@inheritDoc}
757     */
758    public void shutdown() {
759        super.shutdown();
760    }
761
762    /**
763     * Attempts to stop all actively executing tasks, halts the
764     * processing of waiting tasks, and returns a list of the tasks
765     * that were awaiting execution.
766     *
767     * <p>This method does not wait for actively executing tasks to
768     * terminate.  Use {@link #awaitTermination awaitTermination} to
769     * do that.
770     *
771     * <p>There are no guarantees beyond best-effort attempts to stop
772     * processing actively executing tasks.  This implementation
773     * cancels tasks via {@link Thread#interrupt}, so any task that
774     * fails to respond to interrupts may never terminate.
775     *
776     * @return list of tasks that never commenced execution.
777     *         Each element of this list is a {@link ScheduledFuture},
778     *         including those tasks submitted using {@code execute},
779     *         which are for scheduling purposes used as the basis of a
780     *         zero-delay {@code ScheduledFuture}.
781     * @throws SecurityException {@inheritDoc}
782     */
783    public List<Runnable> shutdownNow() {
784        return super.shutdownNow();
785    }
786
787    /**
788     * Returns the task queue used by this executor.  Each element of
789     * this queue is a {@link ScheduledFuture}, including those
790     * tasks submitted using {@code execute} which are for scheduling
791     * purposes used as the basis of a zero-delay
792     * {@code ScheduledFuture}.  Iteration over this queue is
793     * <em>not</em> guaranteed to traverse tasks in the order in
794     * which they will execute.
795     *
796     * @return the task queue
797     */
798    public BlockingQueue<Runnable> getQueue() {
799        return super.getQueue();
800    }
801
802    /**
803     * Specialized delay queue. To mesh with TPE declarations, this
804     * class must be declared as a BlockingQueue<Runnable> even though
805     * it can only hold RunnableScheduledFutures.
806     */
807    static class DelayedWorkQueue extends AbstractQueue<Runnable>
808        implements BlockingQueue<Runnable> {
809
810        /*
811         * A DelayedWorkQueue is based on a heap-based data structure
812         * like those in DelayQueue and PriorityQueue, except that
813         * every ScheduledFutureTask also records its index into the
814         * heap array. This eliminates the need to find a task upon
815         * cancellation, greatly speeding up removal (down from O(n)
816         * to O(log n)), and reducing garbage retention that would
817         * otherwise occur by waiting for the element to rise to top
818         * before clearing. But because the queue may also hold
819         * RunnableScheduledFutures that are not ScheduledFutureTasks,
820         * we are not guaranteed to have such indices available, in
821         * which case we fall back to linear search. (We expect that
822         * most tasks will not be decorated, and that the faster cases
823         * will be much more common.)
824         *
825         * All heap operations must record index changes -- mainly
826         * within siftUp and siftDown. Upon removal, a task's
827         * heapIndex is set to -1. Note that ScheduledFutureTasks can
828         * appear at most once in the queue (this need not be true for
829         * other kinds of tasks or work queues), so are uniquely
830         * identified by heapIndex.
831         */
832
833        private static final int INITIAL_CAPACITY = 16;
834        private RunnableScheduledFuture[] queue =
835            new RunnableScheduledFuture[INITIAL_CAPACITY];
836        private final ReentrantLock lock = new ReentrantLock();
837        private int size = 0;
838
839        /**
840         * Thread designated to wait for the task at the head of the
841         * queue.  This variant of the Leader-Follower pattern
842         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
843         * minimize unnecessary timed waiting.  When a thread becomes
844         * the leader, it waits only for the next delay to elapse, but
845         * other threads await indefinitely.  The leader thread must
846         * signal some other thread before returning from take() or
847         * poll(...), unless some other thread becomes leader in the
848         * interim.  Whenever the head of the queue is replaced with a
849         * task with an earlier expiration time, the leader field is
850         * invalidated by being reset to null, and some waiting
851         * thread, but not necessarily the current leader, is
852         * signalled.  So waiting threads must be prepared to acquire
853         * and lose leadership while waiting.
854         */
855        private Thread leader = null;
856
857        /**
858         * Condition signalled when a newer task becomes available at the
859         * head of the queue or a new thread may need to become leader.
860         */
861        private final Condition available = lock.newCondition();
862
863        /**
864         * Set f's heapIndex if it is a ScheduledFutureTask.
865         */
866        private void setIndex(RunnableScheduledFuture f, int idx) {
867            if (f instanceof ScheduledFutureTask)
868                ((ScheduledFutureTask)f).heapIndex = idx;
869        }
870
871        /**
872         * Sift element added at bottom up to its heap-ordered spot.
873         * Call only when holding lock.
874         */
875        private void siftUp(int k, RunnableScheduledFuture key) {
876            while (k > 0) {
877                int parent = (k - 1) >>> 1;
878                RunnableScheduledFuture e = queue[parent];
879                if (key.compareTo(e) >= 0)
880                    break;
881                queue[k] = e;
882                setIndex(e, k);
883                k = parent;
884            }
885            queue[k] = key;
886            setIndex(key, k);
887        }
888
889        /**
890         * Sift element added at top down to its heap-ordered spot.
891         * Call only when holding lock.
892         */
893        private void siftDown(int k, RunnableScheduledFuture key) {
894            int half = size >>> 1;
895            while (k < half) {
896                int child = (k << 1) + 1;
897                RunnableScheduledFuture c = queue[child];
898                int right = child + 1;
899                if (right < size && c.compareTo(queue[right]) > 0)
900                    c = queue[child = right];
901                if (key.compareTo(c) <= 0)
902                    break;
903                queue[k] = c;
904                setIndex(c, k);
905                k = child;
906            }
907            queue[k] = key;
908            setIndex(key, k);
909        }
910
911        /**
912         * Resize the heap array.  Call only when holding lock.
913         */
914        private void grow() {
915            int oldCapacity = queue.length;
916            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
917            if (newCapacity < 0) // overflow
918                newCapacity = Integer.MAX_VALUE;
919            queue = Arrays.copyOf(queue, newCapacity);
920        }
921
922        /**
923         * Find index of given object, or -1 if absent
924         */
925        private int indexOf(Object x) {
926            if (x != null) {
927                if (x instanceof ScheduledFutureTask) {
928                    int i = ((ScheduledFutureTask) x).heapIndex;
929                    // Sanity check; x could conceivably be a
930                    // ScheduledFutureTask from some other pool.
931                    if (i >= 0 && i < size && queue[i] == x)
932                        return i;
933                } else {
934                    for (int i = 0; i < size; i++)
935                        if (x.equals(queue[i]))
936                            return i;
937                }
938            }
939            return -1;
940        }
941
942        public boolean contains(Object x) {
943            final ReentrantLock lock = this.lock;
944            lock.lock();
945            try {
946                return indexOf(x) != -1;
947            } finally {
948                lock.unlock();
949            }
950        }
951
952        public boolean remove(Object x) {
953            final ReentrantLock lock = this.lock;
954            lock.lock();
955            try {
956                int i = indexOf(x);
957                if (i < 0)
958                    return false;
959
960                setIndex(queue[i], -1);
961                int s = --size;
962                RunnableScheduledFuture replacement = queue[s];
963                queue[s] = null;
964                if (s != i) {
965                    siftDown(i, replacement);
966                    if (queue[i] == replacement)
967                        siftUp(i, replacement);
968                }
969                return true;
970            } finally {
971                lock.unlock();
972            }
973        }
974
975        public int size() {
976            final ReentrantLock lock = this.lock;
977            lock.lock();
978            try {
979                return size;
980            } finally {
981                lock.unlock();
982            }
983        }
984
985        public boolean isEmpty() {
986            return size() == 0;
987        }
988
989        public int remainingCapacity() {
990            return Integer.MAX_VALUE;
991        }
992
993        public RunnableScheduledFuture peek() {
994            final ReentrantLock lock = this.lock;
995            lock.lock();
996            try {
997                return queue[0];
998            } finally {
999                lock.unlock();
1000            }
1001        }
1002
1003        public boolean offer(Runnable x) {
1004            if (x == null)
1005                throw new NullPointerException();
1006            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
1007            final ReentrantLock lock = this.lock;
1008            lock.lock();
1009            try {
1010                int i = size;
1011                if (i >= queue.length)
1012                    grow();
1013                size = i + 1;
1014                if (i == 0) {
1015                    queue[0] = e;
1016                    setIndex(e, 0);
1017                } else {
1018                    siftUp(i, e);
1019                }
1020                if (queue[0] == e) {
1021                    leader = null;
1022                    available.signal();
1023                }
1024            } finally {
1025                lock.unlock();
1026            }
1027            return true;
1028        }
1029
1030        public void put(Runnable e) {
1031            offer(e);
1032        }
1033
1034        public boolean add(Runnable e) {
1035            return offer(e);
1036        }
1037
1038        public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1039            return offer(e);
1040        }
1041
1042        /**
1043         * Performs common bookkeeping for poll and take: Replaces
1044         * first element with last and sifts it down.  Call only when
1045         * holding lock.
1046         * @param f the task to remove and return
1047         */
1048        private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1049            int s = --size;
1050            RunnableScheduledFuture x = queue[s];
1051            queue[s] = null;
1052            if (s != 0)
1053                siftDown(0, x);
1054            setIndex(f, -1);
1055            return f;
1056        }
1057
1058        public RunnableScheduledFuture poll() {
1059            final ReentrantLock lock = this.lock;
1060            lock.lock();
1061            try {
1062                RunnableScheduledFuture first = queue[0];
1063                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1064                    return null;
1065                else
1066                    return finishPoll(first);
1067            } finally {
1068                lock.unlock();
1069            }
1070        }
1071
1072        public RunnableScheduledFuture take() throws InterruptedException {
1073            final ReentrantLock lock = this.lock;
1074            lock.lockInterruptibly();
1075            try {
1076                for (;;) {
1077                    RunnableScheduledFuture first = queue[0];
1078                    if (first == null)
1079                        available.await();
1080                    else {
1081                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
1082                        if (delay <= 0)
1083                            return finishPoll(first);
1084                        else if (leader != null)
1085                            available.await();
1086                        else {
1087                            Thread thisThread = Thread.currentThread();
1088                            leader = thisThread;
1089                            try {
1090                                available.awaitNanos(delay);
1091                            } finally {
1092                                if (leader == thisThread)
1093                                    leader = null;
1094                            }
1095                        }
1096                    }
1097                }
1098            } finally {
1099                if (leader == null && queue[0] != null)
1100                    available.signal();
1101                lock.unlock();
1102            }
1103        }
1104
1105        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1106            throws InterruptedException {
1107            long nanos = unit.toNanos(timeout);
1108            final ReentrantLock lock = this.lock;
1109            lock.lockInterruptibly();
1110            try {
1111                for (;;) {
1112                    RunnableScheduledFuture first = queue[0];
1113                    if (first == null) {
1114                        if (nanos <= 0)
1115                            return null;
1116                        else
1117                            nanos = available.awaitNanos(nanos);
1118                    } else {
1119                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
1120                        if (delay <= 0)
1121                            return finishPoll(first);
1122                        if (nanos <= 0)
1123                            return null;
1124                        if (nanos < delay || leader != null)
1125                            nanos = available.awaitNanos(nanos);
1126                        else {
1127                            Thread thisThread = Thread.currentThread();
1128                            leader = thisThread;
1129                            try {
1130                                long timeLeft = available.awaitNanos(delay);
1131                                nanos -= delay - timeLeft;
1132                            } finally {
1133                                if (leader == thisThread)
1134                                    leader = null;
1135                            }
1136                        }
1137                    }
1138                }
1139            } finally {
1140                if (leader == null && queue[0] != null)
1141                    available.signal();
1142                lock.unlock();
1143            }
1144        }
1145
1146        public void clear() {
1147            final ReentrantLock lock = this.lock;
1148            lock.lock();
1149            try {
1150                for (int i = 0; i < size; i++) {
1151                    RunnableScheduledFuture t = queue[i];
1152                    if (t != null) {
1153                        queue[i] = null;
1154                        setIndex(t, -1);
1155                    }
1156                }
1157                size = 0;
1158            } finally {
1159                lock.unlock();
1160            }
1161        }
1162
1163        /**
1164         * Return and remove first element only if it is expired.
1165         * Used only by drainTo.  Call only when holding lock.
1166         */
1167        private RunnableScheduledFuture pollExpired() {
1168            RunnableScheduledFuture first = queue[0];
1169            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1170                return null;
1171            return finishPoll(first);
1172        }
1173
1174        public int drainTo(Collection<? super Runnable> c) {
1175            if (c == null)
1176                throw new NullPointerException();
1177            if (c == this)
1178                throw new IllegalArgumentException();
1179            final ReentrantLock lock = this.lock;
1180            lock.lock();
1181            try {
1182                RunnableScheduledFuture first;
1183                int n = 0;
1184                while ((first = pollExpired()) != null) {
1185                    c.add(first);
1186                    ++n;
1187                }
1188                return n;
1189            } finally {
1190                lock.unlock();
1191            }
1192        }
1193
1194        public int drainTo(Collection<? super Runnable> c, int maxElements) {
1195            if (c == null)
1196                throw new NullPointerException();
1197            if (c == this)
1198                throw new IllegalArgumentException();
1199            if (maxElements <= 0)
1200                return 0;
1201            final ReentrantLock lock = this.lock;
1202            lock.lock();
1203            try {
1204                RunnableScheduledFuture first;
1205                int n = 0;
1206                while (n < maxElements && (first = pollExpired()) != null) {
1207                    c.add(first);
1208                    ++n;
1209                }
1210                return n;
1211            } finally {
1212                lock.unlock();
1213            }
1214        }
1215
1216        public Object[] toArray() {
1217            final ReentrantLock lock = this.lock;
1218            lock.lock();
1219            try {
1220                return Arrays.copyOf(queue, size, Object[].class);
1221            } finally {
1222                lock.unlock();
1223            }
1224        }
1225
1226        @SuppressWarnings("unchecked")
1227        public <T> T[] toArray(T[] a) {
1228            final ReentrantLock lock = this.lock;
1229            lock.lock();
1230            try {
1231                if (a.length < size)
1232                    return (T[]) Arrays.copyOf(queue, size, a.getClass());
1233                System.arraycopy(queue, 0, a, 0, size);
1234                if (a.length > size)
1235                    a[size] = null;
1236                return a;
1237            } finally {
1238                lock.unlock();
1239            }
1240        }
1241
1242        public Iterator<Runnable> iterator() {
1243            return new Itr(Arrays.copyOf(queue, size));
1244        }
1245
1246        /**
1247         * Snapshot iterator that works off copy of underlying q array.
1248         */
1249        private class Itr implements Iterator<Runnable> {
1250            final RunnableScheduledFuture[] array;
1251            int cursor = 0;     // index of next element to return
1252            int lastRet = -1;   // index of last element, or -1 if no such
1253
1254            Itr(RunnableScheduledFuture[] array) {
1255                this.array = array;
1256            }
1257
1258            public boolean hasNext() {
1259                return cursor < array.length;
1260            }
1261
1262            public Runnable next() {
1263                if (cursor >= array.length)
1264                    throw new NoSuchElementException();
1265                lastRet = cursor;
1266                return array[cursor++];
1267            }
1268
1269            public void remove() {
1270                if (lastRet < 0)
1271                    throw new IllegalStateException();
1272                DelayedWorkQueue.this.remove(array[lastRet]);
1273                lastRet = -1;
1274            }
1275        }
1276    }
1277}
1278