/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.*;

/**
 * An {@link ExecutorService} that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using {@link Executors} factory methods.
 *
 * <p>Thread pools address two different problems: they usually
 * provide improved performance when executing large numbers of
 * asynchronous tasks, due to reduced per-task invocation overhead,
 * and they provide a means of bounding and managing the resources,
 * including threads, consumed when executing a collection of tasks.
 * Each {@code ThreadPoolExecutor} also maintains some basic
 * statistics, such as the number of completed tasks.
 *
 * <p>To be useful across a wide range of contexts, this class
 * provides many adjustable parameters and extensibility
 * hooks. However, programmers are urged to use the more convenient
 * {@link Executors} factory methods {@link
 * Executors#newCachedThreadPool} (unbounded thread pool, with
 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
 * (fixed size thread pool) and {@link
 * Executors#newSingleThreadExecutor} (single background thread), that
 * preconfigure settings for the most common usage
 * scenarios. Otherwise, use the following guide when manually
 * configuring and tuning this class:
 *
 * <dl>
 *
 * <dt>Core and maximum pool sizes</dt>
 *
 * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
 * pool size (see {@link #getPoolSize})
 * according to the bounds set by
 * corePoolSize (see {@link #getCorePoolSize}) and
 * maximumPoolSize (see {@link #getMaximumPoolSize}).
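As an illustrative aside (a sketch, not part of this class's own sources), the two bounds are normally supplied at construction; supplying equal values yields the fixed-size configuration discussed in this section:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative helper (hypothetical, for demonstration only): equal
// core and maximum sizes produce a fixed-size pool; worker threads
// are still created lazily, as tasks arrive.
class PoolSizingDemo {
    static ThreadPoolExecutor newFixedSizePool(int size) {
        return new ThreadPoolExecutor(
            size, size,                    // corePoolSize == maximumPoolSize
            0L, TimeUnit.MILLISECONDS,     // keep-alive is irrelevant here
            new LinkedBlockingQueue<Runnable>());
    }
}
```

Note that {@link Executors#newFixedThreadPool} preconfigures exactly this arrangement.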
 *
 * When a new task is submitted in method {@link #execute}, and fewer
 * than corePoolSize threads are running, a new thread is created to
 * handle the request, even if other worker threads are idle. If
 * there are corePoolSize or more but fewer than maximumPoolSize
 * threads running, a new thread will be created only if the queue is
 * full. By setting corePoolSize and maximumPoolSize the same, you
 * create a fixed-size thread pool. By setting maximumPoolSize to an
 * essentially unbounded value such as {@code Integer.MAX_VALUE}, you
 * allow the pool to accommodate an arbitrary number of concurrent
 * tasks. Most typically, core and maximum pool sizes are set only
 * upon construction, but they may also be changed dynamically using
 * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
 *
 * <dt>On-demand construction</dt>
 *
 * <dd> By default, even core threads are initially created and
 * started only when new tasks arrive, but this can be overridden
 * dynamically using method {@link #prestartCoreThread} or {@link
 * #prestartAllCoreThreads}. You probably want to prestart threads if
 * you construct the pool with a non-empty queue. </dd>
 *
 * <dt>Creating new threads</dt>
 *
 * <dd>New threads are created using a {@link ThreadFactory}. If not
 * otherwise specified, a {@link Executors#defaultThreadFactory} is
 * used, which creates threads that are all in the same {@link
 * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
 * non-daemon status. By supplying a different ThreadFactory, you can
 * alter the thread's name, thread group, priority, daemon status,
 * etc. If a {@code ThreadFactory} fails to create a thread when asked
 * by returning null from {@code newThread}, the executor will
 * continue, but might not be able to execute any tasks. Threads
 * should possess the "modifyThread" {@code RuntimePermission}.
 * If worker threads or other threads using the pool do not possess
 * this permission, service may be degraded: configuration changes may
 * not take effect in a timely manner, and a shutdown pool may remain
 * in a state in which termination is possible but not completed.</dd>
 *
 * <dt>Keep-alive times</dt>
 *
 * <dd>If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link #getKeepAliveTime}). This
 * provides a means of reducing resource consumption when the pool is
 * not being actively used. If the pool becomes more active later, new
 * threads will be constructed. This parameter can also be changed
 * dynamically using method {@link #setKeepAliveTime}. Using a value
 * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively
 * prevents idle threads from ever terminating prior to shutdown. The
 * keep-alive policy applies only when there are more than
 * corePoolSize threads.</dd>
 *
 * <dt>Queuing</dt>
 *
 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
 * submitted tasks.
 * The use of this queue interacts with pool sizing:
 *
 * <ul>
 *
 * <li> If fewer than corePoolSize threads are running, the Executor
 * always prefers adding a new thread
 * rather than queuing.</li>
 *
 * <li> If corePoolSize or more threads are running, the Executor
 * always prefers queuing a request rather than adding a new
 * thread.</li>
 *
 * <li> If a request cannot be queued, a new thread is created unless
 * this would exceed maximumPoolSize, in which case, the task will be
 * rejected.</li>
 *
 * </ul>
 *
 * There are three general strategies for queuing:
 * <ol>
 *
 * <li> <em>Direct handoffs.</em> A good default choice for a work
 * queue is a {@link SynchronousQueue} that hands off tasks to threads
 * without otherwise holding them. Here, an attempt to queue a task
 * will fail if no threads are immediately available to run it, so a
 * new thread will be constructed. This policy avoids lockups when
 * handling sets of requests that might have internal dependencies.
 * Direct handoffs generally require unbounded maximumPoolSizes to
 * avoid rejection of newly submitted tasks. This in turn admits the
 * possibility of unbounded thread growth when commands continue to
 * arrive on average faster than they can be processed. </li>
 *
 * <li><em>Unbounded queues.</em> Using an unbounded queue (for
 * example a {@link LinkedBlockingQueue} without a predefined
 * capacity) will cause new tasks to wait in the queue when all
 * corePoolSize threads are busy. Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.) This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each other's execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed. </li>
 *
 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control. Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput. If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput. </li>
 *
 * </ol>
 *
 * </dd>
 *
 * <dt>Rejected tasks</dt>
 *
 * <dd> New tasks submitted in method {@link #execute} will be
 * <em>rejected</em> when the Executor has been shut down, and also
 * when the Executor uses finite bounds for both maximum threads and
 * work queue capacity, and is saturated. In either case, the {@code
 * execute} method invokes the {@link
 * RejectedExecutionHandler#rejectedExecution} method of its {@link
 * RejectedExecutionHandler}. Four predefined handler policies are
 * provided:
 *
 * <ol>
 *
 * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
 * handler throws a runtime {@link RejectedExecutionException} upon
 * rejection. </li>
 *
 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 * that invokes {@code execute} itself runs the task.
 * This provides a
 * simple feedback control mechanism that will slow down the rate that
 * new tasks are submitted. </li>
 *
 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 * cannot be executed is simply dropped. </li>
 *
 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 * executor is not shut down, the task at the head of the work queue
 * is dropped, and then execution is retried (which can fail again,
 * causing this to be repeated.) </li>
 *
 * </ol>
 *
 * It is possible to define and use other kinds of {@link
 * RejectedExecutionHandler} classes. Doing so requires some care,
 * especially when policies are designed to work only under particular
 * capacity or queuing policies. </dd>
 *
 * <dt>Hook methods</dt>
 *
 * <dd>This class provides {@code protected} overridable {@link
 * #beforeExecute} and {@link #afterExecute} methods that are called
 * before and after execution of each task. These can be used to
 * manipulate the execution environment; for example, reinitializing
 * ThreadLocals, gathering statistics, or adding log
 * entries. Additionally, method {@link #terminated} can be overridden
 * to perform any special processing that needs to be done once the
 * Executor has fully terminated.
 *
 * <p>If hook or callback methods throw exceptions, internal worker
 * threads may in turn fail and abruptly terminate.</dd>
 *
 * <dt>Queue maintenance</dt>
 *
 * <dd> Method {@link #getQueue} allows access to the work queue for
 * purposes of monitoring and debugging. Use of this method for any
 * other purpose is strongly discouraged.
 * Two supplied methods,
 * {@link #remove} and {@link #purge} are available to assist in
 * storage reclamation when large numbers of queued tasks become
 * cancelled.</dd>
 *
 * <dt>Finalization</dt>
 *
 * <dd> A pool that is no longer referenced in a program <em>AND</em>
 * has no remaining threads will be {@code shutdown} automatically. If
 * you would like to ensure that unreferenced pools are reclaimed even
 * if users forget to call {@link #shutdown}, then you must arrange
 * that unused threads eventually die, by setting appropriate
 * keep-alive times using a lower bound of zero core threads. </dd>
 *
 * </dl>
 *
 * <p> <b>Extension example</b>. Most extensions of this class
 * override one or more of the protected hook methods. For example,
 * here is a subclass that adds a simple pause/resume feature:
 *
 * <pre> {@code
 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 *   private boolean isPaused;
 *   private ReentrantLock pauseLock = new ReentrantLock();
 *   private Condition unpaused = pauseLock.newCondition();
 *
 *   public PausableThreadPoolExecutor(...) { super(...); }
 *
 *   protected void beforeExecute(Thread t, Runnable r) {
 *     super.beforeExecute(t, r);
 *     pauseLock.lock();
 *     try {
 *       while (isPaused) unpaused.await();
 *     } catch (InterruptedException ie) {
 *       t.interrupt();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void pause() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = true;
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void resume() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = false;
 *       unpaused.signalAll();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop. The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
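As an aside, the packing arithmetic described above can be exercised standalone; the constants below mirror COUNT_BITS, CAPACITY, and the runState values declared just after this comment (an illustrative sketch, not part of this class):

```java
// Standalone sketch of the ctl packing scheme: the low 29 bits hold
// the worker count, the high 3 bits hold the run state, and the
// state constants are ordered so that plain int comparison works.
class CtlPacking {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // max workerCount
    static final int RUNNING    = -1 << COUNT_BITS;       // negative: high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }
}
```

Packing and unpacking round-trip cleanly because the two fields occupy disjoint bit ranges, and RUNNING, being the only negative value, compares less than every other state.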
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempt to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempt to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * The queue used for holding tasks and handing off to worker
     * threads. We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING). This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination.
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads.
     * Accessed only under mainLock.
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker). All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads. Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue. On the other hand, no special precautions exist to
     * handle OutOfMemoryErrors that might be thrown while trying to
     * create threads, since there is generally no recourse from
     * within this class.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * threads present. Otherwise they wait forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc).
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler.
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Permission required for callers of shutdown and shutdownNow.
     * We additionally require (see checkShutdownAccess) that callers
     * have permission to actually interrupt threads in the worker set
     * (as governed by Thread.interrupt, which relies on
     * ThreadGroup.checkAccess, which in turn relies on
     * SecurityManager.checkAccess). Shutdowns are attempted only if
     * these checks pass.
     *
     * All actual invocations of Thread.interrupt (see
     * interruptIdleWorkers and interruptWorkers) ignore
     * SecurityExceptions, meaning that the attempted interrupts
     * silently fail. In the case of shutdown, they should not fail
     * unless the SecurityManager has inconsistent policies, sometimes
     * allowing access to a thread and sometimes not. In such cases,
     * failure to actually interrupt threads may disable or delay full
     * termination. Other uses of interruptIdleWorkers are advisory,
     * and failure to actually interrupt will merely delay response to
     * configuration changes so is not handled exceptionally.
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution. This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run. We implement a simple
     * non-reentrant mutual exclusion lock rather than use ReentrantLock
     * because we do not want worker tasks to be able to reacquire the
     * lock when they invoke pool control methods like setCorePoolSize.
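The non-reentrancy argument above can be demonstrated in isolation (an illustrative standalone sketch, not part of this class) by building the same kind of lock and observing that a second tryAcquire by the owning thread fails, unlike ReentrantLock:

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative non-reentrant mutex in the style described above:
// state 0 means unlocked, 1 means locked. Because tryAcquire only
// succeeds via a 0 -> 1 CAS, the owning thread cannot reacquire.
class NonReentrantMutex extends AbstractQueuedSynchronizer {
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return getState() == 1; }
}
```

A held lock thus reliably signals "running a task", which is what lets interruptIdleWorkers use tryLock to interrupt only idle threads.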
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    }

    /*
     * Methods for setting control state
     */

    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty). If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

    /*
     * Methods for controlling interrupts to worker threads.
     */

    /**
     * If there is a security manager, makes sure caller has
     * permission to shut down threads in general (see shutdownPerm).
     * If this passes, additionally makes sure the caller is allowed
     * to interrupt each worker thread.
     * This might not be true even if
     * the first check passed, if the SecurityManager treats some
     * threads specially.
     */
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                try {
                    w.thread.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes. Ignores
     * SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     *
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers. In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private static final boolean ONLY_ONE = true;

    /**
     * Ensures that unless the pool is stopping, the current thread
     * does not have its interrupt set. This requires a double-check
     * of state in case the interrupt was cleared concurrently with a
     * shutdownNow -- if so, the interrupt is re-enabled.
     */
    private void clearInterruptsForTaskRun() {
        if (runStateLessThan(ctl.get(), STOP) &&
            Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))
            Thread.currentThread().interrupt();
    }

    /*
     * Misc utilities, most of which are also exported to
     * ScheduledThreadPoolExecutor
     */

    /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    /**
     * Performs any further cleanup following run state transition on
     * invocation of shutdown. A no-op here, but used by
     * ScheduledThreadPoolExecutor to cancel delayed tasks.
     */
    void onShutdown() {
    }

    /**
     * State check needed by ScheduledThreadPoolExecutor to
     * enable running tasks during shutdown.
     *
     * @param shutdownOK true if this method should also return true
     * when the run state is SHUTDOWN
     */
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

    /**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        List<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

    /*
     * Methods for creating, running and cleaning up after workers
     */

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked, which requires a
     * backout of workerCount, and a recheck for termination, in case
     * the existence of this worker was holding up termination.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass the
     * queue). Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
837 * 838 * @param core if true use corePoolSize as bound, else 839 * maximumPoolSize. (A boolean indicator is used here rather than a 840 * value to ensure reads of fresh values after checking other pool 841 * state). 842 * @return true if successful 843 */ 844 private boolean addWorker(Runnable firstTask, boolean core) { 845 retry: 846 for (;;) { 847 int c = ctl.get(); 848 int rs = runStateOf(c); 849 850 // Check if queue empty only if necessary. 851 if (rs >= SHUTDOWN && 852 ! (rs == SHUTDOWN && 853 firstTask == null && 854 ! workQueue.isEmpty())) 855 return false; 856 857 for (;;) { 858 int wc = workerCountOf(c); 859 if (wc >= CAPACITY || 860 wc >= (core ? corePoolSize : maximumPoolSize)) 861 return false; 862 if (compareAndIncrementWorkerCount(c)) 863 break retry; 864 c = ctl.get(); // Re-read ctl 865 if (runStateOf(c) != rs) 866 continue retry; 867 // else CAS failed due to workerCount change; retry inner loop 868 } 869 } 870 871 Worker w = new Worker(firstTask); 872 Thread t = w.thread; 873 874 final ReentrantLock mainLock = this.mainLock; 875 mainLock.lock(); 876 try { 877 // Recheck while holding lock. 878 // Back out on ThreadFactory failure or if 879 // shut down before lock acquired. 880 int c = ctl.get(); 881 int rs = runStateOf(c); 882 883 if (t == null || 884 (rs >= SHUTDOWN && 885 ! (rs == SHUTDOWN && 886 firstTask == null))) { 887 decrementWorkerCount(); 888 tryTerminate(); 889 return false; 890 } 891 892 workers.add(w); 893 894 int s = workers.size(); 895 if (s > largestPoolSize) 896 largestPoolSize = s; 897 } finally { 898 mainLock.unlock(); 899 } 900 901 t.start(); 902 // It is possible (but unlikely) for a thread to have been 903 // added to workers, but not yet started, during transition to 904 // STOP, which could result in a rare missed interrupt, 905 // because Thread.interrupt is not guaranteed to have any effect 906 // on a non-yet-started Thread (see Thread#interrupt). 907 if (runStateOf(ctl.get()) == STOP && ! 
t.isInterrupted()) 908 t.interrupt(); 909 910 return true; 911 } 912 913 /** 914 * Performs cleanup and bookkeeping for a dying worker. Called 915 * only from worker threads. Unless completedAbruptly is set, 916 * assumes that workerCount has already been adjusted to account 917 * for exit. This method removes thread from worker set, and 918 * possibly terminates the pool or replaces the worker if either 919 * it exited due to user task exception or if fewer than 920 * corePoolSize workers are running or queue is non-empty but 921 * there are no workers. 922 * 923 * @param w the worker 924 * @param completedAbruptly if the worker died due to user exception 925 */ 926 private void processWorkerExit(Worker w, boolean completedAbruptly) { 927 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 928 decrementWorkerCount(); 929 930 final ReentrantLock mainLock = this.mainLock; 931 mainLock.lock(); 932 try { 933 completedTaskCount += w.completedTasks; 934 workers.remove(w); 935 } finally { 936 mainLock.unlock(); 937 } 938 939 tryTerminate(); 940 941 int c = ctl.get(); 942 if (runStateLessThan(c, STOP)) { 943 if (!completedAbruptly) { 944 int min = corePoolSize; 945 if (min == 0 && ! workQueue.isEmpty()) 946 min = 1; 947 if (workerCountOf(c) >= min) 948 return; // replacement not needed 949 } 950 addWorker(null, false); 951 } 952 } 953 954 /** 955 * Performs blocking or timed wait for a task, depending on 956 * current configuration settings, or returns null if this worker 957 * must exit because of any of: 958 * 1. There are more than maximumPoolSize workers (due to 959 * a call to setMaximumPoolSize). 960 * 2. The pool is stopped. 961 * 3. The pool is shutdown and the queue is empty. 962 * 4. This worker timed out waiting for a task, and timed-out 963 * workers are subject to termination (that is, 964 * {@code workerCount > corePoolSize}) 965 * both before and after the timed wait. 
966 * 967 * @return task, or null if the worker must exit, in which case 968 * workerCount is decremented 969 */ 970 private Runnable getTask() { 971 boolean timedOut = false; // Did the last poll() time out? 972 973 retry: 974 for (;;) { 975 int c = ctl.get(); 976 int rs = runStateOf(c); 977 978 // Check if queue empty only if necessary. 979 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 980 decrementWorkerCount(); 981 return null; 982 } 983 984 boolean timed; // Are workers subject to culling? 985 986 for (;;) { 987 int wc = workerCountOf(c); 988 timed = wc > corePoolSize; 989 990 if (wc <= maximumPoolSize && ! (timedOut && timed)) 991 break; 992 if (compareAndDecrementWorkerCount(c)) 993 return null; 994 c = ctl.get(); // Re-read ctl 995 if (runStateOf(c) != rs) 996 continue retry; 997 // else CAS failed due to workerCount change; retry inner loop 998 } 999 1000 try { 1001 Runnable r = timed ? 1002 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 1003 workQueue.take(); 1004 if (r != null) 1005 return r; 1006 timedOut = true; 1007 } catch (InterruptedException retry) { 1008 timedOut = false; 1009 } 1010 } 1011 } 1012 1013 /** 1014 * Main worker run loop. Repeatedly gets tasks from queue and 1015 * executes them, while coping with a number of issues: 1016 * 1017 * 1. We may start out with an initial task, in which case we 1018 * don't need to get the first one. Otherwise, as long as pool is 1019 * running, we get tasks from getTask. If it returns null then the 1020 * worker exits due to changed pool state or configuration 1021 * parameters. Other exits result from exception throws in 1022 * external code, in which case completedAbruptly holds, which 1023 * usually leads processWorkerExit to replace this thread. 1024 * 1025 * 2. 
Before running any task, the lock is acquired to prevent 1026 * other pool interrupts while the task is executing, and 1027 * clearInterruptsForTaskRun called to ensure that unless pool is 1028 * stopping, this thread does not have its interrupt set. 1029 * 1030 * 3. Each task run is preceded by a call to beforeExecute, which 1031 * might throw an exception, in which case we cause thread to die 1032 * (breaking loop with completedAbruptly true) without processing 1033 * the task. 1034 * 1035 * 4. Assuming beforeExecute completes normally, we run the task, 1036 * gathering any of its thrown exceptions to send to 1037 * afterExecute. We separately handle RuntimeException, Error 1038 * (both of which the specs guarantee that we trap) and arbitrary 1039 * Throwables. Because we cannot rethrow Throwables within 1040 * Runnable.run, we wrap them within Errors on the way out (to the 1041 * thread's UncaughtExceptionHandler). Any thrown exception also 1042 * conservatively causes thread to die. 1043 * 1044 * 5. After task.run completes, we call afterExecute, which may 1045 * also throw an exception, which will also cause thread to 1046 * die. According to JLS Sec 14.20, this exception is the one that 1047 * will be in effect even if task.run throws. 1048 * 1049 * The net effect of the exception mechanics is that afterExecute 1050 * and the thread's UncaughtExceptionHandler have as accurate 1051 * information as we can provide about any problems encountered by 1052 * user code. 
1053 * 1054 * @param w the worker 1055 */ 1056 final void runWorker(Worker w) { 1057 Runnable task = w.firstTask; 1058 w.firstTask = null; 1059 boolean completedAbruptly = true; 1060 try { 1061 while (task != null || (task = getTask()) != null) { 1062 w.lock(); 1063 clearInterruptsForTaskRun(); 1064 try { 1065 beforeExecute(w.thread, task); 1066 Throwable thrown = null; 1067 try { 1068 task.run(); 1069 } catch (RuntimeException x) { 1070 thrown = x; throw x; 1071 } catch (Error x) { 1072 thrown = x; throw x; 1073 } catch (Throwable x) { 1074 thrown = x; throw new Error(x); 1075 } finally { 1076 afterExecute(task, thrown); 1077 } 1078 } finally { 1079 task = null; 1080 w.completedTasks++; 1081 w.unlock(); 1082 } 1083 } 1084 completedAbruptly = false; 1085 } finally { 1086 processWorkerExit(w, completedAbruptly); 1087 } 1088 } 1089 1090 // Public constructors and methods 1091 1092 /** 1093 * Creates a new {@code ThreadPoolExecutor} with the given initial 1094 * parameters and default thread factory and rejected execution handler. 1095 * It may be more convenient to use one of the {@link Executors} factory 1096 * methods instead of this general purpose constructor. 1097 * 1098 * @param corePoolSize the number of threads to keep in the pool, even 1099 * if they are idle 1100 * @param maximumPoolSize the maximum number of threads to allow in the 1101 * pool 1102 * @param keepAliveTime when the number of threads is greater than 1103 * the core, this is the maximum time that excess idle threads 1104 * will wait for new tasks before terminating. 1105 * @param unit the time unit for the {@code keepAliveTime} argument 1106 * @param workQueue the queue to use for holding tasks before they are 1107 * executed. This queue will hold only the {@code Runnable} 1108 * tasks submitted by the {@code execute} method. 
1109 * @throws IllegalArgumentException if one of the following holds:<br> 1110 * {@code corePoolSize < 0}<br> 1111 * {@code keepAliveTime < 0}<br> 1112 * {@code maximumPoolSize <= 0}<br> 1113 * {@code maximumPoolSize < corePoolSize} 1114 * @throws NullPointerException if {@code workQueue} is null 1115 */ 1116 public ThreadPoolExecutor(int corePoolSize, 1117 int maximumPoolSize, 1118 long keepAliveTime, 1119 TimeUnit unit, 1120 BlockingQueue<Runnable> workQueue) { 1121 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1122 Executors.defaultThreadFactory(), defaultHandler); 1123 } 1124 1125 /** 1126 * Creates a new {@code ThreadPoolExecutor} with the given initial 1127 * parameters and default rejected execution handler. 1128 * 1129 * @param corePoolSize the number of threads to keep in the pool, even 1130 * if they are idle 1131 * @param maximumPoolSize the maximum number of threads to allow in the 1132 * pool 1133 * @param keepAliveTime when the number of threads is greater than 1134 * the core, this is the maximum time that excess idle threads 1135 * will wait for new tasks before terminating. 1136 * @param unit the time unit for the {@code keepAliveTime} argument 1137 * @param workQueue the queue to use for holding tasks before they are 1138 * executed. This queue will hold only the {@code Runnable} 1139 * tasks submitted by the {@code execute} method. 
1140 * @param threadFactory the factory to use when the executor 1141 * creates a new thread 1142 * @throws IllegalArgumentException if one of the following holds:<br> 1143 * {@code corePoolSize < 0}<br> 1144 * {@code keepAliveTime < 0}<br> 1145 * {@code maximumPoolSize <= 0}<br> 1146 * {@code maximumPoolSize < corePoolSize} 1147 * @throws NullPointerException if {@code workQueue} 1148 * or {@code threadFactory} is null 1149 */ 1150 public ThreadPoolExecutor(int corePoolSize, 1151 int maximumPoolSize, 1152 long keepAliveTime, 1153 TimeUnit unit, 1154 BlockingQueue<Runnable> workQueue, 1155 ThreadFactory threadFactory) { 1156 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1157 threadFactory, defaultHandler); 1158 } 1159 1160 /** 1161 * Creates a new {@code ThreadPoolExecutor} with the given initial 1162 * parameters and default thread factory. 1163 * 1164 * @param corePoolSize the number of threads to keep in the pool, even 1165 * if they are idle 1166 * @param maximumPoolSize the maximum number of threads to allow in the 1167 * pool 1168 * @param keepAliveTime when the number of threads is greater than 1169 * the core, this is the maximum time that excess idle threads 1170 * will wait for new tasks before terminating. 1171 * @param unit the time unit for the {@code keepAliveTime} argument 1172 * @param workQueue the queue to use for holding tasks before they are 1173 * executed. This queue will hold only the {@code Runnable} 1174 * tasks submitted by the {@code execute} method. 
1175 * @param handler the handler to use when execution is blocked 1176 * because the thread bounds and queue capacities are reached 1177 * @throws IllegalArgumentException if one of the following holds:<br> 1178 * {@code corePoolSize < 0}<br> 1179 * {@code keepAliveTime < 0}<br> 1180 * {@code maximumPoolSize <= 0}<br> 1181 * {@code maximumPoolSize < corePoolSize} 1182 * @throws NullPointerException if {@code workQueue} 1183 * or {@code handler} is null 1184 */ 1185 public ThreadPoolExecutor(int corePoolSize, 1186 int maximumPoolSize, 1187 long keepAliveTime, 1188 TimeUnit unit, 1189 BlockingQueue<Runnable> workQueue, 1190 RejectedExecutionHandler handler) { 1191 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1192 Executors.defaultThreadFactory(), handler); 1193 } 1194 1195 /** 1196 * Creates a new {@code ThreadPoolExecutor} with the given initial 1197 * parameters. 1198 * 1199 * @param corePoolSize the number of threads to keep in the pool, even 1200 * if they are idle 1201 * @param maximumPoolSize the maximum number of threads to allow in the 1202 * pool 1203 * @param keepAliveTime when the number of threads is greater than 1204 * the core, this is the maximum time that excess idle threads 1205 * will wait for new tasks before terminating. 1206 * @param unit the time unit for the {@code keepAliveTime} argument 1207 * @param workQueue the queue to use for holding tasks before they are 1208 * executed. This queue will hold only the {@code Runnable} 1209 * tasks submitted by the {@code execute} method. 
1210 * @param threadFactory the factory to use when the executor 1211 * creates a new thread 1212 * @param handler the handler to use when execution is blocked 1213 * because the thread bounds and queue capacities are reached 1214 * @throws IllegalArgumentException if one of the following holds:<br> 1215 * {@code corePoolSize < 0}<br> 1216 * {@code keepAliveTime < 0}<br> 1217 * {@code maximumPoolSize <= 0}<br> 1218 * {@code maximumPoolSize < corePoolSize} 1219 * @throws NullPointerException if {@code workQueue} 1220 * or {@code threadFactory} or {@code handler} is null 1221 */ 1222 public ThreadPoolExecutor(int corePoolSize, 1223 int maximumPoolSize, 1224 long keepAliveTime, 1225 TimeUnit unit, 1226 BlockingQueue<Runnable> workQueue, 1227 ThreadFactory threadFactory, 1228 RejectedExecutionHandler handler) { 1229 if (corePoolSize < 0 || 1230 maximumPoolSize <= 0 || 1231 maximumPoolSize < corePoolSize || 1232 keepAliveTime < 0) 1233 throw new IllegalArgumentException(); 1234 if (workQueue == null || threadFactory == null || handler == null) 1235 throw new NullPointerException(); 1236 this.corePoolSize = corePoolSize; 1237 this.maximumPoolSize = maximumPoolSize; 1238 this.workQueue = workQueue; 1239 this.keepAliveTime = unit.toNanos(keepAliveTime); 1240 this.threadFactory = threadFactory; 1241 this.handler = handler; 1242 } 1243 1244 /** 1245 * Executes the given task sometime in the future. The task 1246 * may execute in a new thread or in an existing pooled thread. 1247 * 1248 * If the task cannot be submitted for execution, either because this 1249 * executor has been shutdown or because its capacity has been reached, 1250 * the task is handled by the current {@code RejectedExecutionHandler}. 
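The three-step submission logic described above can be observed from outside the pool. A minimal sketch (not part of this file; the class name, pool sizes, and modern lambda syntax are illustrative choices): with corePoolSize = maximumPoolSize = 1 and a one-slot queue, a third concurrent submission can neither start a thread nor be queued, so it goes to the rejection handler.

```java
import java.util.concurrent.*;

// Hypothetical demo class: exercises execute()'s three-step logic.
public class ExecuteSaturationDemo {
    // With one busy core worker and a full one-slot queue, a further
    // submission is handed to the handler (AbortPolicy by default).
    static boolean thirdTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch hold = new CountDownLatch(1);
        CountDownLatch started = new CountDownLatch(1);
        boolean rejected = false;
        try {
            pool.execute(() -> {          // step 1: starts the core worker
                started.countDown();
                try { hold.await(); } catch (InterruptedException ignore) { }
            });
            started.await();              // worker is now busy
            pool.execute(() -> { });      // step 2: sits in the queue
            try {
                pool.execute(() -> { });  // step 3: queue full, at maximum
            } catch (RejectedExecutionException expected) {
                rejected = true;
            }
            hold.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(thirdTaskRejected()); // prints "true"
    }
}
```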
1251 * 1252 * @param command the task to execute 1253 * @throws RejectedExecutionException at discretion of 1254 * {@code RejectedExecutionHandler}, if the task 1255 * cannot be accepted for execution 1256 * @throws NullPointerException if {@code command} is null 1257 */ 1258 public void execute(Runnable command) { 1259 if (command == null) 1260 throw new NullPointerException(); 1261 /* 1262 * Proceed in 3 steps: 1263 * 1264 * 1. If fewer than corePoolSize threads are running, try to 1265 * start a new thread with the given command as its first 1266 * task. The call to addWorker atomically checks runState and 1267 * workerCount, and so prevents false alarms that would add 1268 * threads when it shouldn't, by returning false. 1269 * 1270 * 2. If a task can be successfully queued, then we still need 1271 * to double-check whether we should have added a thread 1272 * (because existing ones died since last checking) or that 1273 * the pool shut down since entry into this method. So we 1274 * recheck state and if necessary roll back the enqueuing if 1275 * stopped, or start a new thread if there are none. 1276 * 1277 * 3. If we cannot queue task, then we try to add a new 1278 * thread. If it fails, we know we are shut down or saturated 1279 * and so reject the task. 1280 */ 1281 int c = ctl.get(); 1282 if (workerCountOf(c) < corePoolSize) { 1283 if (addWorker(command, true)) 1284 return; 1285 c = ctl.get(); 1286 } 1287 if (isRunning(c) && workQueue.offer(command)) { 1288 int recheck = ctl.get(); 1289 if (! isRunning(recheck) && remove(command)) 1290 reject(command); 1291 else if (workerCountOf(recheck) == 0) 1292 addWorker(null, false); 1293 } 1294 else if (!addWorker(command, false)) 1295 reject(command); 1296 } 1297 1298 /** 1299 * Initiates an orderly shutdown in which previously submitted 1300 * tasks are executed, but no new tasks will be accepted. 1301 * Invocation has no additional effect if already shut down. 
1302 * 1303 * <p>This method does not wait for previously submitted tasks to 1304 * complete execution. Use {@link #awaitTermination awaitTermination} 1305 * to do that. 1306 * 1307 * @throws SecurityException {@inheritDoc} 1308 */ 1309 public void shutdown() { 1310 final ReentrantLock mainLock = this.mainLock; 1311 mainLock.lock(); 1312 try { 1313 checkShutdownAccess(); 1314 advanceRunState(SHUTDOWN); 1315 interruptIdleWorkers(); 1316 onShutdown(); // hook for ScheduledThreadPoolExecutor 1317 } finally { 1318 mainLock.unlock(); 1319 } 1320 tryTerminate(); 1321 } 1322 1323 /** 1324 * Attempts to stop all actively executing tasks, halts the 1325 * processing of waiting tasks, and returns a list of the tasks 1326 * that were awaiting execution. These tasks are drained (removed) 1327 * from the task queue upon return from this method. 1328 * 1329 * <p>This method does not wait for actively executing tasks to 1330 * terminate. Use {@link #awaitTermination awaitTermination} to 1331 * do that. 1332 * 1333 * <p>There are no guarantees beyond best-effort attempts to stop 1334 * processing actively executing tasks. This implementation 1335 * cancels tasks via {@link Thread#interrupt}, so any task that 1336 * fails to respond to interrupts may never terminate. 1337 * 1338 * @throws SecurityException {@inheritDoc} 1339 */ 1340 public List<Runnable> shutdownNow() { 1341 List<Runnable> tasks; 1342 final ReentrantLock mainLock = this.mainLock; 1343 mainLock.lock(); 1344 try { 1345 checkShutdownAccess(); 1346 advanceRunState(STOP); 1347 interruptWorkers(); 1348 tasks = drainQueue(); 1349 } finally { 1350 mainLock.unlock(); 1351 } 1352 tryTerminate(); 1353 return tasks; 1354 } 1355 1356 public boolean isShutdown() { 1357 return ! isRunning(ctl.get()); 1358 } 1359 1360 /** 1361 * Returns true if this executor is in the process of terminating 1362 * after {@link #shutdown} or {@link #shutdownNow} but has not 1363 * completely terminated. 
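The contract of shutdownNow (interrupt the running task, drain and return the waiting ones) can be sketched as follows; the class name and timings are invented for illustration and are not part of this file.

```java
import java.util.List;
import java.util.concurrent.*;

// Hypothetical demo class: shutdownNow() interrupts the running task
// and returns the tasks still waiting in the queue.
public class ShutdownNowDemo {
    static int drainedCount() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                started.countDown();
                try { Thread.sleep(60_000); }         // ends early via interrupt
                catch (InterruptedException expected) { }
            });
            pool.execute(() -> { });                  // waits in the queue
            started.await();
            List<Runnable> pending = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(drainedCount()); // prints "1"
    }
}
```

The sleeping task responds to the interrupt and returns, so the pool reaches the terminated state; a task that ignored interruption would not, as the javadoc warns.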
This method may be useful for 1364 * debugging. A return of {@code true} reported a sufficient 1365 * period after shutdown may indicate that submitted tasks have 1366 * ignored or suppressed interruption, causing this executor not 1367 * to properly terminate. 1368 * 1369 * @return true if terminating but not yet terminated 1370 */ 1371 public boolean isTerminating() { 1372 int c = ctl.get(); 1373 return ! isRunning(c) && runStateLessThan(c, TERMINATED); 1374 } 1375 1376 public boolean isTerminated() { 1377 return runStateAtLeast(ctl.get(), TERMINATED); 1378 } 1379 1380 public boolean awaitTermination(long timeout, TimeUnit unit) 1381 throws InterruptedException { 1382 long nanos = unit.toNanos(timeout); 1383 final ReentrantLock mainLock = this.mainLock; 1384 mainLock.lock(); 1385 try { 1386 for (;;) { 1387 if (runStateAtLeast(ctl.get(), TERMINATED)) 1388 return true; 1389 if (nanos <= 0) 1390 return false; 1391 nanos = termination.awaitNanos(nanos); 1392 } 1393 } finally { 1394 mainLock.unlock(); 1395 } 1396 } 1397 1398 /** 1399 * Invokes {@code shutdown} when this executor is no longer 1400 * referenced and it has no threads. 1401 */ 1402 protected void finalize() { 1403 shutdown(); 1404 } 1405 1406 /** 1407 * Sets the thread factory used to create new threads. 1408 * 1409 * @param threadFactory the new thread factory 1410 * @throws NullPointerException if threadFactory is null 1411 * @see #getThreadFactory 1412 */ 1413 public void setThreadFactory(ThreadFactory threadFactory) { 1414 if (threadFactory == null) 1415 throw new NullPointerException(); 1416 this.threadFactory = threadFactory; 1417 } 1418 1419 /** 1420 * Returns the thread factory used to create new threads. 1421 * 1422 * @return the current thread factory 1423 * @see #setThreadFactory 1424 */ 1425 public ThreadFactory getThreadFactory() { 1426 return threadFactory; 1427 } 1428 1429 /** 1430 * Sets a new handler for unexecutable tasks. 
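Swapping in a different handler via setRejectedExecutionHandler changes what happens at saturation. A minimal sketch (the class name is hypothetical): with DiscardPolicy installed, a rejected task is dropped silently instead of raising RejectedExecutionException.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: DiscardPolicy drops a rejected task silently.
public class DiscardHandlerDemo {
    static boolean discardedSilently() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch hold = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);
        try {
            pool.execute(() -> {                     // occupies the only worker
                try { hold.await(); } catch (InterruptedException ignore) { }
            });
            pool.execute(() -> ran.set(true));       // rejected -> discarded
            hold.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return !ran.get();
    }

    public static void main(String[] args) {
        System.out.println(discardedSilently()); // prints "true"
    }
}
```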
1431 * 1432 * @param handler the new handler 1433 * @throws NullPointerException if handler is null 1434 * @see #getRejectedExecutionHandler 1435 */ 1436 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { 1437 if (handler == null) 1438 throw new NullPointerException(); 1439 this.handler = handler; 1440 } 1441 1442 /** 1443 * Returns the current handler for unexecutable tasks. 1444 * 1445 * @return the current handler 1446 * @see #setRejectedExecutionHandler 1447 */ 1448 public RejectedExecutionHandler getRejectedExecutionHandler() { 1449 return handler; 1450 } 1451 1452 /** 1453 * Sets the core number of threads. This overrides any value set 1454 * in the constructor. If the new value is smaller than the 1455 * current value, excess existing threads will be terminated when 1456 * they next become idle. If larger, new threads will, if needed, 1457 * be started to execute any queued tasks. 1458 * 1459 * @param corePoolSize the new core size 1460 * @throws IllegalArgumentException if {@code corePoolSize < 0} 1461 * @see #getCorePoolSize 1462 */ 1463 public void setCorePoolSize(int corePoolSize) { 1464 if (corePoolSize < 0) 1465 throw new IllegalArgumentException(); 1466 int delta = corePoolSize - this.corePoolSize; 1467 this.corePoolSize = corePoolSize; 1468 if (workerCountOf(ctl.get()) > corePoolSize) 1469 interruptIdleWorkers(); 1470 else if (delta > 0) { 1471 // We don't really know how many new threads are "needed". 1472 // As a heuristic, prestart enough new workers (up to new 1473 // core size) to handle the current number of tasks in 1474 // queue, but stop if queue becomes empty while doing so. 1475 int k = Math.min(delta, workQueue.size()); 1476 while (k-- > 0 && addWorker(null, true)) { 1477 if (workQueue.isEmpty()) 1478 break; 1479 } 1480 } 1481 } 1482 1483 /** 1484 * Returns the core number of threads. 
1485 * 1486 * @return the core number of threads 1487 * @see #setCorePoolSize 1488 */ 1489 public int getCorePoolSize() { 1490 return corePoolSize; 1491 } 1492 1493 /** 1494 * Starts a core thread, causing it to idly wait for work. This 1495 * overrides the default policy of starting core threads only when 1496 * new tasks are executed. This method will return {@code false} 1497 * if all core threads have already been started. 1498 * 1499 * @return {@code true} if a thread was started 1500 */ 1501 public boolean prestartCoreThread() { 1502 return workerCountOf(ctl.get()) < corePoolSize && 1503 addWorker(null, true); 1504 } 1505 1506 /** 1507 * Starts all core threads, causing them to idly wait for work. This 1508 * overrides the default policy of starting core threads only when 1509 * new tasks are executed. 1510 * 1511 * @return the number of threads started 1512 */ 1513 public int prestartAllCoreThreads() { 1514 int n = 0; 1515 while (addWorker(null, true)) 1516 ++n; 1517 return n; 1518 } 1519 1520 /** 1521 * Sets the maximum allowed number of threads. This overrides any 1522 * value set in the constructor. If the new value is smaller than 1523 * the current value, excess existing threads will be 1524 * terminated when they next become idle. 1525 * 1526 * @param maximumPoolSize the new maximum 1527 * @throws IllegalArgumentException if the new maximum is 1528 * less than or equal to zero, or 1529 * less than the {@linkplain #getCorePoolSize core pool size} 1530 * @see #getMaximumPoolSize 1531 */ 1532 public void setMaximumPoolSize(int maximumPoolSize) { 1533 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) 1534 throw new IllegalArgumentException(); 1535 this.maximumPoolSize = maximumPoolSize; 1536 if (workerCountOf(ctl.get()) > maximumPoolSize) 1537 interruptIdleWorkers(); 1538 } 1539 1540 /** 1541 * Returns the maximum allowed number of threads. 
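The effect of prestarting can be sketched like this (the class name is invented for illustration): prestartAllCoreThreads() brings the pool from zero threads up to corePoolSize idle workers before any task is submitted.

```java
import java.util.concurrent.*;

// Hypothetical demo class: prestartAllCoreThreads() starts the core
// workers immediately instead of waiting for the first submissions.
public class PrestartDemo {
    static int startedCoreThreads() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        int before = pool.getPoolSize();      // 0: no tasks submitted yet
        int n = pool.prestartAllCoreThreads();
        int after = pool.getPoolSize();       // now 3 idle workers
        pool.shutdown();
        return (before == 0 && after == 3) ? n : -1;
    }

    public static void main(String[] args) {
        System.out.println(startedCoreThreads()); // prints "3"
    }
}
```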
1542 * 1543 * @return the maximum allowed number of threads 1544 * @see #setMaximumPoolSize 1545 */ 1546 public int getMaximumPoolSize() { 1547 return maximumPoolSize; 1548 } 1549 1550 /** 1551 * Sets the time limit for which threads may remain idle before 1552 * being terminated. If there are more than the core number of 1553 * threads currently in the pool, after waiting this amount of 1554 * time without processing a task, excess threads will be 1555 * terminated. This overrides any value set in the constructor. 1556 * 1557 * @param time the time to wait. A time value of zero will cause 1558 * excess threads to terminate immediately after executing tasks. 1559 * @param unit the time unit of the {@code time} argument 1560 * @throws IllegalArgumentException if {@code time} less than zero or 1561 * if {@code time} is zero and {@code allowsCoreThreadTimeOut} 1562 * @see #getKeepAliveTime 1563 */ 1564 public void setKeepAliveTime(long time, TimeUnit unit) { 1565 if (time < 0) 1566 throw new IllegalArgumentException(); 1567 long keepAliveTime = unit.toNanos(time); 1568 long delta = keepAliveTime - this.keepAliveTime; 1569 this.keepAliveTime = keepAliveTime; 1570 if (delta < 0) 1571 interruptIdleWorkers(); 1572 } 1573 1574 /** 1575 * Returns the thread keep-alive time, which is the amount of time 1576 * that threads in excess of the core pool size may remain 1577 * idle before being terminated. 1578 * 1579 * @param unit the desired time unit of the result 1580 * @return the time limit 1581 * @see #setKeepAliveTime 1582 */ 1583 public long getKeepAliveTime(TimeUnit unit) { 1584 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); 1585 } 1586 1587 /* User-level queue utilities */ 1588 1589 /** 1590 * Returns the task queue used by this executor. Access to the 1591 * task queue is intended primarily for debugging and monitoring. 1592 * This queue may be in active use. Retrieving the task queue 1593 * does not prevent queued tasks from executing. 
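The interaction between getQueue, cancelled Futures, and purge (defined below) can be sketched as follows; the class name is hypothetical and lambdas are used for brevity. A cancelled FutureTask lingers in the work queue until purge() or a worker removes it.

```java
import java.util.concurrent.*;

// Hypothetical demo class: a cancelled Future stays in the work queue
// until purge() (or a worker) removes it.
public class PurgeDemo {
    static boolean purgeRemovesCancelled() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch hold = new CountDownLatch(1);
        try {
            pool.execute(() -> {                      // keeps the worker busy
                try { hold.await(); } catch (InterruptedException ignore) { }
            });
            Future<?> f = pool.submit(() -> { });     // queued as a FutureTask
            f.cancel(false);
            boolean queuedBefore = pool.getQueue().size() == 1;
            pool.purge();                             // reclaims the dead entry
            boolean emptyAfter = pool.getQueue().isEmpty();
            hold.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return queuedBefore && emptyAfter;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(purgeRemovesCancelled()); // prints "true"
    }
}
```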
1594 * 1595 * @return the task queue 1596 */ 1597 public BlockingQueue<Runnable> getQueue() { 1598 return workQueue; 1599 } 1600 1601 /** 1602 * Removes this task from the executor's internal queue if it is 1603 * present, thus causing it not to be run if it has not already 1604 * started. 1605 * 1606 * <p> This method may be useful as one part of a cancellation 1607 * scheme. It may fail to remove tasks that have been converted 1608 * into other forms before being placed on the internal queue. For 1609 * example, a task entered using {@code submit} might be 1610 * converted into a form that maintains {@code Future} status. 1611 * However, in such cases, method {@link #purge} may be used to 1612 * remove those Futures that have been cancelled. 1613 * 1614 * @param task the task to remove 1615 * @return true if the task was removed 1616 */ 1617 public boolean remove(Runnable task) { 1618 boolean removed = workQueue.remove(task); 1619 tryTerminate(); // In case SHUTDOWN and now empty 1620 return removed; 1621 } 1622 1623 /** 1624 * Tries to remove from the work queue all {@link Future} 1625 * tasks that have been cancelled. This method can be useful as a 1626 * storage reclamation operation, that has no other impact on 1627 * functionality. Cancelled tasks are never executed, but may 1628 * accumulate in work queues until worker threads can actively 1629 * remove them. Invoking this method instead tries to remove them now. 1630 * However, this method may fail to remove tasks in 1631 * the presence of interference by other threads. 1632 */ 1633 public void purge() { 1634 final BlockingQueue<Runnable> q = workQueue; 1635 try { 1636 Iterator<Runnable> it = q.iterator(); 1637 while (it.hasNext()) { 1638 Runnable r = it.next(); 1639 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1640 it.remove(); 1641 } 1642 } catch (ConcurrentModificationException fallThrough) { 1643 // Take slow path if we encounter interference during traversal. 
1644 // Make copy for traversal and call remove for cancelled entries. 1645 // The slow path is more likely to be O(N*N). 1646 for (Object r : q.toArray()) 1647 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1648 q.remove(r); 1649 } 1650 1651 tryTerminate(); // In case SHUTDOWN and now empty 1652 } 1653 1654 /* Statistics */ 1655 1656 /** 1657 * Returns the current number of threads in the pool. 1658 * 1659 * @return the number of threads 1660 */ 1661 public int getPoolSize() { 1662 final ReentrantLock mainLock = this.mainLock; 1663 mainLock.lock(); 1664 try { 1665 // Remove rare and surprising possibility of 1666 // isTerminated() && getPoolSize() > 0 1667 return runStateAtLeast(ctl.get(), TIDYING) ? 0 1668 : workers.size(); 1669 } finally { 1670 mainLock.unlock(); 1671 } 1672 } 1673 1674 /** 1675 * Returns the approximate number of threads that are actively 1676 * executing tasks. 1677 * 1678 * @return the number of threads 1679 */ 1680 public int getActiveCount() { 1681 final ReentrantLock mainLock = this.mainLock; 1682 mainLock.lock(); 1683 try { 1684 int n = 0; 1685 for (Worker w : workers) 1686 if (w.isLocked()) 1687 ++n; 1688 return n; 1689 } finally { 1690 mainLock.unlock(); 1691 } 1692 } 1693 1694 /** 1695 * Returns the largest number of threads that have ever 1696 * simultaneously been in the pool. 1697 * 1698 * @return the number of threads 1699 */ 1700 public int getLargestPoolSize() { 1701 final ReentrantLock mainLock = this.mainLock; 1702 mainLock.lock(); 1703 try { 1704 return largestPoolSize; 1705 } finally { 1706 mainLock.unlock(); 1707 } 1708 } 1709 1710 /** 1711 * Returns the approximate total number of tasks that have ever been 1712 * scheduled for execution. Because the states of tasks and 1713 * threads may change dynamically during computation, the returned 1714 * value is only an approximation. 
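While the counters are approximate on a live pool, they are exact once the pool has terminated. A small sketch (class name invented for illustration):

```java
import java.util.concurrent.*;

// Hypothetical demo class: after termination the task counters are
// exact rather than approximate.
public class TaskCountDemo {
    static long completedAfterShutdown() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 5; i++)
            pool.execute(() -> { });
        pool.shutdown();                 // queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return pool.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        System.out.println(completedAfterShutdown()); // prints "5"
    }
}
```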
1715 * 1716 * @return the number of tasks 1717 */ 1718 public long getTaskCount() { 1719 final ReentrantLock mainLock = this.mainLock; 1720 mainLock.lock(); 1721 try { 1722 long n = completedTaskCount; 1723 for (Worker w : workers) { 1724 n += w.completedTasks; 1725 if (w.isLocked()) 1726 ++n; 1727 } 1728 return n + workQueue.size(); 1729 } finally { 1730 mainLock.unlock(); 1731 } 1732 } 1733 1734 /** 1735 * Returns the approximate total number of tasks that have 1736 * completed execution. Because the states of tasks and threads 1737 * may change dynamically during computation, the returned value 1738 * is only an approximation, but one that does not ever decrease 1739 * across successive calls. 1740 * 1741 * @return the number of tasks 1742 */ 1743 public long getCompletedTaskCount() { 1744 final ReentrantLock mainLock = this.mainLock; 1745 mainLock.lock(); 1746 try { 1747 long n = completedTaskCount; 1748 for (Worker w : workers) 1749 n += w.completedTasks; 1750 return n; 1751 } finally { 1752 mainLock.unlock(); 1753 } 1754 } 1755 1756 /* Extension hooks */ 1757 1758 /** 1759 * Method invoked prior to executing the given Runnable in the 1760 * given thread. This method is invoked by thread {@code t} that 1761 * will execute task {@code r}, and may be used to re-initialize 1762 * ThreadLocals, or to perform logging. 1763 * 1764 * <p>This implementation does nothing, but may be customized in 1765 * subclasses. Note: To properly nest multiple overridings, subclasses 1766 * should generally invoke {@code super.beforeExecute} at the end of 1767 * this method. 1768 * 1769 * @param t the thread that will run task {@code r} 1770 * @param r the task that will be executed 1771 */ 1772 protected void beforeExecute(Thread t, Runnable r) { } 1773 1774 /** 1775 * Method invoked upon completion of execution of the given Runnable. 1776 * This method is invoked by the thread that executed the task. 
     * If non-null, the Throwable is the uncaught {@code RuntimeException}
     * or {@code Error} that caused execution to terminate abruptly.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke {@code super.afterExecute} at the
     * beginning of this method.
     *
     * <p><b>Note:</b> When actions are enclosed in tasks (such as
     * {@link FutureTask}) either explicitly or via methods such as
     * {@code submit}, these task objects catch and maintain
     * computational exceptions, and so they do not cause abrupt
     * termination, and the internal exceptions are <em>not</em>
     * passed to this method. If you would like to trap both kinds of
     * failures in this method, you can further probe for such cases,
     * as in this sample subclass that prints either the direct cause
     * or the underlying exception if a task has been aborted:
     *
     * <pre> {@code
     * class ExtendedExecutor extends ThreadPoolExecutor {
     *   // ...
     *   protected void afterExecute(Runnable r, Throwable t) {
     *     super.afterExecute(r, t);
     *     if (t == null && r instanceof Future<?>) {
     *       try {
     *         Object result = ((Future<?>) r).get();
     *       } catch (CancellationException ce) {
     *         t = ce;
     *       } catch (ExecutionException ee) {
     *         t = ee.getCause();
     *       } catch (InterruptedException ie) {
     *         Thread.currentThread().interrupt(); // ignore/reset
     *       }
     *     }
     *     if (t != null)
     *       System.out.println(t);
     *   }
     * }}</pre>
     *
     * @param r the runnable that has completed
     * @param t the exception that caused termination, or null if
     * execution completed normally
     */
    protected void afterExecute(Runnable r, Throwable t) { }

    /**
     * Method invoked when the Executor has terminated. Default
     * implementation does nothing.
     * Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * {@code super.terminated} within this method.
     */
    protected void terminated() { }

    /* Predefined RejectedExecutionHandlers */

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy}.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}
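The statistics methods above compose into a simple monitoring snapshot. A hypothetical usage sketch, not part of this class (the name `PoolStatsDemo` is illustrative): once the pool has terminated, the otherwise approximate counts become exact, and `getPoolSize()` reports 0 rather than a lingering worker count.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStatsDemo {
    static long[] run() {
        // Core and maximum size of 2: core threads are created on demand,
        // one per submission, until corePoolSize is reached.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 4; i++)
            pool.execute(new Runnable() { public void run() { } });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        // After termination the counts are no longer approximate.
        return new long[] {
            pool.getTaskCount(),          // 4: tasks ever scheduled
            pool.getCompletedTaskCount(), // 4: tasks completed
            pool.getLargestPoolSize(),    // 2: worker threads at peak
            pool.getPoolSize()            // 0: pool has terminated
        };
    }
}
```

Note that only two workers are ever created even though four tasks are submitted: once `corePoolSize` threads exist, further submissions are queued rather than spawning threads.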
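The nesting conventions for the extension hooks (call `super.beforeExecute` at the end, invoke `super.terminated` within `terminated`) can be sketched with a small subclass. This is a hypothetical illustration (the name `HookDemo` and its counters are not part of this file), assuming only the hook contracts documented above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookDemo extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final CountDownLatch terminatedSignal = new CountDownLatch(1);

    HookDemo() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override protected void beforeExecute(Thread t, Runnable r) {
        started.incrementAndGet();  // illustrative bookkeeping only
        super.beforeExecute(t, r);  // invoked at the end, per the note above
    }

    @Override protected void terminated() {
        try {
            terminatedSignal.countDown();
        } finally {
            super.terminated();     // invoked within this method, per the note
        }
    }

    static int run() {
        HookDemo pool = new HookDemo();
        for (int i = 0; i < 3; i++)
            pool.execute(new Runnable() { public void run() { } });
        pool.shutdown();
        try {
            pool.terminatedSignal.await();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return pool.started.get();  // one beforeExecute call per task
    }
}
```

Because `terminated` runs exactly once, after all workers have exited, the latch here gives a deterministic point at which per-task hook counts can be read.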
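The difference between the predefined handlers is easiest to see at saturation. A hypothetical sketch (the name `SaturationDemo` is illustrative): a pool with one worker and a one-slot queue deterministically rejects the third submission, and `CallerRunsPolicy` then runs that task synchronously in the submitting thread, throttling the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationDemo {
    static boolean run() {
        final CountDownLatch hold = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());
        // First task is handed directly to the single core worker,
        // which blocks on the latch and so cannot drain the queue.
        pool.execute(new Runnable() {
            public void run() {
                try { hold.await(); } catch (InterruptedException ie) { }
            }
        });
        // Second task fills the one-slot queue.
        pool.execute(new Runnable() { public void run() { } });
        // Third task: queue full, pool at maximumPoolSize -> rejected;
        // CallerRunsPolicy runs it here, in the calling thread.
        final Thread[] ranIn = new Thread[1];
        pool.execute(new Runnable() {
            public void run() { ranIn[0] = Thread.currentThread(); }
        });
        boolean ranInCaller = (ranIn[0] == Thread.currentThread());
        hold.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return ranInCaller;
    }
}
```

Substituting `AbortPolicy` in the same setup would make the third `execute` throw `RejectedExecutionException`, and `DiscardPolicy` would silently drop it.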