ThreadPoolExecutor.java revision 8eb35c835be1345d3873a82cc9e42f944d698afd
11305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood/* 21305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * Written by Doug Lea with assistance from members of JCP JSR-166 31305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * Expert Group and released to the public domain, as explained at 41305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * http://creativecommons.org/licenses/publicdomain 51305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood */ 61305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood 71305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwoodpackage java.util.concurrent; 81305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwoodimport java.util.concurrent.locks.*; 91305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwoodimport java.util.concurrent.atomic.*; 101305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwoodimport java.util.*; 111305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood 121305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood/** 131305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * An {@link ExecutorService} that executes each submitted task using 141305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * one of possibly several pooled threads, normally configured 151305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * using {@link Executors} factory methods. 161305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * 171305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * <p>Thread pools address two different problems: they usually 181305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * provide improved performance when executing large numbers of 191305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * asynchronous tasks, due to reduced per-task invocation overhead, 201305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * and they provide a means of bounding and managing the resources, 211305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * including threads, consumed when executing a collection of tasks. 221305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * Each {@code ThreadPoolExecutor} also maintains some basic 231305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * statistics, such as the number of completed tasks. 241305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * 251305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * <p>To be useful across a wide range of contexts, this class 261305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * provides many adjustable parameters and extensibility 271305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * hooks. However, programmers are urged to use the more convenient 281305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * {@link Executors} factory methods {@link 291305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * Executors#newCachedThreadPool} (unbounded thread pool, with 301305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * automatic thread reclamation), {@link Executors#newFixedThreadPool} 311305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * (fixed size thread pool) and {@link 321305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * Executors#newSingleThreadExecutor} (single background thread), that 331305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * preconfigure settings for the most common usage 341305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * scenarios. Otherwise, use the following guide when manually 351305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * configuring and tuning this class: 361305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * 371305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * <dl> 381305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * 391305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * <dt>Core and maximum pool sizes</dt> 401305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * 411305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * <dd>A {@code ThreadPoolExecutor} will automatically adjust the 421305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * pool size (see {@link #getPoolSize}) 431305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * according to the bounds set by 441305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * corePoolSize (see {@link #getCorePoolSize}) and 451305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * maximumPoolSize (see {@link #getMaximumPoolSize}). 461305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * 471305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * When a new task is submitted in method {@link #execute}, and fewer 481305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * than corePoolSize threads are running, a new thread is created to 491305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * handle the request, even if other worker threads are idle. If 501305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * there are more than corePoolSize but less than maximumPoolSize 511305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * threads running, a new thread will be created only if the queue is 521305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * full. By setting corePoolSize and maximumPoolSize the same, you 531305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * create a fixed-size thread pool. By setting maximumPoolSize to an 541305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * essentially unbounded value such as {@code Integer.MAX_VALUE}, you 551305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * allow the pool to accommodate an arbitrary number of concurrent 561305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * tasks. Most typically, core and maximum pool sizes are set only 571305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * upon construction, but they may also be changed dynamically using 581305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd> 591305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * 601305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * <dt>On-demand construction</dt> 611305e95ba6ff9fa202d0818caf10405df4b0f648Mike Lockwood * 62 * <dd> By default, even core threads are initially created and 63 * started only when new tasks arrive, but this can be overridden 64 * dynamically using method {@link #prestartCoreThread} or {@link 65 * #prestartAllCoreThreads}. You probably want to prestart threads if 66 * you construct the pool with a non-empty queue. </dd> 67 * 68 * <dt>Creating new threads</dt> 69 * 70 * <dd>New threads are created using a {@link ThreadFactory}. If not 71 * otherwise specified, a {@link Executors#defaultThreadFactory} is 72 * used, that creates threads to all be in the same {@link 73 * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and 74 * non-daemon status. By supplying a different ThreadFactory, you can 75 * alter the thread's name, thread group, priority, daemon status, 76 * etc. If a {@code ThreadFactory} fails to create a thread when asked 77 * by returning null from {@code newThread}, the executor will 78 * continue, but might not be able to execute any tasks. Threads 79 * should possess the "modifyThread" {@code RuntimePermission}. If 80 * worker threads or other threads using the pool do not possess this 81 * permission, service may be degraded: configuration changes may not 82 * take effect in a timely manner, and a shutdown pool may remain in a 83 * state in which termination is possible but not completed.</dd> 84 * 85 * <dt>Keep-alive times</dt> 86 * 87 * <dd>If the pool currently has more than corePoolSize threads, 88 * excess threads will be terminated if they have been idle for more 89 * than the keepAliveTime (see {@link #getKeepAliveTime}). This 90 * provides a means of reducing resource consumption when the pool is 91 * not being actively used. If the pool becomes more active later, new 92 * threads will be constructed. This parameter can also be changed 93 * dynamically using method {@link #setKeepAliveTime}. Using a value 94 * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively 95 * disables idle threads from ever terminating prior to shut down. By 96 * default, the keep-alive policy applies only when there are more 97 * than corePoolSizeThreads. But method {@link 98 * #allowCoreThreadTimeOut(boolean)} can be used to apply this 99 * time-out policy to core threads as well, so long as the 100 * keepAliveTime value is non-zero. </dd> 101 * 102 * <dt>Queuing</dt> 103 * 104 * <dd>Any {@link BlockingQueue} may be used to transfer and hold 105 * submitted tasks. The use of this queue interacts with pool sizing: 106 * 107 * <ul> 108 * 109 * <li> If fewer than corePoolSize threads are running, the Executor 110 * always prefers adding a new thread 111 * rather than queuing.</li> 112 * 113 * <li> If corePoolSize or more threads are running, the Executor 114 * always prefers queuing a request rather than adding a new 115 * thread.</li> 116 * 117 * <li> If a request cannot be queued, a new thread is created unless 118 * this would exceed maximumPoolSize, in which case, the task will be 119 * rejected.</li> 120 * 121 * </ul> 122 * 123 * There are three general strategies for queuing: 124 * <ol> 125 * 126 * <li> <em> Direct handoffs.</em> A good default choice for a work 127 * queue is a {@link SynchronousQueue} that hands off tasks to threads 128 * without otherwise holding them. Here, an attempt to queue a task 129 * will fail if no threads are immediately available to run it, so a 130 * new thread will be constructed. This policy avoids lockups when 131 * handling sets of requests that might have internal dependencies. 132 * Direct handoffs generally require unbounded maximumPoolSizes to 133 * avoid rejection of new submitted tasks. This in turn admits the 134 * possibility of unbounded thread growth when commands continue to 135 * arrive on average faster than they can be processed. </li> 136 * 137 * <li><em> Unbounded queues.</em> Using an unbounded queue (for 138 * example a {@link LinkedBlockingQueue} without a predefined 139 * capacity) will cause new tasks to wait in the queue when all 140 * corePoolSize threads are busy. Thus, no more than corePoolSize 141 * threads will ever be created. (And the value of the maximumPoolSize 142 * therefore doesn't have any effect.) This may be appropriate when 143 * each task is completely independent of others, so tasks cannot 144 * affect each others execution; for example, in a web page server. 145 * While this style of queuing can be useful in smoothing out 146 * transient bursts of requests, it admits the possibility of 147 * unbounded work queue growth when commands continue to arrive on 148 * average faster than they can be processed. </li> 149 * 150 * <li><em>Bounded queues.</em> A bounded queue (for example, an 151 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when 152 * used with finite maximumPoolSizes, but can be more difficult to 153 * tune and control. Queue sizes and maximum pool sizes may be traded 154 * off for each other: Using large queues and small pools minimizes 155 * CPU usage, OS resources, and context-switching overhead, but can 156 * lead to artificially low throughput. If tasks frequently block (for 157 * example if they are I/O bound), a system may be able to schedule 158 * time for more threads than you otherwise allow. Use of small queues 159 * generally requires larger pool sizes, which keeps CPUs busier but 160 * may encounter unacceptable scheduling overhead, which also 161 * decreases throughput. </li> 162 * 163 * </ol> 164 * 165 * </dd> 166 * 167 * <dt>Rejected tasks</dt> 168 * 169 * <dd> New tasks submitted in method {@link #execute} will be 170 * <em>rejected</em> when the Executor has been shut down, and also 171 * when the Executor uses finite bounds for both maximum threads and 172 * work queue capacity, and is saturated. In either case, the {@code 173 * execute} method invokes the {@link 174 * RejectedExecutionHandler#rejectedExecution} method of its {@link 175 * RejectedExecutionHandler}. Four predefined handler policies are 176 * provided: 177 * 178 * <ol> 179 * 180 * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the 181 * handler throws a runtime {@link RejectedExecutionException} upon 182 * rejection. </li> 183 * 184 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread 185 * that invokes {@code execute} itself runs the task. This provides a 186 * simple feedback control mechanism that will slow down the rate that 187 * new tasks are submitted. </li> 188 * 189 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that 190 * cannot be executed is simply dropped. </li> 191 * 192 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the 193 * executor is not shut down, the task at the head of the work queue 194 * is dropped, and then execution is retried (which can fail again, 195 * causing this to be repeated.) </li> 196 * 197 * </ol> 198 * 199 * It is possible to define and use other kinds of {@link 200 * RejectedExecutionHandler} classes. Doing so requires some care 201 * especially when policies are designed to work only under particular 202 * capacity or queuing policies. </dd> 203 * 204 * <dt>Hook methods</dt> 205 * 206 * <dd>This class provides {@code protected} overridable {@link 207 * #beforeExecute} and {@link #afterExecute} methods that are called 208 * before and after execution of each task. These can be used to 209 * manipulate the execution environment; for example, reinitializing 210 * ThreadLocals, gathering statistics, or adding log 211 * entries. Additionally, method {@link #terminated} can be overridden 212 * to perform any special processing that needs to be done once the 213 * Executor has fully terminated. 214 * 215 * <p>If hook or callback methods throw exceptions, internal worker 216 * threads may in turn fail and abruptly terminate.</dd> 217 * 218 * <dt>Queue maintenance</dt> 219 * 220 * <dd> Method {@link #getQueue} allows access to the work queue for 221 * purposes of monitoring and debugging. Use of this method for any 222 * other purpose is strongly discouraged. Two supplied methods, 223 * {@link #remove} and {@link #purge} are available to assist in 224 * storage reclamation when large numbers of queued tasks become 225 * cancelled.</dd> 226 * 227 * <dt>Finalization</dt> 228 * 229 * <dd> A pool that is no longer referenced in a program <em>AND</em> 230 * has no remaining threads will be {@code shutdown} automatically. If 231 * you would like to ensure that unreferenced pools are reclaimed even 232 * if users forget to call {@link #shutdown}, then you must arrange 233 * that unused threads eventually die, by setting appropriate 234 * keep-alive times, using a lower bound of zero core threads and/or 235 * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd> 236 * 237 * </dl> 238 * 239 * <p> <b>Extension example</b>. Most extensions of this class 240 * override one or more of the protected hook methods. For example, 241 * here is a subclass that adds a simple pause/resume feature: 242 * 243 * <pre> {@code 244 * class PausableThreadPoolExecutor extends ThreadPoolExecutor { 245 * private boolean isPaused; 246 * private ReentrantLock pauseLock = new ReentrantLock(); 247 * private Condition unpaused = pauseLock.newCondition(); 248 * 249 * public PausableThreadPoolExecutor(...) { super(...); } 250 * 251 * protected void beforeExecute(Thread t, Runnable r) { 252 * super.beforeExecute(t, r); 253 * pauseLock.lock(); 254 * try { 255 * while (isPaused) unpaused.await(); 256 * } catch (InterruptedException ie) { 257 * t.interrupt(); 258 * } finally { 259 * pauseLock.unlock(); 260 * } 261 * } 262 * 263 * public void pause() { 264 * pauseLock.lock(); 265 * try { 266 * isPaused = true; 267 * } finally { 268 * pauseLock.unlock(); 269 * } 270 * } 271 * 272 * public void resume() { 273 * pauseLock.lock(); 274 * try { 275 * isPaused = false; 276 * unpaused.signalAll(); 277 * } finally { 278 * pauseLock.unlock(); 279 * } 280 * } 281 * }}</pre> 282 * 283 * @since 1.5 284 * @author Doug Lea 285 */ 286public class ThreadPoolExecutor extends AbstractExecutorService { 287 /** 288 * The main pool control state, ctl, is an atomic integer packing 289 * two conceptual fields 290 * workerCount, indicating the effective number of threads 291 * runState, indicating whether running, shutting down etc 292 * 293 * In order to pack them into one int, we limit workerCount to 294 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 295 * billion) otherwise representable. If this is ever an issue in 296 * the future, the variable can be changed to be an AtomicLong, 297 * and the shift/mask constants below adjusted. But until the need 298 * arises, this code is a bit faster and simpler using an int. 299 * 300 * The workerCount is the number of workers that have been 301 * permitted to start and not permitted to stop. The value may be 302 * transiently different from the actual number of live threads, 303 * for example when a ThreadFactory fails to create a thread when 304 * asked, and when exiting threads are still performing 305 * bookkeeping before terminating. The user-visible pool size is 306 * reported as the current size of the workers set. 307 * 308 * The runState provides the main lifecyle control, taking on values: 309 * 310 * RUNNING: Accept new tasks and process queued tasks 311 * SHUTDOWN: Don't accept new tasks, but process queued tasks 312 * STOP: Don't accept new tasks, don't process queued tasks, 313 * and interrupt in-progress tasks 314 * TIDYING: All tasks have terminated, workerCount is zero, 315 * the thread transitioning to state TIDYING 316 * will run the terminated() hook method 317 * TERMINATED: terminated() has completed 318 * 319 * The numerical order among these values matters, to allow 320 * ordered comparisons. The runState monotonically increases over 321 * time, but need not hit each state. The transitions are: 322 * 323 * RUNNING -> SHUTDOWN 324 * On invocation of shutdown(), perhaps implicitly in finalize() 325 * (RUNNING or SHUTDOWN) -> STOP 326 * On invocation of shutdownNow() 327 * SHUTDOWN -> TIDYING 328 * When both queue and pool are empty 329 * STOP -> TIDYING 330 * When pool is empty 331 * TIDYING -> TERMINATED 332 * When the terminated() hook method has completed 333 * 334 * Threads waiting in awaitTermination() will return when the 335 * state reaches TERMINATED. 336 * 337 * Detecting the transition from SHUTDOWN to TIDYING is less 338 * straightforward than you'd like because the queue may become 339 * empty after non-empty and vice versa during SHUTDOWN state, but 340 * we can only terminate if, after seeing that it is empty, we see 341 * that workerCount is 0 (which sometimes entails a recheck -- see 342 * below). 343 */ 344 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 345 private static final int COUNT_BITS = Integer.SIZE - 3; 346 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 347 348 // runState is stored in the high-order bits 349 private static final int RUNNING = -1 << COUNT_BITS; 350 private static final int SHUTDOWN = 0 << COUNT_BITS; 351 private static final int STOP = 1 << COUNT_BITS; 352 private static final int TIDYING = 2 << COUNT_BITS; 353 private static final int TERMINATED = 3 << COUNT_BITS; 354 355 // Packing and unpacking ctl 356 private static int runStateOf(int c) { return c & ~CAPACITY; } 357 private static int workerCountOf(int c) { return c & CAPACITY; } 358 private static int ctlOf(int rs, int wc) { return rs | wc; } 359 360 /* 361 * Bit field accessors that don't require unpacking ctl. 362 * These depend on the bit layout and on workerCount being never negative. 363 */ 364 365 private static boolean runStateLessThan(int c, int s) { 366 return c < s; 367 } 368 369 private static boolean runStateAtLeast(int c, int s) { 370 return c >= s; 371 } 372 373 private static boolean isRunning(int c) { 374 return c < SHUTDOWN; 375 } 376 377 /** 378 * Attempt to CAS-increment the workerCount field of ctl. 379 */ 380 private boolean compareAndIncrementWorkerCount(int expect) { 381 return ctl.compareAndSet(expect, expect + 1); 382 } 383 384 /** 385 * Attempt to CAS-decrement the workerCount field of ctl. 386 */ 387 private boolean compareAndDecrementWorkerCount(int expect) { 388 return ctl.compareAndSet(expect, expect - 1); 389 } 390 391 /** 392 * Decrements the workerCount field of ctl. This is called only on 393 * abrupt termination of a thread (see processWorkerExit). Other 394 * decrements are performed within getTask. 395 */ 396 private void decrementWorkerCount() { 397 do {} while (! compareAndDecrementWorkerCount(ctl.get())); 398 } 399 400 /** 401 * The queue used for holding tasks and handing off to worker 402 * threads. We do not require that workQueue.poll() returning 403 * null necessarily means that workQueue.isEmpty(), so rely 404 * solely on isEmpty to see if the queue is empty (which we must 405 * do for example when deciding whether to transition from 406 * SHUTDOWN to TIDYING). This accommodates special-purpose 407 * queues such as DelayQueues for which poll() is allowed to 408 * return null even if it may later return non-null when delays 409 * expire. 410 */ 411 private final BlockingQueue<Runnable> workQueue; 412 413 /** 414 * Lock held on access to workers set and related bookkeeping. 415 * While we could use a concurrent set of some sort, it turns out 416 * to be generally preferable to use a lock. Among the reasons is 417 * that this serializes interruptIdleWorkers, which avoids 418 * unnecessary interrupt storms, especially during shutdown. 419 * Otherwise exiting threads would concurrently interrupt those 420 * that have not yet interrupted. It also simplifies some of the 421 * associated statistics bookkeeping of largestPoolSize etc. We 422 * also hold mainLock on shutdown and shutdownNow, for the sake of 423 * ensuring workers set is stable while separately checking 424 * permission to interrupt and actually interrupting. 425 */ 426 private final ReentrantLock mainLock = new ReentrantLock(); 427 428 /** 429 * Set containing all worker threads in pool. Accessed only when 430 * holding mainLock. 431 */ 432 private final HashSet<Worker> workers = new HashSet<Worker>(); 433 434 /** 435 * Wait condition to support awaitTermination 436 */ 437 private final Condition termination = mainLock.newCondition(); 438 439 /** 440 * Tracks largest attained pool size. Accessed only under 441 * mainLock. 442 */ 443 private int largestPoolSize; 444 445 /** 446 * Counter for completed tasks. Updated only on termination of 447 * worker threads. Accessed only under mainLock. 448 */ 449 private long completedTaskCount; 450 451 /* 452 * All user control parameters are declared as volatiles so that 453 * ongoing actions are based on freshest values, but without need 454 * for locking, since no internal invariants depend on them 455 * changing synchronously with respect to other actions. 456 */ 457 458 /** 459 * Factory for new threads. All threads are created using this 460 * factory (via method addWorker). All callers must be prepared 461 * for addWorker to fail, which may reflect a system or user's 462 * policy limiting the number of threads. Even though it is not 463 * treated as an error, failure to create threads may result in 464 * new tasks being rejected or existing ones remaining stuck in 465 * the queue. On the other hand, no special precautions exist to 466 * handle OutOfMemoryErrors that might be thrown while trying to 467 * create threads, since there is generally no recourse from 468 * within this class. 469 */ 470 private volatile ThreadFactory threadFactory; 471 472 /** 473 * Handler called when saturated or shutdown in execute. 474 */ 475 private volatile RejectedExecutionHandler handler; 476 477 /** 478 * Timeout in nanoseconds for idle threads waiting for work. 479 * Threads use this timeout when there are more than corePoolSize 480 * present or if allowCoreThreadTimeOut. Otherwise they wait 481 * forever for new work. 482 */ 483 private volatile long keepAliveTime; 484 485 /** 486 * If false (default), core threads stay alive even when idle. 487 * If true, core threads use keepAliveTime to time out waiting 488 * for work. 489 */ 490 private volatile boolean allowCoreThreadTimeOut; 491 492 /** 493 * Core pool size is the minimum number of workers to keep alive 494 * (and not allow to time out etc) unless allowCoreThreadTimeOut 495 * is set, in which case the minimum is zero. 496 */ 497 private volatile int corePoolSize; 498 499 /** 500 * Maximum pool size. Note that the actual maximum is internally 501 * bounded by CAPACITY. 502 */ 503 private volatile int maximumPoolSize; 504 505 /** 506 * The default rejected execution handler 507 */ 508 private static final RejectedExecutionHandler defaultHandler = 509 new AbortPolicy(); 510 511 /** 512 * Permission required for callers of shutdown and shutdownNow. 513 * We additionally require (see checkShutdownAccess) that callers 514 * have permission to actually interrupt threads in the worker set 515 * (as governed by Thread.interrupt, which relies on 516 * ThreadGroup.checkAccess, which in turn relies on 517 * SecurityManager.checkAccess). Shutdowns are attempted only if 518 * these checks pass. 519 * 520 * All actual invocations of Thread.interrupt (see 521 * interruptIdleWorkers and interruptWorkers) ignore 522 * SecurityExceptions, meaning that the attempted interrupts 523 * silently fail. In the case of shutdown, they should not fail 524 * unless the SecurityManager has inconsistent policies, sometimes 525 * allowing access to a thread and sometimes not. In such cases, 526 * failure to actually interrupt threads may disable or delay full 527 * termination. Other uses of interruptIdleWorkers are advisory, 528 * and failure to actually interrupt will merely delay response to 529 * configuration changes so is not handled exceptionally. 530 */ 531 private static final RuntimePermission shutdownPerm = 532 new RuntimePermission("modifyThread"); 533 534 /** 535 * Class Worker mainly maintains interrupt control state for 536 * threads running tasks, along with other minor bookkeeping. 537 * This class opportunistically extends AbstractQueuedSynchronizer 538 * to simplify acquiring and releasing a lock surrounding each 539 * task execution. This protects against interrupts that are 540 * intended to wake up a worker thread waiting for a task from 541 * instead interrupting a task being run. We implement a simple 542 * non-reentrant mutual exclusion lock rather than use ReentrantLock 543 * because we do not want worker tasks to be able to reacquire the 544 * lock when they invoke pool control methods like setCorePoolSize. 545 */ 546 private final class Worker 547 extends AbstractQueuedSynchronizer 548 implements Runnable 549 { 550 /** 551 * This class will never be serialized, but we provide a 552 * serialVersionUID to suppress a javac warning. 553 */ 554 private static final long serialVersionUID = 6138294804551838833L; 555 556 /** Thread this worker is running in. Null if factory fails. */ 557 final Thread thread; 558 /** Initial task to run. Possibly null. */ 559 Runnable firstTask; 560 /** Per-thread task counter */ 561 volatile long completedTasks; 562 563 /** 564 * Creates with given first task and thread from ThreadFactory. 565 * @param firstTask the first task (null if none) 566 */ 567 Worker(Runnable firstTask) { 568 this.firstTask = firstTask; 569 this.thread = getThreadFactory().newThread(this); 570 } 571 572 /** Delegates main run loop to outer runWorker */ 573 public void run() { 574 runWorker(this); 575 } 576 577 // Lock methods 578 // 579 // The value 0 represents the unlocked state. 580 // The value 1 represents the locked state. 581 582 protected boolean isHeldExclusively() { 583 return getState() == 1; 584 } 585 586 protected boolean tryAcquire(int unused) { 587 if (compareAndSetState(0, 1)) { 588 setExclusiveOwnerThread(Thread.currentThread()); 589 return true; 590 } 591 return false; 592 } 593 594 protected boolean tryRelease(int unused) { 595 setExclusiveOwnerThread(null); 596 setState(0); 597 return true; 598 } 599 600 public void lock() { acquire(1); } 601 public boolean tryLock() { return tryAcquire(1); } 602 public void unlock() { release(1); } 603 public boolean isLocked() { return isHeldExclusively(); } 604 } 605 606 /* 607 * Methods for setting control state 608 */ 609 610 /** 611 * Transitions runState to given target, or leaves it alone if 612 * already at least the given target. 613 * 614 * @param targetState the desired state, either SHUTDOWN or STOP 615 * (but not TIDYING or TERMINATED -- use tryTerminate for that) 616 */ 617 private void advanceRunState(int targetState) { 618 for (;;) { 619 int c = ctl.get(); 620 if (runStateAtLeast(c, targetState) || 621 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) 622 break; 623 } 624 } 625 626 /** 627 * Transitions to TERMINATED state if either (SHUTDOWN and pool 628 * and queue empty) or (STOP and pool empty). If otherwise 629 * eligible to terminate but workerCount is nonzero, interrupts an 630 * idle worker to ensure that shutdown signals propagate. This 631 * method must be called following any action that might make 632 * termination possible -- reducing worker count or removing tasks 633 * from the queue during shutdown. The method is non-private to 634 * allow access from ScheduledThreadPoolExecutor. 635 */ 636 final void tryTerminate() { 637 for (;;) { 638 int c = ctl.get(); 639 if (isRunning(c) || 640 runStateAtLeast(c, TIDYING) || 641 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) 642 return; 643 if (workerCountOf(c) != 0) { // Eligible to terminate 644 interruptIdleWorkers(ONLY_ONE); 645 return; 646 } 647 648 final ReentrantLock mainLock = this.mainLock; 649 mainLock.lock(); 650 try { 651 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 652 try { 653 terminated(); 654 } finally { 655 ctl.set(ctlOf(TERMINATED, 0)); 656 termination.signalAll(); 657 } 658 return; 659 } 660 } finally { 661 mainLock.unlock(); 662 } 663 // else retry on failed CAS 664 } 665 } 666 667 /* 668 * Methods for controlling interrupts to worker threads. 669 */ 670 671 /** 672 * If there is a security manager, makes sure caller has 673 * permission to shut down threads in general (see shutdownPerm). 674 * If this passes, additionally makes sure the caller is allowed 675 * to interrupt each worker thread. This might not be true even if 676 * first check passed, if the SecurityManager treats some threads 677 * specially. 678 */ 679 private void checkShutdownAccess() { 680 SecurityManager security = System.getSecurityManager(); 681 if (security != null) { 682 security.checkPermission(shutdownPerm); 683 final ReentrantLock mainLock = this.mainLock; 684 mainLock.lock(); 685 try { 686 for (Worker w : workers) 687 security.checkAccess(w.thread); 688 } finally { 689 mainLock.unlock(); 690 } 691 } 692 } 693 694 /** 695 * Interrupts all threads, even if active. Ignores SecurityExceptions 696 * (in which case some threads may remain uninterrupted). 697 */ 698 private void interruptWorkers() { 699 final ReentrantLock mainLock = this.mainLock; 700 mainLock.lock(); 701 try { 702 for (Worker w : workers) { 703 try { 704 w.thread.interrupt(); 705 } catch (SecurityException ignore) { 706 } 707 } 708 } finally { 709 mainLock.unlock(); 710 } 711 } 712 713 /** 714 * Interrupts threads that might be waiting for tasks (as 715 * indicated by not being locked) so they can check for 716 * termination or configuration changes. Ignores 717 * SecurityExceptions (in which case some threads may remain 718 * uninterrupted). 719 * 720 * @param onlyOne If true, interrupt at most one worker. This is 721 * called only from tryTerminate when termination is otherwise 722 * enabled but there are still other workers. In this case, at 723 * most one waiting worker is interrupted to propagate shutdown 724 * signals in case all threads are currently waiting. 725 * Interrupting any arbitrary thread ensures that newly arriving 726 * workers since shutdown began will also eventually exit. 727 * To guarantee eventual termination, it suffices to always 728 * interrupt only one idle worker, but shutdown() interrupts all 729 * idle workers so that redundant workers exit promptly, not 730 * waiting for a straggler task to finish. 731 */ 732 private void interruptIdleWorkers(boolean onlyOne) { 733 final ReentrantLock mainLock = this.mainLock; 734 mainLock.lock(); 735 try { 736 for (Worker w : workers) { 737 Thread t = w.thread; 738 if (!t.isInterrupted() && w.tryLock()) { 739 try { 740 t.interrupt(); 741 } catch (SecurityException ignore) { 742 } finally { 743 w.unlock(); 744 } 745 } 746 if (onlyOne) 747 break; 748 } 749 } finally { 750 mainLock.unlock(); 751 } 752 } 753 754 /** 755 * Common form of interruptIdleWorkers, to avoid having to 756 * remember what the boolean argument means. 757 */ 758 private void interruptIdleWorkers() { 759 interruptIdleWorkers(false); 760 } 761 762 private static final boolean ONLY_ONE = true; 763 764 /** 765 * Ensures that unless the pool is stopping, the current thread 766 * does not have its interrupt set. This requires a double-check 767 * of state in case the interrupt was cleared concurrently with a 768 * shutdownNow -- if so, the interrupt is re-enabled. 769 */ 770 private void clearInterruptsForTaskRun() { 771 if (runStateLessThan(ctl.get(), STOP) && 772 Thread.interrupted() && 773 runStateAtLeast(ctl.get(), STOP)) 774 Thread.currentThread().interrupt(); 775 } 776 777 /* 778 * Misc utilities, most of which are also exported to 779 * ScheduledThreadPoolExecutor 780 */ 781 782 /** 783 * Invokes the rejected execution handler for the given command. 784 * Package-protected for use by ScheduledThreadPoolExecutor. 785 */ 786 final void reject(Runnable command) { 787 handler.rejectedExecution(command, this); 788 } 789 790 /** 791 * Performs any further cleanup following run state transition on 792 * invocation of shutdown. A no-op here, but used by 793 * ScheduledThreadPoolExecutor to cancel delayed tasks. 794 */ 795 void onShutdown() { 796 } 797 798 /** 799 * State check needed by ScheduledThreadPoolExecutor to 800 * enable running tasks during shutdown. 801 * 802 * @param shutdownOK true if should return true if SHUTDOWN 803 */ 804 final boolean isRunningOrShutdown(boolean shutdownOK) { 805 int rs = runStateOf(ctl.get()); 806 return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); 807 } 808 809 /** 810 * Drains the task queue into a new list, normally using 811 * drainTo. But if the queue is a DelayQueue or any other kind of 812 * queue for which poll or drainTo may fail to remove some 813 * elements, it deletes them one by one. 814 */ 815 private List<Runnable> drainQueue() { 816 BlockingQueue<Runnable> q = workQueue; 817 List<Runnable> taskList = new ArrayList<Runnable>(); 818 q.drainTo(taskList); 819 if (!q.isEmpty()) { 820 for (Runnable r : q.toArray(new Runnable[0])) { 821 if (q.remove(r)) 822 taskList.add(r); 823 } 824 } 825 return taskList; 826 } 827 828 /* 829 * Methods for creating, running and cleaning up after workers 830 */ 831 832 /** 833 * Checks if a new worker can be added with respect to current 834 * pool state and the given bound (either core or maximum). If so, 835 * the worker count is adjusted accordingly, and, if possible, a 836 * new worker is created and started running firstTask as its 837 * first task. This method returns false if the pool is stopped or 838 * eligible to shut down. It also returns false if the thread 839 * factory fails to create a thread when asked, which requires a 840 * backout of workerCount, and a recheck for termination, in case 841 * the existence of this worker was holding up termination. 842 * 843 * @param firstTask the task the new thread should run first (or 844 * null if none). Workers are created with an initial first task 845 * (in method execute()) to bypass queuing when there are fewer 846 * than corePoolSize threads (in which case we always start one), 847 * or when the queue is full (in which case we must bypass queue). 848 * Initially idle threads are usually created via 849 * prestartCoreThread or to replace other dying workers. 850 * 851 * @param core if true use corePoolSize as bound, else 852 * maximumPoolSize. (A boolean indicator is used here rather than a 853 * value to ensure reads of fresh values after checking other pool 854 * state). 855 * @return true if successful 856 */ 857 private boolean addWorker(Runnable firstTask, boolean core) { 858 retry: 859 for (;;) { 860 int c = ctl.get(); 861 int rs = runStateOf(c); 862 863 // Check if queue empty only if necessary. 864 if (rs >= SHUTDOWN && 865 ! (rs == SHUTDOWN && 866 firstTask == null && 867 ! workQueue.isEmpty())) 868 return false; 869 870 for (;;) { 871 int wc = workerCountOf(c); 872 if (wc >= CAPACITY || 873 wc >= (core ? corePoolSize : maximumPoolSize)) 874 return false; 875 if (compareAndIncrementWorkerCount(c)) 876 break retry; 877 c = ctl.get(); // Re-read ctl 878 if (runStateOf(c) != rs) 879 continue retry; 880 // else CAS failed due to workerCount change; retry inner loop 881 } 882 } 883 884 Worker w = new Worker(firstTask); 885 Thread t = w.thread; 886 887 final ReentrantLock mainLock = this.mainLock; 888 mainLock.lock(); 889 try { 890 // Recheck while holding lock. 891 // Back out on ThreadFactory failure or if 892 // shut down before lock acquired. 893 int c = ctl.get(); 894 int rs = runStateOf(c); 895 896 if (t == null || 897 (rs >= SHUTDOWN && 898 ! (rs == SHUTDOWN && 899 firstTask == null))) { 900 decrementWorkerCount(); 901 tryTerminate(); 902 return false; 903 } 904 905 workers.add(w); 906 907 int s = workers.size(); 908 if (s > largestPoolSize) 909 largestPoolSize = s; 910 } finally { 911 mainLock.unlock(); 912 } 913 914 t.start(); 915 // It is possible (but unlikely) for a thread to have been 916 // added to workers, but not yet started, during transition to 917 // STOP, which could result in a rare missed interrupt, 918 // because Thread.interrupt is not guaranteed to have any effect 919 // on a non-yet-started Thread (see Thread#interrupt). 920 if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) 921 t.interrupt(); 922 923 return true; 924 } 925 926 /** 927 * Performs cleanup and bookkeeping for a dying worker. Called 928 * only from worker threads. Unless completedAbruptly is set, 929 * assumes that workerCount has already been adjusted to account 930 * for exit. This method removes thread from worker set, and 931 * possibly terminates the pool or replaces the worker if either 932 * it exited due to user task exception or if fewer than 933 * corePoolSize workers are running or queue is non-empty but 934 * there are no workers. 935 * 936 * @param w the worker 937 * @param completedAbruptly if the worker died due to user exception 938 */ 939 private void processWorkerExit(Worker w, boolean completedAbruptly) { 940 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 941 decrementWorkerCount(); 942 943 final ReentrantLock mainLock = this.mainLock; 944 mainLock.lock(); 945 try { 946 completedTaskCount += w.completedTasks; 947 workers.remove(w); 948 } finally { 949 mainLock.unlock(); 950 } 951 952 tryTerminate(); 953 954 int c = ctl.get(); 955 if (runStateLessThan(c, STOP)) { 956 if (!completedAbruptly) { 957 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 958 if (min == 0 && ! workQueue.isEmpty()) 959 min = 1; 960 if (workerCountOf(c) >= min) 961 return; // replacement not needed 962 } 963 addWorker(null, false); 964 } 965 } 966 967 /** 968 * Performs blocking or timed wait for a task, depending on 969 * current configuration settings, or returns null if this worker 970 * must exit because of any of: 971 * 1. There are more than maximumPoolSize workers (due to 972 * a call to setMaximumPoolSize). 973 * 2. The pool is stopped. 974 * 3. The pool is shutdown and the queue is empty. 975 * 4. This worker timed out waiting for a task, and timed-out 976 * workers are subject to termination (that is, 977 * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) 978 * both before and after the timed wait. 979 * 980 * @return task, or null if the worker must exit, in which case 981 * workerCount is decremented 982 */ 983 private Runnable getTask() { 984 boolean timedOut = false; // Did the last poll() time out? 985 986 retry: 987 for (;;) { 988 int c = ctl.get(); 989 int rs = runStateOf(c); 990 991 // Check if queue empty only if necessary. 992 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 993 decrementWorkerCount(); 994 return null; 995 } 996 997 boolean timed; // Are workers subject to culling? 998 999 for (;;) { 1000 int wc = workerCountOf(c); 1001 timed = allowCoreThreadTimeOut || wc > corePoolSize; 1002 1003 if (wc <= maximumPoolSize && ! (timedOut && timed)) 1004 break; 1005 if (compareAndDecrementWorkerCount(c)) 1006 return null; 1007 c = ctl.get(); // Re-read ctl 1008 if (runStateOf(c) != rs) 1009 continue retry; 1010 // else CAS failed due to workerCount change; retry inner loop 1011 } 1012 1013 try { 1014 Runnable r = timed ? 1015 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 1016 workQueue.take(); 1017 if (r != null) 1018 return r; 1019 timedOut = true; 1020 } catch (InterruptedException retry) { 1021 timedOut = false; 1022 } 1023 } 1024 } 1025 1026 /** 1027 * Main worker run loop. Repeatedly gets tasks from queue and 1028 * executes them, while coping with a number of issues: 1029 * 1030 * 1. We may start out with an initial task, in which case we 1031 * don't need to get the first one. Otherwise, as long as pool is 1032 * running, we get tasks from getTask. If it returns null then the 1033 * worker exits due to changed pool state or configuration 1034 * parameters. Other exits result from exception throws in 1035 * external code, in which case completedAbruptly holds, which 1036 * usually leads processWorkerExit to replace this thread. 1037 * 1038 * 2. Before running any task, the lock is acquired to prevent 1039 * other pool interrupts while the task is executing, and 1040 * clearInterruptsForTaskRun called to ensure that unless pool is 1041 * stopping, this thread does not have its interrupt set. 1042 * 1043 * 3. Each task run is preceded by a call to beforeExecute, which 1044 * might throw an exception, in which case we cause thread to die 1045 * (breaking loop with completedAbruptly true) without processing 1046 * the task. 1047 * 1048 * 4. Assuming beforeExecute completes normally, we run the task, 1049 * gathering any of its thrown exceptions to send to 1050 * afterExecute. We separately handle RuntimeException, Error 1051 * (both of which the specs guarantee that we trap) and arbitrary 1052 * Throwables. Because we cannot rethrow Throwables within 1053 * Runnable.run, we wrap them within Errors on the way out (to the 1054 * thread's UncaughtExceptionHandler). Any thrown exception also 1055 * conservatively causes thread to die. 1056 * 1057 * 5. After task.run completes, we call afterExecute, which may 1058 * also throw an exception, which will also cause thread to 1059 * die. According to JLS Sec 14.20, this exception is the one that 1060 * will be in effect even if task.run throws. 1061 * 1062 * The net effect of the exception mechanics is that afterExecute 1063 * and the thread's UncaughtExceptionHandler have as accurate 1064 * information as we can provide about any problems encountered by 1065 * user code. 1066 * 1067 * @param w the worker 1068 */ 1069 final void runWorker(Worker w) { 1070 Runnable task = w.firstTask; 1071 w.firstTask = null; 1072 boolean completedAbruptly = true; 1073 try { 1074 while (task != null || (task = getTask()) != null) { 1075 w.lock(); 1076 clearInterruptsForTaskRun(); 1077 try { 1078 beforeExecute(w.thread, task); 1079 Throwable thrown = null; 1080 try { 1081 task.run(); 1082 } catch (RuntimeException x) { 1083 thrown = x; throw x; 1084 } catch (Error x) { 1085 thrown = x; throw x; 1086 } catch (Throwable x) { 1087 thrown = x; throw new Error(x); 1088 } finally { 1089 afterExecute(task, thrown); 1090 } 1091 } finally { 1092 task = null; 1093 w.completedTasks++; 1094 w.unlock(); 1095 } 1096 } 1097 completedAbruptly = false; 1098 } finally { 1099 processWorkerExit(w, completedAbruptly); 1100 } 1101 } 1102 1103 // Public constructors and methods 1104 1105 /** 1106 * Creates a new {@code ThreadPoolExecutor} with the given initial 1107 * parameters and default thread factory and rejected execution handler. 1108 * It may be more convenient to use one of the {@link Executors} factory 1109 * methods instead of this general purpose constructor. 1110 * 1111 * @param corePoolSize the number of threads to keep in the pool, even 1112 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1113 * @param maximumPoolSize the maximum number of threads to allow in the 1114 * pool 1115 * @param keepAliveTime when the number of threads is greater than 1116 * the core, this is the maximum time that excess idle threads 1117 * will wait for new tasks before terminating. 1118 * @param unit the time unit for the {@code keepAliveTime} argument 1119 * @param workQueue the queue to use for holding tasks before they are 1120 * executed. This queue will hold only the {@code Runnable} 1121 * tasks submitted by the {@code execute} method. 1122 * @throws IllegalArgumentException if one of the following holds:<br> 1123 * {@code corePoolSize < 0}<br> 1124 * {@code keepAliveTime < 0}<br> 1125 * {@code maximumPoolSize <= 0}<br> 1126 * {@code maximumPoolSize < corePoolSize} 1127 * @throws NullPointerException if {@code workQueue} is null 1128 */ 1129 public ThreadPoolExecutor(int corePoolSize, 1130 int maximumPoolSize, 1131 long keepAliveTime, 1132 TimeUnit unit, 1133 BlockingQueue<Runnable> workQueue) { 1134 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1135 Executors.defaultThreadFactory(), defaultHandler); 1136 } 1137 1138 /** 1139 * Creates a new {@code ThreadPoolExecutor} with the given initial 1140 * parameters and default rejected execution handler. 1141 * 1142 * @param corePoolSize the number of threads to keep in the pool, even 1143 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1144 * @param maximumPoolSize the maximum number of threads to allow in the 1145 * pool 1146 * @param keepAliveTime when the number of threads is greater than 1147 * the core, this is the maximum time that excess idle threads 1148 * will wait for new tasks before terminating. 1149 * @param unit the time unit for the {@code keepAliveTime} argument 1150 * @param workQueue the queue to use for holding tasks before they are 1151 * executed. This queue will hold only the {@code Runnable} 1152 * tasks submitted by the {@code execute} method. 1153 * @param threadFactory the factory to use when the executor 1154 * creates a new thread 1155 * @throws IllegalArgumentException if one of the following holds:<br> 1156 * {@code corePoolSize < 0}<br> 1157 * {@code keepAliveTime < 0}<br> 1158 * {@code maximumPoolSize <= 0}<br> 1159 * {@code maximumPoolSize < corePoolSize} 1160 * @throws NullPointerException if {@code workQueue} 1161 * or {@code threadFactory} is null 1162 */ 1163 public ThreadPoolExecutor(int corePoolSize, 1164 int maximumPoolSize, 1165 long keepAliveTime, 1166 TimeUnit unit, 1167 BlockingQueue<Runnable> workQueue, 1168 ThreadFactory threadFactory) { 1169 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1170 threadFactory, defaultHandler); 1171 } 1172 1173 /** 1174 * Creates a new {@code ThreadPoolExecutor} with the given initial 1175 * parameters and default thread factory. 1176 * 1177 * @param corePoolSize the number of threads to keep in the pool, even 1178 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1179 * @param maximumPoolSize the maximum number of threads to allow in the 1180 * pool 1181 * @param keepAliveTime when the number of threads is greater than 1182 * the core, this is the maximum time that excess idle threads 1183 * will wait for new tasks before terminating. 1184 * @param unit the time unit for the {@code keepAliveTime} argument 1185 * @param workQueue the queue to use for holding tasks before they are 1186 * executed. This queue will hold only the {@code Runnable} 1187 * tasks submitted by the {@code execute} method. 1188 * @param handler the handler to use when execution is blocked 1189 * because the thread bounds and queue capacities are reached 1190 * @throws IllegalArgumentException if one of the following holds:<br> 1191 * {@code corePoolSize < 0}<br> 1192 * {@code keepAliveTime < 0}<br> 1193 * {@code maximumPoolSize <= 0}<br> 1194 * {@code maximumPoolSize < corePoolSize} 1195 * @throws NullPointerException if {@code workQueue} 1196 * or {@code handler} is null 1197 */ 1198 public ThreadPoolExecutor(int corePoolSize, 1199 int maximumPoolSize, 1200 long keepAliveTime, 1201 TimeUnit unit, 1202 BlockingQueue<Runnable> workQueue, 1203 RejectedExecutionHandler handler) { 1204 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1205 Executors.defaultThreadFactory(), handler); 1206 } 1207 1208 /** 1209 * Creates a new {@code ThreadPoolExecutor} with the given initial 1210 * parameters. 1211 * 1212 * @param corePoolSize the number of threads to keep in the pool, even 1213 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1214 * @param maximumPoolSize the maximum number of threads to allow in the 1215 * pool 1216 * @param keepAliveTime when the number of threads is greater than 1217 * the core, this is the maximum time that excess idle threads 1218 * will wait for new tasks before terminating. 1219 * @param unit the time unit for the {@code keepAliveTime} argument 1220 * @param workQueue the queue to use for holding tasks before they are 1221 * executed. This queue will hold only the {@code Runnable} 1222 * tasks submitted by the {@code execute} method. 1223 * @param threadFactory the factory to use when the executor 1224 * creates a new thread 1225 * @param handler the handler to use when execution is blocked 1226 * because the thread bounds and queue capacities are reached 1227 * @throws IllegalArgumentException if one of the following holds:<br> 1228 * {@code corePoolSize < 0}<br> 1229 * {@code keepAliveTime < 0}<br> 1230 * {@code maximumPoolSize <= 0}<br> 1231 * {@code maximumPoolSize < corePoolSize} 1232 * @throws NullPointerException if {@code workQueue} 1233 * or {@code threadFactory} or {@code handler} is null 1234 */ 1235 public ThreadPoolExecutor(int corePoolSize, 1236 int maximumPoolSize, 1237 long keepAliveTime, 1238 TimeUnit unit, 1239 BlockingQueue<Runnable> workQueue, 1240 ThreadFactory threadFactory, 1241 RejectedExecutionHandler handler) { 1242 if (corePoolSize < 0 || 1243 maximumPoolSize <= 0 || 1244 maximumPoolSize < corePoolSize || 1245 keepAliveTime < 0) 1246 throw new IllegalArgumentException(); 1247 if (workQueue == null || threadFactory == null || handler == null) 1248 throw new NullPointerException(); 1249 this.corePoolSize = corePoolSize; 1250 this.maximumPoolSize = maximumPoolSize; 1251 this.workQueue = workQueue; 1252 this.keepAliveTime = unit.toNanos(keepAliveTime); 1253 this.threadFactory = threadFactory; 1254 this.handler = handler; 1255 } 1256 1257 /** 1258 * Executes the given task sometime in the future. The task 1259 * may execute in a new thread or in an existing pooled thread. 1260 * 1261 * If the task cannot be submitted for execution, either because this 1262 * executor has been shutdown or because its capacity has been reached, 1263 * the task is handled by the current {@code RejectedExecutionHandler}. 1264 * 1265 * @param command the task to execute 1266 * @throws RejectedExecutionException at discretion of 1267 * {@code RejectedExecutionHandler}, if the task 1268 * cannot be accepted for execution 1269 * @throws NullPointerException if {@code command} is null 1270 */ 1271 public void execute(Runnable command) { 1272 if (command == null) 1273 throw new NullPointerException(); 1274 /* 1275 * Proceed in 3 steps: 1276 * 1277 * 1. If fewer than corePoolSize threads are running, try to 1278 * start a new thread with the given command as its first 1279 * task. The call to addWorker atomically checks runState and 1280 * workerCount, and so prevents false alarms that would add 1281 * threads when it shouldn't, by returning false. 1282 * 1283 * 2. If a task can be successfully queued, then we still need 1284 * to double-check whether we should have added a thread 1285 * (because existing ones died since last checking) or that 1286 * the pool shut down since entry into this method. So we 1287 * recheck state and if necessary roll back the enqueuing if 1288 * stopped, or start a new thread if there are none. 1289 * 1290 * 3. If we cannot queue task, then we try to add a new 1291 * thread. If it fails, we know we are shut down or saturated 1292 * and so reject the task. 1293 */ 1294 int c = ctl.get(); 1295 if (workerCountOf(c) < corePoolSize) { 1296 if (addWorker(command, true)) 1297 return; 1298 c = ctl.get(); 1299 } 1300 if (isRunning(c) && workQueue.offer(command)) { 1301 int recheck = ctl.get(); 1302 if (! isRunning(recheck) && remove(command)) 1303 reject(command); 1304 else if (workerCountOf(recheck) == 0) 1305 addWorker(null, false); 1306 } 1307 else if (!addWorker(command, false)) 1308 reject(command); 1309 } 1310 1311 /** 1312 * Initiates an orderly shutdown in which previously submitted 1313 * tasks are executed, but no new tasks will be accepted. 1314 * Invocation has no additional effect if already shut down. 1315 * 1316 * <p>This method does not wait for previously submitted tasks to 1317 * complete execution. Use {@link #awaitTermination awaitTermination} 1318 * to do that. 1319 * 1320 * @throws SecurityException {@inheritDoc} 1321 */ 1322 public void shutdown() { 1323 final ReentrantLock mainLock = this.mainLock; 1324 mainLock.lock(); 1325 try { 1326 checkShutdownAccess(); 1327 advanceRunState(SHUTDOWN); 1328 interruptIdleWorkers(); 1329 onShutdown(); // hook for ScheduledThreadPoolExecutor 1330 } finally { 1331 mainLock.unlock(); 1332 } 1333 tryTerminate(); 1334 } 1335 1336 /** 1337 * Attempts to stop all actively executing tasks, halts the 1338 * processing of waiting tasks, and returns a list of the tasks 1339 * that were awaiting execution. These tasks are drained (removed) 1340 * from the task queue upon return from this method. 1341 * 1342 * <p>This method does not wait for actively executing tasks to 1343 * terminate. Use {@link #awaitTermination awaitTermination} to 1344 * do that. 1345 * 1346 * <p>There are no guarantees beyond best-effort attempts to stop 1347 * processing actively executing tasks. This implementation 1348 * cancels tasks via {@link Thread#interrupt}, so any task that 1349 * fails to respond to interrupts may never terminate. 1350 * 1351 * @throws SecurityException {@inheritDoc} 1352 */ 1353 public List<Runnable> shutdownNow() { 1354 List<Runnable> tasks; 1355 final ReentrantLock mainLock = this.mainLock; 1356 mainLock.lock(); 1357 try { 1358 checkShutdownAccess(); 1359 advanceRunState(STOP); 1360 interruptWorkers(); 1361 tasks = drainQueue(); 1362 } finally { 1363 mainLock.unlock(); 1364 } 1365 tryTerminate(); 1366 return tasks; 1367 } 1368 1369 public boolean isShutdown() { 1370 return ! isRunning(ctl.get()); 1371 } 1372 1373 /** 1374 * Returns true if this executor is in the process of terminating 1375 * after {@link #shutdown} or {@link #shutdownNow} but has not 1376 * completely terminated. This method may be useful for 1377 * debugging. A return of {@code true} reported a sufficient 1378 * period after shutdown may indicate that submitted tasks have 1379 * ignored or suppressed interruption, causing this executor not 1380 * to properly terminate. 1381 * 1382 * @return true if terminating but not yet terminated 1383 */ 1384 public boolean isTerminating() { 1385 int c = ctl.get(); 1386 return ! isRunning(c) && runStateLessThan(c, TERMINATED); 1387 } 1388 1389 public boolean isTerminated() { 1390 return runStateAtLeast(ctl.get(), TERMINATED); 1391 } 1392 1393 public boolean awaitTermination(long timeout, TimeUnit unit) 1394 throws InterruptedException { 1395 long nanos = unit.toNanos(timeout); 1396 final ReentrantLock mainLock = this.mainLock; 1397 mainLock.lock(); 1398 try { 1399 for (;;) { 1400 if (runStateAtLeast(ctl.get(), TERMINATED)) 1401 return true; 1402 if (nanos <= 0) 1403 return false; 1404 nanos = termination.awaitNanos(nanos); 1405 } 1406 } finally { 1407 mainLock.unlock(); 1408 } 1409 } 1410 1411 /** 1412 * Invokes {@code shutdown} when this executor is no longer 1413 * referenced and it has no threads. 1414 */ 1415 protected void finalize() { 1416 shutdown(); 1417 } 1418 1419 /** 1420 * Sets the thread factory used to create new threads. 1421 * 1422 * @param threadFactory the new thread factory 1423 * @throws NullPointerException if threadFactory is null 1424 * @see #getThreadFactory 1425 */ 1426 public void setThreadFactory(ThreadFactory threadFactory) { 1427 if (threadFactory == null) 1428 throw new NullPointerException(); 1429 this.threadFactory = threadFactory; 1430 } 1431 1432 /** 1433 * Returns the thread factory used to create new threads. 1434 * 1435 * @return the current thread factory 1436 * @see #setThreadFactory 1437 */ 1438 public ThreadFactory getThreadFactory() { 1439 return threadFactory; 1440 } 1441 1442 /** 1443 * Sets a new handler for unexecutable tasks. 1444 * 1445 * @param handler the new handler 1446 * @throws NullPointerException if handler is null 1447 * @see #getRejectedExecutionHandler 1448 */ 1449 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { 1450 if (handler == null) 1451 throw new NullPointerException(); 1452 this.handler = handler; 1453 } 1454 1455 /** 1456 * Returns the current handler for unexecutable tasks. 1457 * 1458 * @return the current handler 1459 * @see #setRejectedExecutionHandler 1460 */ 1461 public RejectedExecutionHandler getRejectedExecutionHandler() { 1462 return handler; 1463 } 1464 1465 /** 1466 * Sets the core number of threads. This overrides any value set 1467 * in the constructor. If the new value is smaller than the 1468 * current value, excess existing threads will be terminated when 1469 * they next become idle. If larger, new threads will, if needed, 1470 * be started to execute any queued tasks. 1471 * 1472 * @param corePoolSize the new core size 1473 * @throws IllegalArgumentException if {@code corePoolSize < 0} 1474 * @see #getCorePoolSize 1475 */ 1476 public void setCorePoolSize(int corePoolSize) { 1477 if (corePoolSize < 0) 1478 throw new IllegalArgumentException(); 1479 int delta = corePoolSize - this.corePoolSize; 1480 this.corePoolSize = corePoolSize; 1481 if (workerCountOf(ctl.get()) > corePoolSize) 1482 interruptIdleWorkers(); 1483 else if (delta > 0) { 1484 // We don't really know how many new threads are "needed". 1485 // As a heuristic, prestart enough new workers (up to new 1486 // core size) to handle the current number of tasks in 1487 // queue, but stop if queue becomes empty while doing so. 1488 int k = Math.min(delta, workQueue.size()); 1489 while (k-- > 0 && addWorker(null, true)) { 1490 if (workQueue.isEmpty()) 1491 break; 1492 } 1493 } 1494 } 1495 1496 /** 1497 * Returns the core number of threads. 1498 * 1499 * @return the core number of threads 1500 * @see #setCorePoolSize 1501 */ 1502 public int getCorePoolSize() { 1503 return corePoolSize; 1504 } 1505 1506 /** 1507 * Starts a core thread, causing it to idly wait for work. This 1508 * overrides the default policy of starting core threads only when 1509 * new tasks are executed. This method will return {@code false} 1510 * if all core threads have already been started. 1511 * 1512 * @return {@code true} if a thread was started 1513 */ 1514 public boolean prestartCoreThread() { 1515 return workerCountOf(ctl.get()) < corePoolSize && 1516 addWorker(null, true); 1517 } 1518 1519 /** 1520 * Starts all core threads, causing them to idly wait for work. This 1521 * overrides the default policy of starting core threads only when 1522 * new tasks are executed. 1523 * 1524 * @return the number of threads started 1525 */ 1526 public int prestartAllCoreThreads() { 1527 int n = 0; 1528 while (addWorker(null, true)) 1529 ++n; 1530 return n; 1531 } 1532 1533 /** 1534 * Returns true if this pool allows core threads to time out and 1535 * terminate if no tasks arrive within the keepAlive time, being 1536 * replaced if needed when new tasks arrive. When true, the same 1537 * keep-alive policy applying to non-core threads applies also to 1538 * core threads. When false (the default), core threads are never 1539 * terminated due to lack of incoming tasks. 1540 * 1541 * @return {@code true} if core threads are allowed to time out, 1542 * else {@code false} 1543 * 1544 * @since 1.6 1545 */ 1546 public boolean allowsCoreThreadTimeOut() { 1547 return allowCoreThreadTimeOut; 1548 } 1549 1550 /** 1551 * Sets the policy governing whether core threads may time out and 1552 * terminate if no tasks arrive within the keep-alive time, being 1553 * replaced if needed when new tasks arrive. When false, core 1554 * threads are never terminated due to lack of incoming 1555 * tasks. When true, the same keep-alive policy applying to 1556 * non-core threads applies also to core threads. To avoid 1557 * continual thread replacement, the keep-alive time must be 1558 * greater than zero when setting {@code true}. This method 1559 * should in general be called before the pool is actively used. 1560 * 1561 * @param value {@code true} if should time out, else {@code false} 1562 * @throws IllegalArgumentException if value is {@code true} 1563 * and the current keep-alive time is not greater than zero 1564 * 1565 * @since 1.6 1566 */ 1567 public void allowCoreThreadTimeOut(boolean value) { 1568 if (value && keepAliveTime <= 0) 1569 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 1570 if (value != allowCoreThreadTimeOut) { 1571 allowCoreThreadTimeOut = value; 1572 if (value) 1573 interruptIdleWorkers(); 1574 } 1575 } 1576 1577 /** 1578 * Sets the maximum allowed number of threads. This overrides any 1579 * value set in the constructor. If the new value is smaller than 1580 * the current value, excess existing threads will be 1581 * terminated when they next become idle. 1582 * 1583 * @param maximumPoolSize the new maximum 1584 * @throws IllegalArgumentException if the new maximum is 1585 * less than or equal to zero, or 1586 * less than the {@linkplain #getCorePoolSize core pool size} 1587 * @see #getMaximumPoolSize 1588 */ 1589 public void setMaximumPoolSize(int maximumPoolSize) { 1590 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) 1591 throw new IllegalArgumentException(); 1592 this.maximumPoolSize = maximumPoolSize; 1593 if (workerCountOf(ctl.get()) > maximumPoolSize) 1594 interruptIdleWorkers(); 1595 } 1596 1597 /** 1598 * Returns the maximum allowed number of threads. 1599 * 1600 * @return the maximum allowed number of threads 1601 * @see #setMaximumPoolSize 1602 */ 1603 public int getMaximumPoolSize() { 1604 return maximumPoolSize; 1605 } 1606 1607 /** 1608 * Sets the time limit for which threads may remain idle before 1609 * being terminated. If there are more than the core number of 1610 * threads currently in the pool, after waiting this amount of 1611 * time without processing a task, excess threads will be 1612 * terminated. This overrides any value set in the constructor. 1613 * 1614 * @param time the time to wait. A time value of zero will cause 1615 * excess threads to terminate immediately after executing tasks. 1616 * @param unit the time unit of the {@code time} argument 1617 * @throws IllegalArgumentException if {@code time} less than zero or 1618 * if {@code time} is zero and {@code allowsCoreThreadTimeOut} 1619 * @see #getKeepAliveTime 1620 */ 1621 public void setKeepAliveTime(long time, TimeUnit unit) { 1622 if (time < 0) 1623 throw new IllegalArgumentException(); 1624 if (time == 0 && allowsCoreThreadTimeOut()) 1625 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 1626 long keepAliveTime = unit.toNanos(time); 1627 long delta = keepAliveTime - this.keepAliveTime; 1628 this.keepAliveTime = keepAliveTime; 1629 if (delta < 0) 1630 interruptIdleWorkers(); 1631 } 1632 1633 /** 1634 * Returns the thread keep-alive time, which is the amount of time 1635 * that threads in excess of the core pool size may remain 1636 * idle before being terminated. 1637 * 1638 * @param unit the desired time unit of the result 1639 * @return the time limit 1640 * @see #setKeepAliveTime 1641 */ 1642 public long getKeepAliveTime(TimeUnit unit) { 1643 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); 1644 } 1645 1646 /* User-level queue utilities */ 1647 1648 /** 1649 * Returns the task queue used by this executor. Access to the 1650 * task queue is intended primarily for debugging and monitoring. 1651 * This queue may be in active use. Retrieving the task queue 1652 * does not prevent queued tasks from executing. 1653 * 1654 * @return the task queue 1655 */ 1656 public BlockingQueue<Runnable> getQueue() { 1657 return workQueue; 1658 } 1659 1660 /** 1661 * Removes this task from the executor's internal queue if it is 1662 * present, thus causing it not to be run if it has not already 1663 * started. 1664 * 1665 * <p> This method may be useful as one part of a cancellation 1666 * scheme. It may fail to remove tasks that have been converted 1667 * into other forms before being placed on the internal queue. For 1668 * example, a task entered using {@code submit} might be 1669 * converted into a form that maintains {@code Future} status. 1670 * However, in such cases, method {@link #purge} may be used to 1671 * remove those Futures that have been cancelled. 1672 * 1673 * @param task the task to remove 1674 * @return true if the task was removed 1675 */ 1676 public boolean remove(Runnable task) { 1677 boolean removed = workQueue.remove(task); 1678 tryTerminate(); // In case SHUTDOWN and now empty 1679 return removed; 1680 } 1681 1682 /** 1683 * Tries to remove from the work queue all {@link Future} 1684 * tasks that have been cancelled. This method can be useful as a 1685 * storage reclamation operation, that has no other impact on 1686 * functionality. Cancelled tasks are never executed, but may 1687 * accumulate in work queues until worker threads can actively 1688 * remove them. Invoking this method instead tries to remove them now. 1689 * However, this method may fail to remove tasks in 1690 * the presence of interference by other threads. 1691 */ 1692 public void purge() { 1693 final BlockingQueue<Runnable> q = workQueue; 1694 try { 1695 Iterator<Runnable> it = q.iterator(); 1696 while (it.hasNext()) { 1697 Runnable r = it.next(); 1698 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1699 it.remove(); 1700 } 1701 } catch (ConcurrentModificationException fallThrough) { 1702 // Take slow path if we encounter interference during traversal. 1703 // Make copy for traversal and call remove for cancelled entries. 1704 // The slow path is more likely to be O(N*N). 1705 for (Object r : q.toArray()) 1706 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1707 q.remove(r); 1708 } 1709 1710 tryTerminate(); // In case SHUTDOWN and now empty 1711 } 1712 1713 /* Statistics */ 1714 1715 /** 1716 * Returns the current number of threads in the pool. 1717 * 1718 * @return the number of threads 1719 */ 1720 public int getPoolSize() { 1721 final ReentrantLock mainLock = this.mainLock; 1722 mainLock.lock(); 1723 try { 1724 // Remove rare and surprising possibility of 1725 // isTerminated() && getPoolSize() > 0 1726 return runStateAtLeast(ctl.get(), TIDYING) ? 0 1727 : workers.size(); 1728 } finally { 1729 mainLock.unlock(); 1730 } 1731 } 1732 1733 /** 1734 * Returns the approximate number of threads that are actively 1735 * executing tasks. 1736 * 1737 * @return the number of threads 1738 */ 1739 public int getActiveCount() { 1740 final ReentrantLock mainLock = this.mainLock; 1741 mainLock.lock(); 1742 try { 1743 int n = 0; 1744 for (Worker w : workers) 1745 if (w.isLocked()) 1746 ++n; 1747 return n; 1748 } finally { 1749 mainLock.unlock(); 1750 } 1751 } 1752 1753 /** 1754 * Returns the largest number of threads that have ever 1755 * simultaneously been in the pool. 1756 * 1757 * @return the number of threads 1758 */ 1759 public int getLargestPoolSize() { 1760 final ReentrantLock mainLock = this.mainLock; 1761 mainLock.lock(); 1762 try { 1763 return largestPoolSize; 1764 } finally { 1765 mainLock.unlock(); 1766 } 1767 } 1768 1769 /** 1770 * Returns the approximate total number of tasks that have ever been 1771 * scheduled for execution. Because the states of tasks and 1772 * threads may change dynamically during computation, the returned 1773 * value is only an approximation. 1774 * 1775 * @return the number of tasks 1776 */ 1777 public long getTaskCount() { 1778 final ReentrantLock mainLock = this.mainLock; 1779 mainLock.lock(); 1780 try { 1781 long n = completedTaskCount; 1782 for (Worker w : workers) { 1783 n += w.completedTasks; 1784 if (w.isLocked()) 1785 ++n; 1786 } 1787 return n + workQueue.size(); 1788 } finally { 1789 mainLock.unlock(); 1790 } 1791 } 1792 1793 /** 1794 * Returns the approximate total number of tasks that have 1795 * completed execution. Because the states of tasks and threads 1796 * may change dynamically during computation, the returned value 1797 * is only an approximation, but one that does not ever decrease 1798 * across successive calls. 1799 * 1800 * @return the number of tasks 1801 */ 1802 public long getCompletedTaskCount() { 1803 final ReentrantLock mainLock = this.mainLock; 1804 mainLock.lock(); 1805 try { 1806 long n = completedTaskCount; 1807 for (Worker w : workers) 1808 n += w.completedTasks; 1809 return n; 1810 } finally { 1811 mainLock.unlock(); 1812 } 1813 } 1814 1815 /** 1816 * Returns a string identifying this pool, as well as its state, 1817 * including indications of run state and estimated worker and 1818 * task counts. 1819 * 1820 * @return a string identifying this pool, as well as its state 1821 */ 1822 public String toString() { 1823 long ncompleted; 1824 int nworkers, nactive; 1825 final ReentrantLock mainLock = this.mainLock; 1826 mainLock.lock(); 1827 try { 1828 ncompleted = completedTaskCount; 1829 nactive = 0; 1830 nworkers = workers.size(); 1831 for (Worker w : workers) { 1832 ncompleted += w.completedTasks; 1833 if (w.isLocked()) 1834 ++nactive; 1835 } 1836 } finally { 1837 mainLock.unlock(); 1838 } 1839 int c = ctl.get(); 1840 String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" : 1841 (runStateAtLeast(c, TERMINATED) ? "Terminated" : 1842 "Shutting down")); 1843 return super.toString() + 1844 "[" + rs + 1845 ", pool size = " + nworkers + 1846 ", active threads = " + nactive + 1847 ", queued tasks = " + workQueue.size() + 1848 ", completed tasks = " + ncompleted + 1849 "]"; 1850 } 1851 1852 /* Extension hooks */ 1853 1854 /** 1855 * Method invoked prior to executing the given Runnable in the 1856 * given thread. This method is invoked by thread {@code t} that 1857 * will execute task {@code r}, and may be used to re-initialize 1858 * ThreadLocals, or to perform logging. 1859 * 1860 * <p>This implementation does nothing, but may be customized in 1861 * subclasses. Note: To properly nest multiple overridings, subclasses 1862 * should generally invoke {@code super.beforeExecute} at the end of 1863 * this method. 1864 * 1865 * @param t the thread that will run task {@code r} 1866 * @param r the task that will be executed 1867 */ 1868 protected void beforeExecute(Thread t, Runnable r) { } 1869 1870 /** 1871 * Method invoked upon completion of execution of the given Runnable. 1872 * This method is invoked by the thread that executed the task. If 1873 * non-null, the Throwable is the uncaught {@code RuntimeException} 1874 * or {@code Error} that caused execution to terminate abruptly. 1875 * 1876 * <p>This implementation does nothing, but may be customized in 1877 * subclasses. Note: To properly nest multiple overridings, subclasses 1878 * should generally invoke {@code super.afterExecute} at the 1879 * beginning of this method. 1880 * 1881 * <p><b>Note:</b> When actions are enclosed in tasks (such as 1882 * {@link FutureTask}) either explicitly or via methods such as 1883 * {@code submit}, these task objects catch and maintain 1884 * computational exceptions, and so they do not cause abrupt 1885 * termination, and the internal exceptions are <em>not</em> 1886 * passed to this method. If you would like to trap both kinds of 1887 * failures in this method, you can further probe for such cases, 1888 * as in this sample subclass that prints either the direct cause 1889 * or the underlying exception if a task has been aborted: 1890 * 1891 * <pre> {@code 1892 * class ExtendedExecutor extends ThreadPoolExecutor { 1893 * // ... 1894 * protected void afterExecute(Runnable r, Throwable t) { 1895 * super.afterExecute(r, t); 1896 * if (t == null && r instanceof Future<?>) { 1897 * try { 1898 * Object result = ((Future<?>) r).get(); 1899 * } catch (CancellationException ce) { 1900 * t = ce; 1901 * } catch (ExecutionException ee) { 1902 * t = ee.getCause(); 1903 * } catch (InterruptedException ie) { 1904 * Thread.currentThread().interrupt(); // ignore/reset 1905 * } 1906 * } 1907 * if (t != null) 1908 * System.out.println(t); 1909 * } 1910 * }}</pre> 1911 * 1912 * @param r the runnable that has completed 1913 * @param t the exception that caused termination, or null if 1914 * execution completed normally 1915 */ 1916 protected void afterExecute(Runnable r, Throwable t) { } 1917 1918 /** 1919 * Method invoked when the Executor has terminated. Default 1920 * implementation does nothing. Note: To properly nest multiple 1921 * overridings, subclasses should generally invoke 1922 * {@code super.terminated} within this method. 1923 */ 1924 protected void terminated() { } 1925 1926 /* Predefined RejectedExecutionHandlers */ 1927 1928 /** 1929 * A handler for rejected tasks that runs the rejected task 1930 * directly in the calling thread of the {@code execute} method, 1931 * unless the executor has been shut down, in which case the task 1932 * is discarded. 1933 */ 1934 public static class CallerRunsPolicy implements RejectedExecutionHandler { 1935 /** 1936 * Creates a {@code CallerRunsPolicy}. 1937 */ 1938 public CallerRunsPolicy() { } 1939 1940 /** 1941 * Executes task r in the caller's thread, unless the executor 1942 * has been shut down, in which case the task is discarded. 1943 * 1944 * @param r the runnable task requested to be executed 1945 * @param e the executor attempting to execute this task 1946 */ 1947 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1948 if (!e.isShutdown()) { 1949 r.run(); 1950 } 1951 } 1952 } 1953 1954 /** 1955 * A handler for rejected tasks that throws a 1956 * {@code RejectedExecutionException}. 1957 */ 1958 public static class AbortPolicy implements RejectedExecutionHandler { 1959 /** 1960 * Creates an {@code AbortPolicy}. 1961 */ 1962 public AbortPolicy() { } 1963 1964 /** 1965 * Always throws RejectedExecutionException. 1966 * 1967 * @param r the runnable task requested to be executed 1968 * @param e the executor attempting to execute this task 1969 * @throws RejectedExecutionException always. 1970 */ 1971 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1972 throw new RejectedExecutionException("Task " + r.toString() + 1973 " rejected from " + 1974 e.toString()); 1975 } 1976 } 1977 1978 /** 1979 * A handler for rejected tasks that silently discards the 1980 * rejected task. 1981 */ 1982 public static class DiscardPolicy implements RejectedExecutionHandler { 1983 /** 1984 * Creates a {@code DiscardPolicy}. 1985 */ 1986 public DiscardPolicy() { } 1987 1988 /** 1989 * Does nothing, which has the effect of discarding task r. 1990 * 1991 * @param r the runnable task requested to be executed 1992 * @param e the executor attempting to execute this task 1993 */ 1994 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1995 } 1996 } 1997 1998 /** 1999 * A handler for rejected tasks that discards the oldest unhandled 2000 * request and then retries {@code execute}, unless the executor 2001 * is shut down, in which case the task is discarded. 2002 */ 2003 public static class DiscardOldestPolicy implements RejectedExecutionHandler { 2004 /** 2005 * Creates a {@code DiscardOldestPolicy} for the given executor. 2006 */ 2007 public DiscardOldestPolicy() { } 2008 2009 /** 2010 * Obtains and ignores the next task that the executor 2011 * would otherwise execute, if one is immediately available, 2012 * and then retries execution of task r, unless the executor 2013 * is shut down, in which case task r is instead discarded. 2014 * 2015 * @param r the runnable task requested to be executed 2016 * @param e the executor attempting to execute this task 2017 */ 2018 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 2019 if (!e.isShutdown()) { 2020 e.getQueue().poll(); 2021 e.execute(r); 2022 } 2023 } 2024 } 2025} 2026