ScheduledThreadPoolExecutor.java revision 51b1b6997fd3f980076b8081f7f1165ccc2a4008
1/* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25/* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36package java.util.concurrent; 37import java.util.concurrent.atomic.*; 38import java.util.concurrent.locks.*; 39import java.util.*; 40 41/** 42 * A {@link ThreadPoolExecutor} that can additionally schedule 43 * commands to run after a given delay, or to execute 44 * periodically. 
This class is preferable to {@link java.util.Timer} 45 * when multiple worker threads are needed, or when the additional 46 * flexibility or capabilities of {@link ThreadPoolExecutor} (which 47 * this class extends) are required. 48 * 49 * <p>Delayed tasks execute no sooner than they are enabled, but 50 * without any real-time guarantees about when, after they are 51 * enabled, they will commence. Tasks scheduled for exactly the same 52 * execution time are enabled in first-in-first-out (FIFO) order of 53 * submission. 54 * 55 * <p>When a submitted task is cancelled before it is run, execution 56 * is suppressed. By default, such a cancelled task is not 57 * automatically removed from the work queue until its delay 58 * elapses. While this enables further inspection and monitoring, it 59 * may also cause unbounded retention of cancelled tasks. To avoid 60 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which 61 * causes tasks to be immediately removed from the work queue at 62 * time of cancellation. 63 * 64 * <p>Successive executions of a task scheduled via 65 * {@code scheduleAtFixedRate} or 66 * {@code scheduleWithFixedDelay} do not overlap. While different 67 * executions may be performed by different threads, the effects of 68 * prior executions <a 69 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 70 * those of subsequent ones. 71 * 72 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few 73 * of the inherited tuning methods are not useful for it. In 74 * particular, because it acts as a fixed-sized pool using 75 * {@code corePoolSize} threads and an unbounded queue, adjustments 76 * to {@code maximumPoolSize} have no useful effect. Additionally, it 77 * is almost never a good idea to set {@code corePoolSize} to zero or 78 * use {@code allowCoreThreadTimeOut} because this may leave the pool 79 * without threads to handle tasks once they become eligible to run. 
80 * 81 * <p><b>Extension notes:</b> This class overrides the 82 * {@link ThreadPoolExecutor#execute execute} and 83 * {@link AbstractExecutorService#submit(Runnable) submit} 84 * methods to generate internal {@link ScheduledFuture} objects to 85 * control per-task delays and scheduling. To preserve 86 * functionality, any further overrides of these methods in 87 * subclasses must invoke superclass versions, which effectively 88 * disables additional task customization. However, this class 89 * provides alternative protected extension method 90 * {@code decorateTask} (one version each for {@code Runnable} and 91 * {@code Callable}) that can be used to customize the concrete task 92 * types used to execute commands entered via {@code execute}, 93 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, 94 * and {@code scheduleWithFixedDelay}. By default, a 95 * {@code ScheduledThreadPoolExecutor} uses a task type extending 96 * {@link FutureTask}. However, this may be modified or replaced using 97 * subclasses of the form: 98 * 99 * <pre> {@code 100 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { 101 * 102 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } 103 * 104 * protected <V> RunnableScheduledFuture<V> decorateTask( 105 * Runnable r, RunnableScheduledFuture<V> task) { 106 * return new CustomTask<V>(r, task); 107 * } 108 * 109 * protected <V> RunnableScheduledFuture<V> decorateTask( 110 * Callable<V> c, RunnableScheduledFuture<V> task) { 111 * return new CustomTask<V>(c, task); 112 * } 113 * // ... add constructors, etc. 114 * }}</pre> 115 * 116 * @since 1.5 117 * @author Doug Lea 118 */ 119public class ScheduledThreadPoolExecutor 120 extends ThreadPoolExecutor 121 implements ScheduledExecutorService { 122 123 /* 124 * This class specializes ThreadPoolExecutor implementation by 125 * 126 * 1. 
Using a custom task type, ScheduledFutureTask for 127 * tasks, even those that don't require scheduling (i.e., 128 * those submitted using ExecutorService execute, not 129 * ScheduledExecutorService methods) which are treated as 130 * delayed tasks with a delay of zero. 131 * 132 * 2. Using a custom queue (DelayedWorkQueue), a variant of 133 * unbounded DelayQueue. The lack of capacity constraint and 134 * the fact that corePoolSize and maximumPoolSize are 135 * effectively identical simplifies some execution mechanics 136 * (see delayedExecute) compared to ThreadPoolExecutor. 137 * 138 * 3. Supporting optional run-after-shutdown parameters, which 139 * leads to overrides of shutdown methods to remove and cancel 140 * tasks that should NOT be run after shutdown, as well as 141 * different recheck logic when task (re)submission overlaps 142 * with a shutdown. 143 * 144 * 4. Task decoration methods to allow interception and 145 * instrumentation, which are needed because subclasses cannot 146 * otherwise override submit methods to get this effect. These 147 * don't have any impact on pool control logic though. 148 */ 149 150 /** 151 * False if should cancel/suppress periodic tasks on shutdown. 152 */ 153 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 154 155 /** 156 * False if should cancel non-periodic tasks on shutdown. 157 */ 158 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 159 160 /** 161 * True if ScheduledFutureTask.cancel should remove from queue 162 */ 163 private volatile boolean removeOnCancel = false; 164 165 /** 166 * Sequence number to break scheduling ties, and in turn to 167 * guarantee FIFO order among tied entries. 168 */ 169 private static final AtomicLong sequencer = new AtomicLong(0); 170 171 /** 172 * Returns current nanosecond time. 
173 */ 174 final long now() { 175 return System.nanoTime(); 176 } 177 178 private class ScheduledFutureTask<V> 179 extends FutureTask<V> implements RunnableScheduledFuture<V> { 180 181 /** Sequence number to break ties FIFO */ 182 private final long sequenceNumber; 183 184 /** The time the task is enabled to execute in nanoTime units */ 185 private long time; 186 187 /** 188 * Period in nanoseconds for repeating tasks. A positive 189 * value indicates fixed-rate execution. A negative value 190 * indicates fixed-delay execution. A value of 0 indicates a 191 * non-repeating task. 192 */ 193 private final long period; 194 195 /** The actual task to be re-enqueued by reExecutePeriodic */ 196 RunnableScheduledFuture<V> outerTask = this; 197 198 /** 199 * Index into delay queue, to support faster cancellation. 200 */ 201 int heapIndex; 202 203 /** 204 * Creates a one-shot action with given nanoTime-based trigger time. 205 */ 206 ScheduledFutureTask(Runnable r, V result, long ns) { 207 super(r, result); 208 this.time = ns; 209 this.period = 0; 210 this.sequenceNumber = sequencer.getAndIncrement(); 211 } 212 213 /** 214 * Creates a periodic action with given nano time and period. 215 */ 216 ScheduledFutureTask(Runnable r, V result, long ns, long period) { 217 super(r, result); 218 this.time = ns; 219 this.period = period; 220 this.sequenceNumber = sequencer.getAndIncrement(); 221 } 222 223 /** 224 * Creates a one-shot action with given nanoTime-based trigger. 
225 */ 226 ScheduledFutureTask(Callable<V> callable, long ns) { 227 super(callable); 228 this.time = ns; 229 this.period = 0; 230 this.sequenceNumber = sequencer.getAndIncrement(); 231 } 232 233 public long getDelay(TimeUnit unit) { 234 return unit.convert(time - now(), TimeUnit.NANOSECONDS); 235 } 236 237 public int compareTo(Delayed other) { 238 if (other == this) // compare zero ONLY if same object 239 return 0; 240 if (other instanceof ScheduledFutureTask) { 241 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 242 long diff = time - x.time; 243 if (diff < 0) 244 return -1; 245 else if (diff > 0) 246 return 1; 247 else if (sequenceNumber < x.sequenceNumber) 248 return -1; 249 else 250 return 1; 251 } 252 long d = (getDelay(TimeUnit.NANOSECONDS) - 253 other.getDelay(TimeUnit.NANOSECONDS)); 254 return (d == 0) ? 0 : ((d < 0) ? -1 : 1); 255 } 256 257 /** 258 * Returns true if this is a periodic (not a one-shot) action. 259 * 260 * @return true if periodic 261 */ 262 public boolean isPeriodic() { 263 return period != 0; 264 } 265 266 /** 267 * Sets the next time to run for a periodic task. 268 */ 269 private void setNextRunTime() { 270 long p = period; 271 if (p > 0) 272 time += p; 273 else 274 time = triggerTime(-p); 275 } 276 277 public boolean cancel(boolean mayInterruptIfRunning) { 278 boolean cancelled = super.cancel(mayInterruptIfRunning); 279 if (cancelled && removeOnCancel && heapIndex >= 0) 280 remove(this); 281 return cancelled; 282 } 283 284 /** 285 * Overrides FutureTask version so as to reset/requeue if periodic. 286 */ 287 public void run() { 288 boolean periodic = isPeriodic(); 289 if (!canRunInCurrentRunState(periodic)) 290 cancel(false); 291 else if (!periodic) 292 ScheduledFutureTask.super.run(); 293 else if (ScheduledFutureTask.super.runAndReset()) { 294 setNextRunTime(); 295 reExecutePeriodic(outerTask); 296 } 297 } 298 } 299 300 /** 301 * Returns true if can run a task given current run state 302 * and run-after-shutdown parameters. 
303 * 304 * @param periodic true if this task periodic, false if delayed 305 */ 306 boolean canRunInCurrentRunState(boolean periodic) { 307 return isRunningOrShutdown(periodic ? 308 continueExistingPeriodicTasksAfterShutdown : 309 executeExistingDelayedTasksAfterShutdown); 310 } 311 312 /** 313 * Main execution method for delayed or periodic tasks. If pool 314 * is shut down, rejects the task. Otherwise adds task to queue 315 * and starts a thread, if necessary, to run it. (We cannot 316 * prestart the thread to run the task because the task (probably) 317 * shouldn't be run yet,) If the pool is shut down while the task 318 * is being added, cancel and remove it if required by state and 319 * run-after-shutdown parameters. 320 * 321 * @param task the task 322 */ 323 private void delayedExecute(RunnableScheduledFuture<?> task) { 324 if (isShutdown()) 325 reject(task); 326 else { 327 super.getQueue().add(task); 328 if (isShutdown() && 329 !canRunInCurrentRunState(task.isPeriodic()) && 330 remove(task)) 331 task.cancel(false); 332 else 333 ensurePrestart(); 334 } 335 } 336 337 /** 338 * Requeues a periodic task unless current run state precludes it. 339 * Same idea as delayedExecute except drops task rather than rejecting. 340 * 341 * @param task the task 342 */ 343 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 344 if (canRunInCurrentRunState(true)) { 345 super.getQueue().add(task); 346 if (!canRunInCurrentRunState(true) && remove(task)) 347 task.cancel(false); 348 else 349 ensurePrestart(); 350 } 351 } 352 353 /** 354 * Cancels and clears the queue of all tasks that should not be run 355 * due to shutdown policy. Invoked within super.shutdown. 
356 */ 357 @Override void onShutdown() { 358 BlockingQueue<Runnable> q = super.getQueue(); 359 boolean keepDelayed = 360 getExecuteExistingDelayedTasksAfterShutdownPolicy(); 361 boolean keepPeriodic = 362 getContinueExistingPeriodicTasksAfterShutdownPolicy(); 363 if (!keepDelayed && !keepPeriodic) { 364 for (Object e : q.toArray()) 365 if (e instanceof RunnableScheduledFuture<?>) 366 ((RunnableScheduledFuture<?>) e).cancel(false); 367 q.clear(); 368 } 369 else { 370 // Traverse snapshot to avoid iterator exceptions 371 for (Object e : q.toArray()) { 372 if (e instanceof RunnableScheduledFuture) { 373 RunnableScheduledFuture<?> t = 374 (RunnableScheduledFuture<?>)e; 375 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || 376 t.isCancelled()) { // also remove if already cancelled 377 if (q.remove(t)) 378 t.cancel(false); 379 } 380 } 381 } 382 } 383 tryTerminate(); 384 } 385 386 /** 387 * Modifies or replaces the task used to execute a runnable. 388 * This method can be used to override the concrete 389 * class used for managing internal tasks. 390 * The default implementation simply returns the given task. 391 * 392 * @param runnable the submitted Runnable 393 * @param task the task created to execute the runnable 394 * @return a task that can execute the runnable 395 * @since 1.6 396 */ 397 protected <V> RunnableScheduledFuture<V> decorateTask( 398 Runnable runnable, RunnableScheduledFuture<V> task) { 399 return task; 400 } 401 402 /** 403 * Modifies or replaces the task used to execute a callable. 404 * This method can be used to override the concrete 405 * class used for managing internal tasks. 406 * The default implementation simply returns the given task. 
407 * 408 * @param callable the submitted Callable 409 * @param task the task created to execute the callable 410 * @return a task that can execute the callable 411 * @since 1.6 412 */ 413 protected <V> RunnableScheduledFuture<V> decorateTask( 414 Callable<V> callable, RunnableScheduledFuture<V> task) { 415 return task; 416 } 417 418 /** 419 * Creates a new {@code ScheduledThreadPoolExecutor} with the 420 * given core pool size. 421 * 422 * @param corePoolSize the number of threads to keep in the pool, even 423 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 424 * @throws IllegalArgumentException if {@code corePoolSize < 0} 425 */ 426 public ScheduledThreadPoolExecutor(int corePoolSize) { 427 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 428 new DelayedWorkQueue()); 429 } 430 431 /** 432 * Creates a new {@code ScheduledThreadPoolExecutor} with the 433 * given initial parameters. 434 * 435 * @param corePoolSize the number of threads to keep in the pool, even 436 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 437 * @param threadFactory the factory to use when the executor 438 * creates a new thread 439 * @throws IllegalArgumentException if {@code corePoolSize < 0} 440 * @throws NullPointerException if {@code threadFactory} is null 441 */ 442 public ScheduledThreadPoolExecutor(int corePoolSize, 443 ThreadFactory threadFactory) { 444 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 445 new DelayedWorkQueue(), threadFactory); 446 } 447 448 /** 449 * Creates a new ScheduledThreadPoolExecutor with the given 450 * initial parameters. 
451 * 452 * @param corePoolSize the number of threads to keep in the pool, even 453 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 454 * @param handler the handler to use when execution is blocked 455 * because the thread bounds and queue capacities are reached 456 * @throws IllegalArgumentException if {@code corePoolSize < 0} 457 * @throws NullPointerException if {@code handler} is null 458 */ 459 public ScheduledThreadPoolExecutor(int corePoolSize, 460 RejectedExecutionHandler handler) { 461 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 462 new DelayedWorkQueue(), handler); 463 } 464 465 /** 466 * Creates a new ScheduledThreadPoolExecutor with the given 467 * initial parameters. 468 * 469 * @param corePoolSize the number of threads to keep in the pool, even 470 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 471 * @param threadFactory the factory to use when the executor 472 * creates a new thread 473 * @param handler the handler to use when execution is blocked 474 * because the thread bounds and queue capacities are reached 475 * @throws IllegalArgumentException if {@code corePoolSize < 0} 476 * @throws NullPointerException if {@code threadFactory} or 477 * {@code handler} is null 478 */ 479 public ScheduledThreadPoolExecutor(int corePoolSize, 480 ThreadFactory threadFactory, 481 RejectedExecutionHandler handler) { 482 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 483 new DelayedWorkQueue(), threadFactory, handler); 484 } 485 486 /** 487 * Returns the trigger time of a delayed action. 488 */ 489 private long triggerTime(long delay, TimeUnit unit) { 490 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 491 } 492 493 /** 494 * Returns the trigger time of a delayed action. 495 */ 496 long triggerTime(long delay) { 497 return now() + 498 ((delay < (Long.MAX_VALUE >> 1)) ? 
delay : overflowFree(delay)); 499 } 500 501 /** 502 * Constrains the values of all delays in the queue to be within 503 * Long.MAX_VALUE of each other, to avoid overflow in compareTo. 504 * This may occur if a task is eligible to be dequeued, but has 505 * not yet been, while some other task is added with a delay of 506 * Long.MAX_VALUE. 507 */ 508 private long overflowFree(long delay) { 509 Delayed head = (Delayed) super.getQueue().peek(); 510 if (head != null) { 511 long headDelay = head.getDelay(TimeUnit.NANOSECONDS); 512 if (headDelay < 0 && (delay - headDelay < 0)) 513 delay = Long.MAX_VALUE + headDelay; 514 } 515 return delay; 516 } 517 518 /** 519 * @throws RejectedExecutionException {@inheritDoc} 520 * @throws NullPointerException {@inheritDoc} 521 */ 522 public ScheduledFuture<?> schedule(Runnable command, 523 long delay, 524 TimeUnit unit) { 525 if (command == null || unit == null) 526 throw new NullPointerException(); 527 RunnableScheduledFuture<?> t = decorateTask(command, 528 new ScheduledFutureTask<Void>(command, null, 529 triggerTime(delay, unit))); 530 delayedExecute(t); 531 return t; 532 } 533 534 /** 535 * @throws RejectedExecutionException {@inheritDoc} 536 * @throws NullPointerException {@inheritDoc} 537 */ 538 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 539 long delay, 540 TimeUnit unit) { 541 if (callable == null || unit == null) 542 throw new NullPointerException(); 543 RunnableScheduledFuture<V> t = decorateTask(callable, 544 new ScheduledFutureTask<V>(callable, 545 triggerTime(delay, unit))); 546 delayedExecute(t); 547 return t; 548 } 549 550 /** 551 * @throws RejectedExecutionException {@inheritDoc} 552 * @throws NullPointerException {@inheritDoc} 553 * @throws IllegalArgumentException {@inheritDoc} 554 */ 555 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 556 long initialDelay, 557 long period, 558 TimeUnit unit) { 559 if (command == null || unit == null) 560 throw new NullPointerException(); 561 if 
(period <= 0) 562 throw new IllegalArgumentException(); 563 ScheduledFutureTask<Void> sft = 564 new ScheduledFutureTask<Void>(command, 565 null, 566 triggerTime(initialDelay, unit), 567 unit.toNanos(period)); 568 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 569 sft.outerTask = t; 570 delayedExecute(t); 571 return t; 572 } 573 574 /** 575 * @throws RejectedExecutionException {@inheritDoc} 576 * @throws NullPointerException {@inheritDoc} 577 * @throws IllegalArgumentException {@inheritDoc} 578 */ 579 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 580 long initialDelay, 581 long delay, 582 TimeUnit unit) { 583 if (command == null || unit == null) 584 throw new NullPointerException(); 585 if (delay <= 0) 586 throw new IllegalArgumentException(); 587 ScheduledFutureTask<Void> sft = 588 new ScheduledFutureTask<Void>(command, 589 null, 590 triggerTime(initialDelay, unit), 591 unit.toNanos(-delay)); 592 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 593 sft.outerTask = t; 594 delayedExecute(t); 595 return t; 596 } 597 598 /** 599 * Executes {@code command} with zero required delay. 600 * This has effect equivalent to 601 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. 602 * Note that inspections of the queue and of the list returned by 603 * {@code shutdownNow} will access the zero-delayed 604 * {@link ScheduledFuture}, not the {@code command} itself. 605 * 606 * <p>A consequence of the use of {@code ScheduledFuture} objects is 607 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always 608 * called with a null second {@code Throwable} argument, even if the 609 * {@code command} terminated abruptly. Instead, the {@code Throwable} 610 * thrown by such a task can be obtained via {@link Future#get}. 
611 * 612 * @throws RejectedExecutionException at discretion of 613 * {@code RejectedExecutionHandler}, if the task 614 * cannot be accepted for execution because the 615 * executor has been shut down 616 * @throws NullPointerException {@inheritDoc} 617 */ 618 public void execute(Runnable command) { 619 schedule(command, 0, TimeUnit.NANOSECONDS); 620 } 621 622 // Override AbstractExecutorService methods 623 624 /** 625 * @throws RejectedExecutionException {@inheritDoc} 626 * @throws NullPointerException {@inheritDoc} 627 */ 628 public Future<?> submit(Runnable task) { 629 return schedule(task, 0, TimeUnit.NANOSECONDS); 630 } 631 632 /** 633 * @throws RejectedExecutionException {@inheritDoc} 634 * @throws NullPointerException {@inheritDoc} 635 */ 636 public <T> Future<T> submit(Runnable task, T result) { 637 return schedule(Executors.callable(task, result), 638 0, TimeUnit.NANOSECONDS); 639 } 640 641 /** 642 * @throws RejectedExecutionException {@inheritDoc} 643 * @throws NullPointerException {@inheritDoc} 644 */ 645 public <T> Future<T> submit(Callable<T> task) { 646 return schedule(task, 0, TimeUnit.NANOSECONDS); 647 } 648 649 /** 650 * Sets the policy on whether to continue executing existing 651 * periodic tasks even when this executor has been {@code shutdown}. 652 * In this case, these tasks will only terminate upon 653 * {@code shutdownNow} or after setting the policy to 654 * {@code false} when already shutdown. 655 * This value is by default {@code false}. 656 * 657 * @param value if {@code true}, continue after shutdown, else don't. 658 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 659 */ 660 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 661 continueExistingPeriodicTasksAfterShutdown = value; 662 if (!value && isShutdown()) 663 onShutdown(); 664 } 665 666 /** 667 * Gets the policy on whether to continue executing existing 668 * periodic tasks even when this executor has been {@code shutdown}. 
669 * In this case, these tasks will only terminate upon 670 * {@code shutdownNow} or after setting the policy to 671 * {@code false} when already shutdown. 672 * This value is by default {@code false}. 673 * 674 * @return {@code true} if will continue after shutdown 675 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 676 */ 677 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 678 return continueExistingPeriodicTasksAfterShutdown; 679 } 680 681 /** 682 * Sets the policy on whether to execute existing delayed 683 * tasks even when this executor has been {@code shutdown}. 684 * In this case, these tasks will only terminate upon 685 * {@code shutdownNow}, or after setting the policy to 686 * {@code false} when already shutdown. 687 * This value is by default {@code true}. 688 * 689 * @param value if {@code true}, execute after shutdown, else don't. 690 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 691 */ 692 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 693 executeExistingDelayedTasksAfterShutdown = value; 694 if (!value && isShutdown()) 695 onShutdown(); 696 } 697 698 /** 699 * Gets the policy on whether to execute existing delayed 700 * tasks even when this executor has been {@code shutdown}. 701 * In this case, these tasks will only terminate upon 702 * {@code shutdownNow}, or after setting the policy to 703 * {@code false} when already shutdown. 704 * This value is by default {@code true}. 705 * 706 * @return {@code true} if will execute after shutdown 707 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 708 */ 709 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 710 return executeExistingDelayedTasksAfterShutdown; 711 } 712 713 /** 714 * Sets the policy on whether cancelled tasks should be immediately 715 * removed from the work queue at time of cancellation. This value is 716 * by default {@code false}. 
717 * 718 * @param value if {@code true}, remove on cancellation, else don't 719 * @see #getRemoveOnCancelPolicy 720 * @since 1.7 721 */ 722 public void setRemoveOnCancelPolicy(boolean value) { 723 removeOnCancel = value; 724 } 725 726 /** 727 * Gets the policy on whether cancelled tasks should be immediately 728 * removed from the work queue at time of cancellation. This value is 729 * by default {@code false}. 730 * 731 * @return {@code true} if cancelled tasks are immediately removed 732 * from the queue 733 * @see #setRemoveOnCancelPolicy 734 * @since 1.7 735 */ 736 public boolean getRemoveOnCancelPolicy() { 737 return removeOnCancel; 738 } 739 740 /** 741 * Initiates an orderly shutdown in which previously submitted 742 * tasks are executed, but no new tasks will be accepted. 743 * Invocation has no additional effect if already shut down. 744 * 745 * <p>This method does not wait for previously submitted tasks to 746 * complete execution. Use {@link #awaitTermination awaitTermination} 747 * to do that. 748 * 749 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} 750 * has been set {@code false}, existing delayed tasks whose delays 751 * have not yet elapsed are cancelled. And unless the {@code 752 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set 753 * {@code true}, future executions of existing periodic tasks will 754 * be cancelled. 755 * 756 * @throws SecurityException {@inheritDoc} 757 */ 758 public void shutdown() { 759 super.shutdown(); 760 } 761 762 /** 763 * Attempts to stop all actively executing tasks, halts the 764 * processing of waiting tasks, and returns a list of the tasks 765 * that were awaiting execution. 766 * 767 * <p>This method does not wait for actively executing tasks to 768 * terminate. Use {@link #awaitTermination awaitTermination} to 769 * do that. 770 * 771 * <p>There are no guarantees beyond best-effort attempts to stop 772 * processing actively executing tasks. 
This implementation 773 * cancels tasks via {@link Thread#interrupt}, so any task that 774 * fails to respond to interrupts may never terminate. 775 * 776 * @return list of tasks that never commenced execution. 777 * Each element of this list is a {@link ScheduledFuture}, 778 * including those tasks submitted using {@code execute}, 779 * which are for scheduling purposes used as the basis of a 780 * zero-delay {@code ScheduledFuture}. 781 * @throws SecurityException {@inheritDoc} 782 */ 783 public List<Runnable> shutdownNow() { 784 return super.shutdownNow(); 785 } 786 787 /** 788 * Returns the task queue used by this executor. Each element of 789 * this queue is a {@link ScheduledFuture}, including those 790 * tasks submitted using {@code execute} which are for scheduling 791 * purposes used as the basis of a zero-delay 792 * {@code ScheduledFuture}. Iteration over this queue is 793 * <em>not</em> guaranteed to traverse tasks in the order in 794 * which they will execute. 795 * 796 * @return the task queue 797 */ 798 public BlockingQueue<Runnable> getQueue() { 799 return super.getQueue(); 800 } 801 802 /** 803 * Specialized delay queue. To mesh with TPE declarations, this 804 * class must be declared as a BlockingQueue<Runnable> even though 805 * it can only hold RunnableScheduledFutures. 806 */ 807 static class DelayedWorkQueue extends AbstractQueue<Runnable> 808 implements BlockingQueue<Runnable> { 809 810 /* 811 * A DelayedWorkQueue is based on a heap-based data structure 812 * like those in DelayQueue and PriorityQueue, except that 813 * every ScheduledFutureTask also records its index into the 814 * heap array. This eliminates the need to find a task upon 815 * cancellation, greatly speeding up removal (down from O(n) 816 * to O(log n)), and reducing garbage retention that would 817 * otherwise occur by waiting for the element to rise to top 818 * before clearing. 
But because the queue may also hold 819 * RunnableScheduledFutures that are not ScheduledFutureTasks, 820 * we are not guaranteed to have such indices available, in 821 * which case we fall back to linear search. (We expect that 822 * most tasks will not be decorated, and that the faster cases 823 * will be much more common.) 824 * 825 * All heap operations must record index changes -- mainly 826 * within siftUp and siftDown. Upon removal, a task's 827 * heapIndex is set to -1. Note that ScheduledFutureTasks can 828 * appear at most once in the queue (this need not be true for 829 * other kinds of tasks or work queues), so are uniquely 830 * identified by heapIndex. 831 */ 832 833 private static final int INITIAL_CAPACITY = 16; 834 private RunnableScheduledFuture[] queue = 835 new RunnableScheduledFuture[INITIAL_CAPACITY]; 836 private final ReentrantLock lock = new ReentrantLock(); 837 private int size = 0; 838 839 /** 840 * Thread designated to wait for the task at the head of the 841 * queue. This variant of the Leader-Follower pattern 842 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 843 * minimize unnecessary timed waiting. When a thread becomes 844 * the leader, it waits only for the next delay to elapse, but 845 * other threads await indefinitely. The leader thread must 846 * signal some other thread before returning from take() or 847 * poll(...), unless some other thread becomes leader in the 848 * interim. Whenever the head of the queue is replaced with a 849 * task with an earlier expiration time, the leader field is 850 * invalidated by being reset to null, and some waiting 851 * thread, but not necessarily the current leader, is 852 * signalled. So waiting threads must be prepared to acquire 853 * and lose leadership while waiting. 854 */ 855 private Thread leader = null; 856 857 /** 858 * Condition signalled when a newer task becomes available at the 859 * head of the queue or a new thread may need to become leader. 
860 */ 861 private final Condition available = lock.newCondition(); 862 863 /** 864 * Set f's heapIndex if it is a ScheduledFutureTask. 865 */ 866 private void setIndex(RunnableScheduledFuture f, int idx) { 867 if (f instanceof ScheduledFutureTask) 868 ((ScheduledFutureTask)f).heapIndex = idx; 869 } 870 871 /** 872 * Sift element added at bottom up to its heap-ordered spot. 873 * Call only when holding lock. 874 */ 875 private void siftUp(int k, RunnableScheduledFuture key) { 876 while (k > 0) { 877 int parent = (k - 1) >>> 1; 878 RunnableScheduledFuture e = queue[parent]; 879 if (key.compareTo(e) >= 0) 880 break; 881 queue[k] = e; 882 setIndex(e, k); 883 k = parent; 884 } 885 queue[k] = key; 886 setIndex(key, k); 887 } 888 889 /** 890 * Sift element added at top down to its heap-ordered spot. 891 * Call only when holding lock. 892 */ 893 private void siftDown(int k, RunnableScheduledFuture key) { 894 int half = size >>> 1; 895 while (k < half) { 896 int child = (k << 1) + 1; 897 RunnableScheduledFuture c = queue[child]; 898 int right = child + 1; 899 if (right < size && c.compareTo(queue[right]) > 0) 900 c = queue[child = right]; 901 if (key.compareTo(c) <= 0) 902 break; 903 queue[k] = c; 904 setIndex(c, k); 905 k = child; 906 } 907 queue[k] = key; 908 setIndex(key, k); 909 } 910 911 /** 912 * Resize the heap array. Call only when holding lock. 913 */ 914 private void grow() { 915 int oldCapacity = queue.length; 916 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 917 if (newCapacity < 0) // overflow 918 newCapacity = Integer.MAX_VALUE; 919 queue = Arrays.copyOf(queue, newCapacity); 920 } 921 922 /** 923 * Find index of given object, or -1 if absent 924 */ 925 private int indexOf(Object x) { 926 if (x != null) { 927 if (x instanceof ScheduledFutureTask) { 928 int i = ((ScheduledFutureTask) x).heapIndex; 929 // Sanity check; x could conceivably be a 930 // ScheduledFutureTask from some other pool. 
931 if (i >= 0 && i < size && queue[i] == x) 932 return i; 933 } else { 934 for (int i = 0; i < size; i++) 935 if (x.equals(queue[i])) 936 return i; 937 } 938 } 939 return -1; 940 } 941 942 public boolean contains(Object x) { 943 final ReentrantLock lock = this.lock; 944 lock.lock(); 945 try { 946 return indexOf(x) != -1; 947 } finally { 948 lock.unlock(); 949 } 950 } 951 952 public boolean remove(Object x) { 953 final ReentrantLock lock = this.lock; 954 lock.lock(); 955 try { 956 int i = indexOf(x); 957 if (i < 0) 958 return false; 959 960 setIndex(queue[i], -1); 961 int s = --size; 962 RunnableScheduledFuture replacement = queue[s]; 963 queue[s] = null; 964 if (s != i) { 965 siftDown(i, replacement); 966 if (queue[i] == replacement) 967 siftUp(i, replacement); 968 } 969 return true; 970 } finally { 971 lock.unlock(); 972 } 973 } 974 975 public int size() { 976 final ReentrantLock lock = this.lock; 977 lock.lock(); 978 try { 979 return size; 980 } finally { 981 lock.unlock(); 982 } 983 } 984 985 public boolean isEmpty() { 986 return size() == 0; 987 } 988 989 public int remainingCapacity() { 990 return Integer.MAX_VALUE; 991 } 992 993 public RunnableScheduledFuture peek() { 994 final ReentrantLock lock = this.lock; 995 lock.lock(); 996 try { 997 return queue[0]; 998 } finally { 999 lock.unlock(); 1000 } 1001 } 1002 1003 public boolean offer(Runnable x) { 1004 if (x == null) 1005 throw new NullPointerException(); 1006 RunnableScheduledFuture e = (RunnableScheduledFuture)x; 1007 final ReentrantLock lock = this.lock; 1008 lock.lock(); 1009 try { 1010 int i = size; 1011 if (i >= queue.length) 1012 grow(); 1013 size = i + 1; 1014 if (i == 0) { 1015 queue[0] = e; 1016 setIndex(e, 0); 1017 } else { 1018 siftUp(i, e); 1019 } 1020 if (queue[0] == e) { 1021 leader = null; 1022 available.signal(); 1023 } 1024 } finally { 1025 lock.unlock(); 1026 } 1027 return true; 1028 } 1029 1030 public void put(Runnable e) { 1031 offer(e); 1032 } 1033 1034 public boolean add(Runnable e) { 
1035 return offer(e); 1036 } 1037 1038 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 1039 return offer(e); 1040 } 1041 1042 /** 1043 * Performs common bookkeeping for poll and take: Replaces 1044 * first element with last and sifts it down. Call only when 1045 * holding lock. 1046 * @param f the task to remove and return 1047 */ 1048 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { 1049 int s = --size; 1050 RunnableScheduledFuture x = queue[s]; 1051 queue[s] = null; 1052 if (s != 0) 1053 siftDown(0, x); 1054 setIndex(f, -1); 1055 return f; 1056 } 1057 1058 public RunnableScheduledFuture poll() { 1059 final ReentrantLock lock = this.lock; 1060 lock.lock(); 1061 try { 1062 RunnableScheduledFuture first = queue[0]; 1063 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 1064 return null; 1065 else 1066 return finishPoll(first); 1067 } finally { 1068 lock.unlock(); 1069 } 1070 } 1071 1072 public RunnableScheduledFuture take() throws InterruptedException { 1073 final ReentrantLock lock = this.lock; 1074 lock.lockInterruptibly(); 1075 try { 1076 for (;;) { 1077 RunnableScheduledFuture first = queue[0]; 1078 if (first == null) 1079 available.await(); 1080 else { 1081 long delay = first.getDelay(TimeUnit.NANOSECONDS); 1082 if (delay <= 0) 1083 return finishPoll(first); 1084 else if (leader != null) 1085 available.await(); 1086 else { 1087 Thread thisThread = Thread.currentThread(); 1088 leader = thisThread; 1089 try { 1090 available.awaitNanos(delay); 1091 } finally { 1092 if (leader == thisThread) 1093 leader = null; 1094 } 1095 } 1096 } 1097 } 1098 } finally { 1099 if (leader == null && queue[0] != null) 1100 available.signal(); 1101 lock.unlock(); 1102 } 1103 } 1104 1105 public RunnableScheduledFuture poll(long timeout, TimeUnit unit) 1106 throws InterruptedException { 1107 long nanos = unit.toNanos(timeout); 1108 final ReentrantLock lock = this.lock; 1109 lock.lockInterruptibly(); 1110 try { 1111 for (;;) { 1112 
RunnableScheduledFuture first = queue[0]; 1113 if (first == null) { 1114 if (nanos <= 0) 1115 return null; 1116 else 1117 nanos = available.awaitNanos(nanos); 1118 } else { 1119 long delay = first.getDelay(TimeUnit.NANOSECONDS); 1120 if (delay <= 0) 1121 return finishPoll(first); 1122 if (nanos <= 0) 1123 return null; 1124 if (nanos < delay || leader != null) 1125 nanos = available.awaitNanos(nanos); 1126 else { 1127 Thread thisThread = Thread.currentThread(); 1128 leader = thisThread; 1129 try { 1130 long timeLeft = available.awaitNanos(delay); 1131 nanos -= delay - timeLeft; 1132 } finally { 1133 if (leader == thisThread) 1134 leader = null; 1135 } 1136 } 1137 } 1138 } 1139 } finally { 1140 if (leader == null && queue[0] != null) 1141 available.signal(); 1142 lock.unlock(); 1143 } 1144 } 1145 1146 public void clear() { 1147 final ReentrantLock lock = this.lock; 1148 lock.lock(); 1149 try { 1150 for (int i = 0; i < size; i++) { 1151 RunnableScheduledFuture t = queue[i]; 1152 if (t != null) { 1153 queue[i] = null; 1154 setIndex(t, -1); 1155 } 1156 } 1157 size = 0; 1158 } finally { 1159 lock.unlock(); 1160 } 1161 } 1162 1163 /** 1164 * Return and remove first element only if it is expired. 1165 * Used only by drainTo. Call only when holding lock. 1166 */ 1167 private RunnableScheduledFuture pollExpired() { 1168 RunnableScheduledFuture first = queue[0]; 1169 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 1170 return null; 1171 return finishPoll(first); 1172 } 1173 1174 public int drainTo(Collection<? super Runnable> c) { 1175 if (c == null) 1176 throw new NullPointerException(); 1177 if (c == this) 1178 throw new IllegalArgumentException(); 1179 final ReentrantLock lock = this.lock; 1180 lock.lock(); 1181 try { 1182 RunnableScheduledFuture first; 1183 int n = 0; 1184 while ((first = pollExpired()) != null) { 1185 c.add(first); 1186 ++n; 1187 } 1188 return n; 1189 } finally { 1190 lock.unlock(); 1191 } 1192 } 1193 1194 public int drainTo(Collection<? 
super Runnable> c, int maxElements) { 1195 if (c == null) 1196 throw new NullPointerException(); 1197 if (c == this) 1198 throw new IllegalArgumentException(); 1199 if (maxElements <= 0) 1200 return 0; 1201 final ReentrantLock lock = this.lock; 1202 lock.lock(); 1203 try { 1204 RunnableScheduledFuture first; 1205 int n = 0; 1206 while (n < maxElements && (first = pollExpired()) != null) { 1207 c.add(first); 1208 ++n; 1209 } 1210 return n; 1211 } finally { 1212 lock.unlock(); 1213 } 1214 } 1215 1216 public Object[] toArray() { 1217 final ReentrantLock lock = this.lock; 1218 lock.lock(); 1219 try { 1220 return Arrays.copyOf(queue, size, Object[].class); 1221 } finally { 1222 lock.unlock(); 1223 } 1224 } 1225 1226 @SuppressWarnings("unchecked") 1227 public <T> T[] toArray(T[] a) { 1228 final ReentrantLock lock = this.lock; 1229 lock.lock(); 1230 try { 1231 if (a.length < size) 1232 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 1233 System.arraycopy(queue, 0, a, 0, size); 1234 if (a.length > size) 1235 a[size] = null; 1236 return a; 1237 } finally { 1238 lock.unlock(); 1239 } 1240 } 1241 1242 public Iterator<Runnable> iterator() { 1243 return new Itr(Arrays.copyOf(queue, size)); 1244 } 1245 1246 /** 1247 * Snapshot iterator that works off copy of underlying q array. 
*/
        private class Itr implements Iterator<Runnable> {
            // Private copy of the tasks; iteration never reads the live heap.
            final RunnableScheduledFuture[] array;
            int cursor;        // index of next element to return
            int lastRet = -1;  // index of last element, or -1 if no such

            Itr(RunnableScheduledFuture[] snapshot) {
                array = snapshot;
            }

            /** Returns true while the snapshot has unvisited elements. */
            public boolean hasNext() {
                return array.length > cursor;
            }

            /**
             * Returns the next element of the snapshot.
             *
             * @throws NoSuchElementException if the snapshot is exhausted
             */
            public Runnable next() {
                final int i = cursor;
                if (i >= array.length)
                    throw new NoSuchElementException();
                lastRet = i;
                cursor = i + 1;
                return array[i];
            }

            /**
             * Removes from the enclosing queue the element most recently
             * returned by {@code next()}.
             *
             * @throws IllegalStateException if there is no such element
             */
            public void remove() {
                final int i = lastRet;
                if (i < 0)
                    throw new IllegalStateException();
                DelayedWorkQueue.this.remove(array[i]);
                lastRet = -1;
            }
        }
    }
}