ForkJoinPool.java revision 75a06e56a4cc4599946e21422513e4bafa759509
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
 * A {@code ForkJoinPool} provides the entry point for submissions
 * from non-{@code ForkJoinTask} clients, as well as management and
 * monitoring operations.
 *
 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
 * ExecutorService} mainly by virtue of employing
 * <em>work-stealing</em>: all threads in the pool attempt to find and
 * execute tasks submitted to the pool and/or created by other active
 * tasks (eventually blocking waiting for work if none exist). This
 * enables efficient processing when most tasks spawn other subtasks
 * (as do most {@code ForkJoinTask}s), as well as when many small
 * tasks are submitted to the pool from external clients. Especially
 * when setting <em>asyncMode</em> to true in constructors, {@code
 * ForkJoinPool}s may also be appropriate for use with event-style
 * tasks that are never joined.
 *
 * <p>A static {@link #commonPool()} is available and appropriate for
 * most applications. The common pool is used by any ForkJoinTask that
 * is not explicitly submitted to a specified pool. Using the common
 * pool normally reduces resource usage (its threads are slowly
 * reclaimed during periods of non-use, and reinstated upon subsequent
 * use).
 *
 * <p>For applications that require separate or custom pools, a {@code
 * ForkJoinPool} may be constructed with a given target parallelism
 * level; by default, equal to the number of available processors. The
 * pool attempts to maintain enough active (or available) threads by
 * dynamically adding, suspending, or resuming internal worker
 * threads, even if some tasks are stalled waiting to join others.
 * However, no such adjustments are guaranteed in the face of blocked
 * I/O or other unmanaged synchronization. The nested {@link
 * ManagedBlocker} interface enables extension of the kinds of
 * synchronization accommodated.
 *
 * <p>In addition to execution and lifecycle control methods, this
 * class provides status check methods (for example
 * {@link #getStealCount}) that are intended to aid in developing,
 * tuning, and monitoring fork/join applications. Also, method
 * {@link #toString} returns indications of pool state in a
 * convenient form for informal monitoring.
 *
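 * <p>For example (an illustrative sketch only; the {@code SumTask}
 * class below is hypothetical, not part of this API), a
 * divide-and-conquer summation of some {@code long[] array} might be
 * defined and run in a pool as:
 *
 * <pre> {@code
 * class SumTask extends RecursiveTask<Long> {
 *   final long[] a; final int lo, hi;
 *   SumTask(long[] a, int lo, int hi) {
 *     this.a = a; this.lo = lo; this.hi = hi;
 *   }
 *   protected Long compute() {
 *     if (hi - lo < 1024) {              // small enough: sum directly
 *       long s = 0;
 *       for (int i = lo; i < hi; ++i) s += a[i];
 *       return s;
 *     }
 *     int mid = (lo + hi) >>> 1;         // else fork left half, run right half
 *     SumTask left = new SumTask(a, lo, mid);
 *     left.fork();
 *     long right = new SumTask(a, mid, hi).compute();
 *     return right + left.join();
 *   }
 * }
 *
 * ForkJoinPool pool = new ForkJoinPool(); // or use commonPool()
 * long sum = pool.invoke(new SumTask(array, 0, array.length));}</pre>
 *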
 * <p>As is the case with other ExecutorServices, there are three
 * main task execution methods summarized in the following table.
 * These are designed to be used primarily by clients not already
 * engaged in fork/join computations in the current pool.  The main
 * forms of these methods accept instances of {@code ForkJoinTask},
 * but overloaded forms also allow mixed execution of plain {@code
 * Runnable}- or {@code Callable}- based activities as well. However,
 * tasks that are already executing in a pool should normally instead
 * use the within-computation forms listed in the table unless using
 * async event-style tasks that are not usually joined, in which case
 * there is little difference among choice of methods.
 *
 * <table BORDER CELLPADDING=3 CELLSPACING=1>
 * <caption>Summary of task execution methods</caption>
 *  <tr>
 *    <td></td>
 *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
 *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
 *  </tr>
 *  <tr>
 *    <td> <b>Arrange async execution</b></td>
 *    <td> {@link #execute(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork}</td>
 *  </tr>
 *  <tr>
 *    <td> <b>Await and obtain result</b></td>
 *    <td> {@link #invoke(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#invoke}</td>
 *  </tr>
 *  <tr>
 *    <td> <b>Arrange exec and obtain Future</b></td>
 *    <td> {@link #submit(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 *  </tr>
 * </table>
 *
 * <p>The common pool is by default constructed with default
 * parameters, but these may be controlled by setting three
 * {@linkplain System#getProperty system properties}:
 * <ul>
 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
 * - the parallelism level, a non-negative integer
 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
 * - the class name of a {@link ForkJoinWorkerThreadFactory}
 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
 * - the class name of a {@link UncaughtExceptionHandler}
 * </ul>
 * The system class loader is used to load these classes.
 * Upon any error in establishing these settings, default parameters
 * are used. It is possible to disable or limit the use of threads in
 * the common pool by setting the parallelism property to zero, and/or
 * using a factory that may return {@code null}.
 *
 * <p><b>Implementation notes</b>: This implementation restricts the
 * maximum number of running threads to 32767. Attempts to create
 * pools with greater than the maximum number result in
 * {@code IllegalArgumentException}.
 *
 * <p>This implementation rejects submitted tasks (that is, by throwing
 * {@link RejectedExecutionException}) only when the pool is shut down
 * or internal resources have been exhausted.
 *
 * @since 1.7
 * @hide
 * @author Doug Lea
 */
public class ForkJoinPool extends AbstractExecutorService {

    /*
     * Implementation Overview
     *
     * This class and its nested classes provide the main
     * functionality and control for a set of worker threads:
     * Submissions from non-FJ threads enter into submission queues.
     * Workers take these tasks and typically split them into subtasks
     * that may be stolen by other workers.  Preference rules give
     * first priority to processing tasks from their own queues (LIFO
     * or FIFO, depending on mode), then to randomized FIFO steals of
     * tasks in other queues.
     *
     * WorkQueues
     * ==========
     *
     * Most operations occur within work-stealing queues (in nested
     * class WorkQueue).
     * These are special forms of Deques that
     * support only three of the four possible end-operations -- push,
     * pop, and poll (aka steal), under the further constraints that
     * push and pop are called only from the owning thread (or, as
     * extended here, under a lock), while poll may be called from
     * other threads.  (If you are unfamiliar with them, you probably
     * want to read Herlihy and Shavit's book "The Art of
     * Multiprocessor programming", chapter 16 describing these in
     * more detail before proceeding.)  The main work-stealing queue
     * design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * See also "Correct and Efficient Work-Stealing for Weak Memory
     * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
     * analysis of memory ordering (atomic, volatile etc) issues.  The
     * main differences ultimately stem from GC requirements that we
     * null out taken slots as soon as we can, to maintain as small a
     * footprint as possible even in programs generating huge numbers
     * of tasks.  To accomplish this, we shift the CAS arbitrating pop
     * vs poll (steal) from being on the indices ("base" and "top") to
     * the slots themselves.  So, both a successful pop and poll
     * mainly entail a CAS of a slot from non-null to null.  Because
     * we rely on CASes of references, we do not need tag bits on base
     * or top.  They are simple ints as used in any circular
     * array-based queue (see for example ArrayDeque).  Updates to the
     * indices must still be ordered in a way that guarantees that top
     * == base means the queue is empty, but otherwise may err on the
     * side of possibly making the queue appear nonempty when a push,
     * pop, or poll have not fully committed. Note that this means
     * that the poll operation, considered individually, is not
     * wait-free. One thief cannot successfully continue until another
     * in-progress one (or, if previously empty, a push) completes.
     * However, in the aggregate, we ensure at least probabilistic
     * non-blockingness.  If an attempted steal fails, a thief always
     * chooses a different random victim target to try next. So, in
     * order for one thief to progress, it suffices for any
     * in-progress poll or new push on any empty queue to
     * complete. (This is why we normally use method pollAt and its
     * variants that try once at the apparent base index, else
     * consider alternative actions, rather than method poll.)
     *
     * This approach also enables support of a user mode in which local
     * task processing is in FIFO, not LIFO order, simply by using
     * poll rather than pop.  This can be useful in message-passing
     * frameworks in which tasks are never joined.  However neither
     * mode considers affinities, loads, cache localities, etc, so
     * rarely provide the best possible performance on a given
     * machine, but portably provide good throughput by averaging over
     * these factors.  (Further, even if we did try to use such
     * information, we do not usually have a basis for exploiting it.
     * For example, some sets of tasks profit from cache affinities,
     * but others are harmed by cache pollution effects.)
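     *
     * As a simplified sketch of the slot-CAS arbitration described
     * above (the actual push, pop, and poll methods in class
     * WorkQueue below add the ordering and consistency checks this
     * omits), with a power-of-two array "a" and mask "m":
     *
     *   push(t) (owner): a[top & m] = t; ++top;
     *   pop()   (owner): t = a[(top - 1) & m];
     *                    if (t != null && CAS(slot, t, null)) --top;
     *   poll()  (thief): b = base; t = a[b & m];
     *                    if (t != null && base == b &&
     *                        CAS(slot, t, null)) base = b + 1;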
     *
     * WorkQueues are also used in a similar way for tasks submitted
     * to the pool. We cannot mix these tasks in the same queues used
     * for work-stealing (this would contaminate lifo/fifo
     * processing). Instead, we randomly associate submission queues
     * with submitting threads, using a form of hashing.  The
     * Submitter probe value serves as a hash code for
     * choosing existing queues, and may be randomly repositioned upon
     * contention with other submitters.  In essence, submitters act
     * like workers except that they are restricted to executing local
     * tasks that they submitted (or in the case of CountedCompleters,
     * others with the same root task).  However, because most
     * shared/external queue operations are more expensive than
     * internal, and because, at steady state, external submitters
     * will compete for CPU with workers, ForkJoinTask.join and
     * related methods disable them from repeatedly helping to process
     * tasks if all workers are active.  Insertion of tasks in shared
     * mode requires a lock (mainly to protect in the case of
     * resizing) but we use only a simple spinlock (using bits in
     * field qlock), because submitters encountering a busy queue move
     * on to try or create other queues -- they block only when
     * creating and registering new queues.
     *
     * Management
     * ==========
     *
     * The main throughput advantages of work-stealing stem from
     * decentralized control -- workers mostly take tasks from
     * themselves or each other. We cannot negate this in the
     * implementation of other management responsibilities.  The main
     * tactic for avoiding bottlenecks is packing nearly all
     * essentially atomic control state into two volatile variables
     * that are by far most often read (not written) as status and
     * consistency checks.
     *
     * Field "ctl" contains 64 bits holding all the information needed
     * to atomically decide to add, inactivate, enqueue (on an event
     * queue), dequeue, and/or re-activate workers.  To enable this
     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
     * far in excess of normal operating range) to allow ids, counts,
     * and their negations (used for thresholding) to fit into 16bit
     * fields.
     *
     * Field "plock" is a form of sequence lock with a saturating
     * shutdown bit (similarly for per-queue "qlocks"), mainly
     * protecting updates to the workQueues array, as well as to
     * enable shutdown.  When used as a lock, it is normally only very
     * briefly held, so is nearly always available after at most a
     * brief spin, but we use a monitor-based backup strategy to
     * block when needed.
     *
     * Recording WorkQueues.  WorkQueues are recorded in the
     * "workQueues" array that is created upon first use and expanded
     * if necessary.  Updates to the array while recording new workers
     * and unrecording terminated ones are protected from each other
     * by a lock but the array is otherwise concurrently readable, and
     * accessed directly.  To simplify index-based operations, the
     * array size is always a power of two, and all readers must
     * tolerate null slots.  Worker queues are at odd indices. Shared
     * (submission) queues are at even indices, up to a maximum of 64
     * slots, to limit growth even if array needs to expand to add
     * more workers. Grouping them together in this way simplifies and
     * speeds up task scanning.
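     *
     * (As a concrete sketch of this indexing, using the masks defined
     * below: an external submitter with probe value r uses the shared
     * queue at slot ws[r & SQMASK & (ws.length - 1)], always an even
     * index, while registerWorker places a newly created worker at an
     * odd index found by probing upward from (seed << 1) | 1.)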
     *
     * All worker thread creation is on-demand, triggered by task
     * submissions, replacement of terminated workers, and/or
     * compensation for blocked workers. However, all other support
     * code is set up to work with other policies.  To ensure that we
     * do not hold on to worker references that would prevent GC, ALL
     * accesses to workQueues are via indices into the workQueues
     * array (which is one source of some of the messy code
     * constructions here). In essence, the workQueues array serves as
     * a weak reference mechanism. Thus for example the wait queue
     * field of ctl stores indices, not references.  Access to the
     * workQueues in associated methods (for example signalWork) must
     * both index-check and null-check the IDs. All such accesses
     * ignore bad IDs by returning out early from what they are doing,
     * since this can only be associated with termination, in which
     * case it is OK to give up.  All uses of the workQueues array
     * also check that it is non-null (even if previously
     * non-null). This allows nulling during termination, which is
     * currently not necessary, but remains an option for
     * resource-revocation-based shutdown schemes. It also helps
     * reduce JIT issuance of uncommon-trap code, which tends to
     * unnecessarily complicate control flow in some methods.
     *
     * Event Queuing. Unlike HPC work-stealing frameworks, we cannot
     * let workers spin indefinitely scanning for tasks when none can
     * be found immediately, and we cannot start/resume workers unless
     * there appear to be tasks available.  On the other hand, we must
     * quickly prod them into action when new tasks are submitted or
     * generated. In many usages, ramp-up time to activate workers is
     * the main limiting factor in overall performance (this is
     * compounded at program start-up by JIT compilation and
     * allocation). So we try to streamline this as much as possible.
     * We park/unpark workers after placing in an event wait queue
     * when they cannot find work. This "queue" is actually a simple
     * Treiber stack, headed by the "id" field of ctl, plus a 15bit
     * counter value (that reflects the number of times a worker has
     * been inactivated) to avoid ABA effects (we need only as many
     * version numbers as worker threads). Successors are held in
     * field WorkQueue.nextWait.  Queuing deals with several intrinsic
     * races, mainly that a task-producing thread can miss seeing (and
     * signalling) another thread that gave up looking for work but
     * has not yet entered the wait queue.  We solve this by requiring
     * a full sweep of all workers (via repeated calls to method
     * scan()) both before and after a newly waiting worker is added
     * to the wait queue.  Because enqueued workers may actually be
     * rescanning rather than waiting, we set and clear the "parker"
     * field of WorkQueues to reduce unnecessary calls to unpark.
     * (This requires a secondary recheck to avoid missed signals.)
     * Note the unusual conventions about Thread.interrupts
     * surrounding parking and other blocking: Because interrupts are
     * used solely to alert threads to check termination, which is
     * checked anyway upon blocking, we clear status (using
     * Thread.interrupted) before any call to park, so that park does
     * not immediately return due to status being set via some other
     * unrelated call to interrupt in user code.
     *
     * Signalling.  We create or wake up workers only when there
     * appears to be at least one task they might be able to find and
     * execute.  When a submission is added or another worker adds a
     * task to a queue that has fewer than two tasks, they signal
     * waiting workers (or trigger creation of new ones if fewer than
     * the given parallelism level -- signalWork).  These primary
     * signals are buttressed by others whenever other threads remove
     * a task from a queue and notice that there are other tasks there
     * as well.  So in general, pools will be over-signalled. On most
     * platforms, signalling (unpark) overhead time is noticeably
     * long, and the time between signalling a thread and it actually
     * making progress can be very noticeably long, so it is worth
     * offloading these delays from critical paths as much as
     * possible. Additionally, workers spin-down gradually, by staying
     * alive so long as they see the ctl state changing.  Similar
     * stability-sensing techniques are also used before blocking in
     * awaitJoin and helpComplete.
     *
     * Trimming workers. To release resources after periods of lack of
     * use, a worker starting to wait when the pool is quiescent will
     * time out and terminate if the pool has remained quiescent for a
     * given period -- a short period if there are more threads than
     * parallelism, longer as the number of threads decreases. This
     * will slowly propagate, eventually terminating all workers after
     * periods of non-use.
     *
     * Shutdown and Termination. A call to shutdownNow atomically sets
     * a plock bit and then (non-atomically) sets each worker's
     * qlock status, cancels all unprocessed tasks, and wakes up
     * all waiting workers.  Detecting whether termination should
     * commence after a non-abrupt shutdown() call requires more work
     * and bookkeeping. We need consensus about quiescence (i.e., that
     * there is no more work). The active count provides a primary
     * indication but non-abrupt shutdown still requires a rechecking
     * scan for any workers that are inactive but not queued.
     *
     * Joining Tasks
     * =============
     *
     * Any of several actions may be taken when one worker is waiting
     * to join a task stolen (or always held) by another.  Because we
     * are multiplexing many tasks on to a pool of workers, we can't
     * just let them block (as in Thread.join).  We also cannot just
     * reassign the joiner's run-time stack with another and replace
     * it later, which would be a form of "continuation", that even if
     * possible is not necessarily a good idea since we sometimes need
     * both an unblocked task and its continuation to progress.
     * Instead we combine two tactics:
     *
     *   Helping: Arranging for the joiner to execute some task that it
     *      would be running if the steal had not occurred.
     *
     *   Compensating: Unless there are already enough live threads,
     *      method tryCompensate() may create or re-activate a spare
     *      thread to compensate for blocked joiners until they unblock.
     *
     * A third form (implemented in tryRemoveAndExec) amounts to
     * helping a hypothetical compensator: If we can readily tell that
     * a possible action of a compensator is to steal and execute the
     * task being joined, the joining thread can do so directly,
     * without the need for a compensation thread (although at the
     * expense of larger run-time stacks, but the tradeoff is
     * typically worthwhile).
     *
     * The ManagedBlocker extension API can't use helping so relies
     * only on compensation in method awaitBlocker.
     *
     * The algorithm in tryHelpStealer entails a form of "linear"
     * helping: Each worker records (in field currentSteal) the most
     * recent task it stole from some other worker. Plus, it records
     * (in field currentJoin) the task it is currently actively
     * joining. Method tryHelpStealer uses these markers to try to
     * find a worker to help (i.e., steal back a task from and execute
     * it) that could hasten completion of the actively joined task.
     * In essence, the joiner executes a task that would be on its own
     * local deque had the to-be-joined task not been stolen. This may
     * be seen as a conservative variant of the approach in Wagner &
     * Calder "Leapfrogging: a portable technique for implementing
     * efficient futures" SIGPLAN Notices, 1993
     * (http://portal.acm.org/citation.cfm?id=155354). It differs in
     * that: (1) We only maintain dependency links across workers upon
     * steals, rather than use per-task bookkeeping.  This sometimes
     * requires a linear scan of workQueues array to locate stealers,
     * but often doesn't because stealers leave hints (that may become
     * stale/wrong) of where to locate them.  It is only a hint
     * because a worker might have had multiple steals and the hint
     * records only one of them (usually the most current).  Hinting
     * isolates cost to when it is needed, rather than adding to
     * per-task overhead.  (2) It is "shallow", ignoring nesting and
     * potentially cyclic mutual steals.  (3) It is intentionally
     * racy: field currentJoin is updated only while actively joining,
     * which means that we miss links in the chain during long-lived
     * tasks, GC stalls etc (which is OK since blocking in such cases
     * is usually a good idea).  (4) We bound the number of attempts
     * to find work (see MAX_HELP) and fall back to suspending the
     * worker and if necessary replacing it with another.
     *
     * Helping actions for CountedCompleters are much simpler: Method
     * helpComplete can take and execute any task with the same root
     * as the task being waited on. However, this still entails some
     * traversal of completer chains, so is less efficient than using
     * CountedCompleters without explicit joins.
     *
     * It is impossible to keep exactly the target parallelism number
     * of threads running at any given time.  Determining the
     * existence of conservatively safe helping targets, the
     * availability of already-created spares, and the apparent need
     * to create new spares are all racy, so we rely on multiple
     * retries of each.  Compensation in the apparent absence of
     * helping opportunities is challenging to control on JVMs, where
     * GC and other activities can stall progress of tasks that in
     * turn stall out many other dependent tasks, without us being
     * able to determine whether they will ever require compensation.
     * Even though work-stealing otherwise encounters little
     * degradation in the presence of more threads than cores,
     * aggressively adding new threads in such cases entails risk of
     * unwanted positive feedback control loops in which more threads
     * cause more dependent stalls (as well as delayed progress of
     * unblocked threads to the point that we know they are available)
     * leading to more situations requiring more threads, and so
     * on. This aspect of control can be seen as an (analytically
     * intractable) game with an opponent that may choose the worst
     * (for us) active thread to stall at any time.  We take several
     * precautions to bound losses (and thus bound gains), mainly in
     * methods tryCompensate and awaitJoin.
     *
     * Common Pool
     * ===========
     *
     * The static common pool always exists after static
     * initialization.  Since it (or any other created pool) need
     * never be used, we minimize initial construction overhead and
     * footprint to the setup of about a dozen fields, with no nested
     * allocation. Most bootstrapping occurs within method
     * fullExternalPush during the first submission to the pool.
     *
     * When external threads submit to the common pool, they can
     * perform subtask processing (see externalHelpJoin and related
     * methods). This caller-helps policy makes it sensible to set
     * common pool parallelism level to one (or more) less than the
     * total number of available cores, or even zero for pure
     * caller-runs. We do not need to record whether external
     * submissions are to the common pool -- if not, externalHelpJoin
     * returns quickly (at the most helping to signal some common pool
     * workers). These submitters would otherwise be blocked waiting
     * for completion, so the extra effort (with liberally sprinkled
     * task status checks) in inapplicable cases amounts to an odd
     * form of limited spin-wait before blocking in ForkJoinTask.join.
     *
     * Style notes
     * ===========
     *
     * There is a lot of representation-level coupling among classes
     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
     * fields of WorkQueue maintain data structures managed by
     * ForkJoinPool, so are directly accessed.  There is little point
     * trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway. Several methods intrinsically sprawl because
     * they must accumulate sets of consistent reads of volatiles held
     * in local variables.  Methods signalWork() and scan() are the
     * main bottlenecks, so are especially heavily
     * micro-optimized/mangled.  There are lots of inline assignments
     * (of form "while ((local = field) != 0)") which are usually the
     * simplest way to ensure the required read orderings (which are
     * sometimes critical). This leads to a "C"-like style of listing
     * declarations of these locals at the heads of methods or blocks.
     * There are several occurrences of the unusual "do {} while
     * (!cas...)"  which is the simplest way to force an update of a
     * CAS'ed variable. There are also other coding oddities (including
     * several unnecessary-looking hoisted null checks) that help
     * some methods perform reasonably even when interpreted (not
     * compiled).
     *
     * The order of declarations in this file is:
     * (1) Static utility functions
     * (2) Nested (static) classes
     * (3) Static fields
     * (4) Fields, along with constants used when unpacking some of them
     * (5) Internal control methods
     * (6) Callbacks and other support for ForkJoinTask methods
     * (7) Exported methods
     * (8) Static block initializing statics in minimally dependent order
     */

    // Static utilities

    /**
     * If there is a security manager, makes sure caller has
     * permission to modify threads.
     */
    private static void checkPermission() {
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            security.checkPermission(modifyThreadPermission);
    }

    // Nested classes

    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s.
     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
     * for {@code ForkJoinWorkerThread} subclasses that extend base
     * functionality or initialize threads with different contexts.
     */
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         *
         * @param pool the pool this thread works in
         * @throws NullPointerException if the pool is null
         * @return the new worker thread
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }

    /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread.
     */
    static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }

    /**
     * Class for artificial tasks that are used to replace the target
     * of local joins if they are removed from an interior queue slot
     * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
     * actually do anything beyond having a unique identity.
     */
    static final class EmptyTask extends ForkJoinTask<Void> {
        private static final long serialVersionUID = -7721805057305804111L;
        EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void x) {}
        public final boolean exec() { return true; }
    }

    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for main rationale and algorithms.
     * Implementation relies heavily on "Unsafe" intrinsics
     * and selective use of "volatile":
     *
     * Field "base" is the index (mod array.length) of the least valid
     * queue slot, which is always the next position to steal (poll)
     * from if nonempty. Reads and writes require volatile orderings
     * but not CAS, because updates are only performed after slot
     * CASes.
     *
     * Field "top" is the index (mod array.length) of the next queue
     * slot to push to or pop from. It is written only by owner thread
     * for push, or under lock for external/shared push, and accessed
     * by other threads only after reading (volatile) base.  Both top
     * and base are allowed to wrap around on overflow, but (top -
     * base) (or more commonly -(base - top) to force volatile read of
     * base before top) still estimates size. The lock ("qlock") is
     * forced to -1 on termination, causing all further lock attempts
     * to fail. (Note: we don't need CAS for termination state because
     * upon pool shutdown, all shared-queues will stop being used
     * anyway.)  Nearly all lock bodies are set up so that exceptions
     * within lock bodies are "impossible" (modulo JVM errors that
     * would cause failure anyway.)
     *
     * The array slots are read and written using the emulation of
     * volatiles/atomics provided by Unsafe. Insertions must in
     * general use putOrderedObject as a form of releasing store to
     * ensure that all writes to the task object are ordered before
     * its publication in the queue.
     * All removals entail a CAS to
     * null.  The array is always a power of two. To ensure safety of
     * Unsafe array operations, all accesses perform explicit null
     * checks and implicit bounds checks via power-of-two masking.
     *
     * In addition to basic queuing support, this class contains
     * fields described elsewhere to control execution. It turns out
     * to work better memory-layout-wise to include them in this class
     * rather than a separate class.
     *
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. (It would be best for queue objects
     * and their arrays to share, but there is nothing available to
     * help arrange that). The @Contended annotation alerts JVMs to
     * try to keep instances apart.
     */
    static final class WorkQueue {
        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Heuristic padding to ameliorate unfortunate memory placements
        volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;

        volatile int eventCount;   // encoded inactivation count; < 0 if inactive
        int nextWait;              // encoded record of next event waiter
        int nsteals;               // number of steals
        int hint;                  // steal index hint
        short poolIndex;           // index of this queue in pool
        final short mode;          // 0: lifo, > 0: fifo, < 0: shared
        volatile int qlock;        // 1: locked, -1: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        ForkJoinTask<?> currentSteal; // current non-local task being executed

        volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
        volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d;

        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
                  int seed) {
            this.pool = pool;
            this.owner = owner;
            this.mode = (short)mode;
            this.hint = seed; // store initial seed for runWorker
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Returns the approximate number of tasks in the queue.
         */
        final int queueSize() {
            int n = base - top;       // non-owner callers must read base first
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int m, s;
            int n = base - (s = top);
            return (n >= 0 ||
                    (n == -1 &&
                     ((a = array) == null ||
                      (m = a.length - 1) < 0 ||
                      U.getObject
                      (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
        }

        /**
         * Pushes a task. Call only by owner in unshared queues.  (The
         * shared-queue version is embedded in method externalPush.)
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                if ((n = (top = s + 1) - base) <= 2)
                    (p = pool).signalWork(p.workQueues, this);
                else if (n >= m)
                    growArray();
            }
        }

        /**
         * Initializes or doubles the capacity of array. Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         */
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
            if (size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do {
                    ForkJoinTask<?> x;
                    int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                    int j    = ((b & mask) << ASHIFT) + ABASE;
                    x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
                    if (x != null &&
                        U.compareAndSwapObject(oldA, oldj, x, null))
                        U.putObjectVolatile(a, j, x);
                } while (++b != t);
            }
            return a;
        }

        /**
         * Takes next task, if one exists, in LIFO order.  Call only
         * by owner in unshared queues.
         */
        final ForkJoinTask<?> pop() {
            ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
            if ((a = array) != null && (m = a.length - 1) >= 0) {
                for (int s; (s = top - 1) - base >= 0;) {
                    long j = ((m & s) << ASHIFT) + ABASE;
                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                        break;
                    if (U.compareAndSwapObject(a, j, t, null)) {
                        top = s;
                        return t;
                    }
                }
            }
            return null;
        }

        /**
         * Takes a task in FIFO order if b is base of queue and a task
         * can be claimed without contention. Specialized versions
         * appear in ForkJoinPool methods scan and tryHelpStealer.
         */
        final ForkJoinTask<?> pollAt(int b) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] a;
            if ((a = array) != null) {
                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
                    base == b && U.compareAndSwapObject(a, j, t, null)) {
                    U.putOrderedInt(this, QBASE, b + 1);
                    return t;
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in FIFO order.
         */
        final ForkJoinTask<?> poll() {
            ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
            while ((b = base) - top < 0 && (a = array) != null) {
                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
                if (t != null) {
                    if (U.compareAndSwapObject(a, j, t, null)) {
                        U.putOrderedInt(this, QBASE, b + 1);
                        return t;
                    }
                }
                else if (base == b) {
                    if (b + 1 == top)
                        break;
                    Thread.yield(); // wait for lagging update (very rare)
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return mode == 0 ? pop() : poll();
        }

        /**
         * Returns next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> peek() {
            ForkJoinTask<?>[] a = array; int m;
            if (a == null || (m = a.length - 1) < 0)
                return null;
            int i = mode == 0 ? top - 1 : base;
            int j = ((i & m) << ASHIFT) + ABASE;
            return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
        }

        /**
         * Pops the given task only if it is at the current top.
         * (A shared version is available only via FJP.tryExternalUnpush)
         */
        final boolean tryUnpush(ForkJoinTask<?> t) {
            ForkJoinTask<?>[] a; int s;
            if ((a = array) != null && (s = top) != base &&
                U.compareAndSwapObject
                (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
                top = s;
                return true;
            }
            return false;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions.
         */
        final void cancelAll() {
            ForkJoinTask.cancelIgnoringExceptions(currentJoin);
            ForkJoinTask.cancelIgnoringExceptions(currentSteal);
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        // Specialized execution methods

        /**
         * Polls and runs tasks until empty.
         */
        final void pollAndExecAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null;)
                t.doExec();
        }

        /**
         * Executes a top-level task and any local tasks remaining
         * after execution.
         */
        final void runTask(ForkJoinTask<?> task) {
            if ((currentSteal = task) != null) {
                task.doExec();
                ForkJoinTask<?>[] a = array;
                int md = mode;
                ++nsteals;
                currentSteal = null;
                if (md != 0)
                    pollAndExecAll();
                else if (a != null) {
                    int s, m = a.length - 1;
                    while ((s = top - 1) - base >= 0) {
                        long i = ((m & s) << ASHIFT) + ABASE;
                        ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, i);
                        if (t == null)
                            break;
                        if (U.compareAndSwapObject(a, i, t, null)) {
                            top = s;
                            t.doExec();
                        }
                    }
                }
            }
        }

        /**
         * If present, removes from queue and executes the given task,
         * or any other cancelled task. Returns (true) on any CAS
         * or consistency check failure so caller can retry.
         *
         * @return false if no progress can be made, else true
         */
        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
            boolean stat;
            ForkJoinTask<?>[] a; int m, s, b, n;
            if (task != null && (a = array) != null && (m = a.length - 1) >= 0 &&
                (n = (s = top) - (b = base)) > 0) {
                boolean removed = false, empty = true;
                stat = true;
                for (ForkJoinTask<?> t;;) {           // traverse from s to b
                    long j = ((--s & m) << ASHIFT) + ABASE;
                    t = (ForkJoinTask<?>)U.getObject(a, j);
                    if (t == null)                    // inconsistent length
                        break;
                    else if (t == task) {
                        if (s + 1 == top) {           // pop
                            if (!U.compareAndSwapObject(a, j, task, null))
                                break;
                            top = s;
                            removed = true;
                        }
                        else if (base == b)           // replace with proxy
                            removed = U.compareAndSwapObject(a, j, task,
                                                             new EmptyTask());
                        break;
                    }
                    else if (t.status >= 0)
                        empty = false;
                    else if (s + 1 == top) {          // pop and throw away
                        if (U.compareAndSwapObject(a, j, t, null))
                            top = s;
                        break;
                    }
                    if (--n == 0) {
                        if (!empty && base == b)
                            stat = false;
                        break;
                    }
                }
                if (removed)
                    task.doExec();
            }
            else
                stat = false;
            return stat;
        }

        /**
         * Tries to poll for and execute the given task or any other
         * task in its CountedCompleter computation.
         */
        final boolean pollAndExecCC(CountedCompleter<?> root) {
            ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r;
            if ((b = base) - top < 0 && (a = array) != null) {
                long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                if ((o = U.getObjectVolatile(a, j)) == null)
                    return true; // retry
                if (o instanceof CountedCompleter) {
                    for (t = (CountedCompleter<?>)o, r = t;;) {
                        if (r == root) {
                            if (base == b &&
                                U.compareAndSwapObject(a, j, t, null)) {
                                U.putOrderedInt(this, QBASE, b + 1);
                                t.doExec();
                            }
                            return true;
                        }
                        else if ((r = r.completer) == null)
                            break; // not part of root computation
                    }
                }
            }
            return false;
        }

        /**
         * Tries to pop and execute the given task or any other task
         * in its CountedCompleter computation.
         */
        final boolean externalPopAndExecCC(CountedCompleter<?> root) {
            ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
            if (base - (s = top) < 0 && (a = array) != null) {
                long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
                if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
                    for (t = (CountedCompleter<?>)o, r = t;;) {
                        if (r == root) {
                            if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
                                if (top == s && array == a &&
                                    U.compareAndSwapObject(a, j, t, null)) {
                                    top = s - 1;
                                    qlock = 0;
                                    t.doExec();
                                }
                                else
                                    qlock = 0;
                            }
                            return true;
                        }
                        else if ((r = r.completer) == null)
                            break;
                    }
                }
            }
            return false;
        }

        /**
         * Internal version
         */
        final boolean internalPopAndExecCC(CountedCompleter<?> root) {
            ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
            if (base - (s = top) < 0 && (a = array) != null) {
                long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
                if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
                    for (t = (CountedCompleter<?>)o, r = t;;) {
                        if (r == root) {
                            if (U.compareAndSwapObject(a, j, t, null)) {
                                top = s - 1;
                                t.doExec();
                            }
                            return true;
                        }
                        else if ((r = r.completer) == null)
                            break;
                    }
                }
            }
            return false;
        }

        /**
         * Returns true if owned and not known to be blocked.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return (eventCount >= 0 &&
                    (wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe U;
        private static final long QBASE;
        private static final long QLOCK;
        private static final int ABASE;
        private static final int ASHIFT;
        static {
            try {
                U = sun.misc.Unsafe.getUnsafe();
                Class<?> k = WorkQueue.class;
                Class<?> ak = ForkJoinTask[].class;
                QBASE = U.objectFieldOffset
                    (k.getDeclaredField("base"));
                QLOCK = U.objectFieldOffset
                    (k.getDeclaredField("qlock"));
                ABASE = U.arrayBaseOffset(ak);
                int scale = U.arrayIndexScale(ak);
                if ((scale & (scale - 1)) != 0)
                    throw new Error("data type scale not a power of two");
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    // static fields (initialized in static initializer below)

    /**
     * Per-thread submission bookkeeping. Shared across all pools
     * to reduce ThreadLocal pollution and because random motion
     * to avoid contention in one pool is likely to hold for others.
     * Lazily initialized on first submission (but null-checked
     * in other contexts to avoid unnecessary initialization).
     */
    static final ThreadLocal<Submitter> submitters;

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    private static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool.
     * Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int commonParallelism;

    /**
     * Sequence number for creating workerNamePrefix.
     */
    private static int poolNumberSequence;

    /**
     * Returns the next sequence number. We don't expect this to
     * ever contend, so use simple builtin sync.
     */
    private static final synchronized int nextPoolId() {
        return ++poolNumberSequence;
    }

    // static constants

    /**
     * Initial timeout value (in nanoseconds) for the thread
     * triggering quiescence to park waiting for new work. On timeout,
     * the thread will instead try to shrink the number of
     * workers. The value should be large enough to avoid overly
     * aggressive shrinkage during most transient stalls (long GCs
     * etc).
     */
    private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec

    /**
     * Timeout value when there are more threads than parallelism level
     */
    private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L;

    /**
     * Tolerance for idle timeouts, to cope with timer undershoots
     */
    private static final long TIMEOUT_SLOP = 2000000L;

    /**
     * The maximum stolen->joining link depth allowed in method
     * tryHelpStealer.  Must be a power of two. Depths for legitimate
     * chains are unbounded, but we use a fixed constant to avoid
     * (otherwise unchecked) cycles and to bound staleness of
     * traversal parameters at the expense of sometimes blocking when
     * we could be helping.
     */
    private static final int MAX_HELP = 64;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x61c88647;

    /*
     * Bits and masks for control variables
     *
     * Field ctl is a long packed with:
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: poolIndex of top of Treiber stack of waiters (16 bits)
     *
     * When convenient, we can extract the upper 32 bits of counts and
     * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
     * (int)ctl.  The ec field is never accessed alone, but always
     * together with id and st. The offsets of counts by the target
     * parallelism and the positionings of fields makes it possible to
     * perform the most common checks via sign tests of fields: When
     * ac is negative, there are not enough active workers, when tc is
     * negative, there are not enough total workers, and when e is
     * negative, the pool is terminating.  To deal with these possibly
     * negative fields, we use casts in and out of "short" and/or
     * signed shifts to maintain signedness.
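     *
     * As a worked example of these sign tests: with target
     * parallelism 4 and four workers all running tasks, ac is 0; if
     * one of them is inactivated or blocks in a join, ac drops to -1,
     * so a sign test on the upper half of ctl immediately shows an
     * active-worker deficit, and if only three workers exist in
     * total, tc is likewise -1, indicating that another may need to
     * be created.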
     *
     * When a thread is queued (inactivated), its eventCount field is
     * set negative, which is the only way to tell if a worker is
     * prevented from executing tasks, even though it must continue to
     * scan for them to avoid queuing races. Note however that
     * eventCount updates lag releases so usage requires care.
     *
     * Field plock is an int packed with:
     * SHUTDOWN: true if shutdown is enabled (1 bit)
     * SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits)
     * SIGNAL: set when threads may be waiting on the lock (1 bit)
     *
     * The sequence number enables simple consistency checks:
     * Staleness of read-only operations on the workQueues array can
     * be checked by comparing plock before vs after the reads.
     */

    // bit positions/shifts for fields
    private static final int AC_SHIFT = 48;
    private static final int TC_SHIFT = 32;
    private static final int ST_SHIFT = 31;
    private static final int EC_SHIFT = 16;

    // bounds
    private static final int SMASK      = 0xffff;  // short bits
    private static final int MAX_CAP    = 0x7fff;  // max #workers - 1
    private static final int EVENMASK   = 0xfffe;  // even short bits
    private static final int SQMASK     = 0x007e;  // max 64 (even) slots
    private static final int SHORT_SIGN = 1 << 15;
    private static final int INT_SIGN   = 1 << 31;

    // masks
    private static final long STOP_BIT  = 0x0001L << ST_SHIFT;
    private static final long AC_MASK   = ((long)SMASK) << AC_SHIFT;
    private static final long TC_MASK   = ((long)SMASK) << TC_SHIFT;

    // units for incrementing and decrementing
    private static final long TC_UNIT   = 1L << TC_SHIFT;
    private static final long AC_UNIT   = 1L << AC_SHIFT;

    // masks and units for dealing with u = (int)(ctl >>> 32)
    private static final int UAC_SHIFT  = AC_SHIFT - 32;
    private static final int UTC_SHIFT  = TC_SHIFT - 32;
    private static final int UAC_MASK   = SMASK << UAC_SHIFT;
    private static final int UTC_MASK   = SMASK << UTC_SHIFT;
    private static final int UAC_UNIT   = 1 << UAC_SHIFT;
    private static final int UTC_UNIT   = 1 << UTC_SHIFT;

    // masks and units for dealing with e = (int)ctl
    private static final int E_MASK     = 0x7fffffff; // no STOP_BIT
    private static final int E_SEQ      = 1 << EC_SHIFT;

    // plock bits
    private static final int SHUTDOWN   = 1 << 31;
    private static final int PL_LOCK    = 2;
    private static final int PL_SIGNAL  = 1;
    private static final int PL_SPINS   = 1 << 8;

    // access mode for WorkQueue
    static final int LIFO_QUEUE   =  0;
    static final int FIFO_QUEUE   =  1;
    static final int SHARED_QUEUE = -1;

    // Heuristic padding to ameliorate unfortunate memory placements
    volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;

    // Instance fields
    volatile long stealCount;            // collects worker counts
    volatile long ctl;                   // main pool control
    volatile int plock;                  // shutdown status and seqLock
    volatile int indexSeed;              // worker/submitter index seed
    final short parallelism;             // parallelism level
    final short mode;                    // LIFO/FIFO
    WorkQueue[] workQueues;              // main registry
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
    final String workerNamePrefix;       // to create worker name string

    volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
    volatile Object pad18, pad19, pad1a, pad1b;

    /**
     * Acquires the plock lock to protect worker array and related
     * updates. This method is called only if an initial CAS on plock
     * fails. This acts as a spinlock for normal cases, but falls back
     * to builtin monitor to block when (rarely) needed. This would be
     * a terrible idea for a highly contended lock, but works fine as
     * a more conservative alternative to a pure spinlock.
     */
    private int acquirePlock() {
        int spins = PL_SPINS, ps, nps;
        for (;;) {
            if (((ps = plock) & PL_LOCK) == 0 &&
                U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
                return nps;
            else if (spins >= 0) {
                if (ThreadLocalRandom.current().nextInt() >= 0)
                    --spins;
            }
            else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
                synchronized (this) {
                    if ((plock & PL_SIGNAL) != 0) {
                        try {
                            wait();
                        } catch (InterruptedException ie) {
                            try {
                                Thread.currentThread().interrupt();
                            } catch (SecurityException ignore) {
                            }
                        }
                    }
                    else
                        notifyAll();
                }
            }
        }
    }

    /**
     * Unlocks and signals any thread waiting for plock. Called only
     * when CAS of seq value for unlock fails.
     */
    private void releasePlock(int ps) {
        plock = ps;
        synchronized (this) { notifyAll(); }
    }

    /**
     * Tries to create and start one worker if fewer than target
     * parallelism level exist. Adjusts counts etc on failure.
     */
    private void tryAddWorker() {
        long c; int u, e;
        while ((u = (int)((c = ctl) >>> 32)) < 0 &&
               (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
            long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
                              ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
            if (U.compareAndSwapLong(this, CTL, c, nc)) {
                ForkJoinWorkerThreadFactory fac;
                Throwable ex = null;
                ForkJoinWorkerThread wt = null;
                try {
                    if ((fac = factory) != null &&
                        (wt = fac.newThread(this)) != null) {
                        wt.start();
                        break;
                    }
                } catch (Throwable rex) {
                    ex = rex;
                }
                deregisterWorker(wt, ex);
                break;
            }
        }
    }

    // Registering and deregistering workers

    /**
     * Callback from ForkJoinWorkerThread to establish and record its
     * WorkQueue. To avoid scanning bias due to packing entries in
     * front of the workQueues array, we treat the array as a simple
     * power-of-two hash table using per-thread seed as hash,
     * expanding as needed.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
        wt.setDaemon(true);
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
                                          s += SEED_INCREMENT) ||
                     s == 0); // skip 0
        WorkQueue w = new WorkQueue(this, wt, mode, s);
        if (((ps = plock) & PL_LOCK) != 0 ||
            !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
            ps = acquirePlock();
        int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
        try {
            if ((ws = workQueues) != null) { // skip if shutting down
                int n = ws.length, m = n - 1;
                int r = (s << 1) | 1;        // use odd-numbered indices
                if (ws[r &= m] != null) {    // collision
                    int probes = 0;          // step by approx half size
2 : ((n >>> 1) & EVENMASK) + 2; 1356 while (ws[r = (r + step) & m] != null) { 1357 if (++probes >= n) { 1358 workQueues = ws = Arrays.copyOf(ws, n <<= 1); 1359 m = n - 1; 1360 probes = 0; 1361 } 1362 } 1363 } 1364 w.poolIndex = (short)r; 1365 w.eventCount = r; // volatile write orders 1366 ws[r] = w; 1367 } 1368 } finally { 1369 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1370 releasePlock(nps); 1371 } 1372 wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1))); 1373 return w; 1374 } 1375 1376 /** 1377 * Final callback from terminating worker, as well as upon failure 1378 * to construct or start a worker. Removes record of worker from 1379 * array, and adjusts counts. If pool is shutting down, tries to 1380 * complete termination. 1381 * 1382 * @param wt the worker thread, or null if construction failed 1383 * @param ex the exception causing failure, or null if none 1384 */ 1385 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { 1386 WorkQueue w = null; 1387 if (wt != null && (w = wt.workQueue) != null) { 1388 int ps; long sc; 1389 w.qlock = -1; // ensure set 1390 do {} while (!U.compareAndSwapLong(this, STEALCOUNT, 1391 sc = stealCount, 1392 sc + w.nsteals)); 1393 if (((ps = plock) & PL_LOCK) != 0 || 1394 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 1395 ps = acquirePlock(); 1396 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); 1397 try { 1398 int idx = w.poolIndex; 1399 WorkQueue[] ws = workQueues; 1400 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) 1401 ws[idx] = null; 1402 } finally { 1403 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1404 releasePlock(nps); 1405 } 1406 } 1407 1408 long c; // adjust ctl counts 1409 do {} while (!U.compareAndSwapLong 1410 (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | 1411 ((c - TC_UNIT) & TC_MASK) | 1412 (c & ~(AC_MASK|TC_MASK))))); 1413 1414 if (!tryTerminate(false, false) && w != null && w.array != null) { 1415 w.cancelAll(); // cancel remaining tasks 1416 WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e; 1417 while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) { 1418 if (e > 0) { // activate or create replacement 1419 if ((ws = workQueues) == null || 1420 (i = e & SMASK) >= ws.length || 1421 (v = ws[i]) == null) 1422 break; 1423 long nc = (((long)(v.nextWait & E_MASK)) | 1424 ((long)(u + UAC_UNIT) << 32)); 1425 if (v.eventCount != (e | INT_SIGN)) 1426 break; 1427 if (U.compareAndSwapLong(this, CTL, c, nc)) { 1428 v.eventCount = (e + E_SEQ) & E_MASK; 1429 if ((p = v.parker) != null) 1430 U.unpark(p); 1431 break; 1432 } 1433 } 1434 else { 1435 if ((short)u < 0) 1436 tryAddWorker(); 1437 break; 1438 } 1439 } 1440 } 1441 if (ex == null) // help clean refs on way out 1442 ForkJoinTask.helpExpungeStaleExceptions(); 1443 else // rethrow 1444 ForkJoinTask.rethrow(ex); 1445 } 1446 1447 // Submissions 1448 1449 /** 1450 * Per-thread records for threads that submit to pools. Currently 1451 * holds only pseudo-random seed / index that is used to choose 1452 * submission queues in method externalPush. In the future, this may 1453 * also incorporate a means to implement different task rejection 1454 * and resubmission policies. 1455 * 1456 * Seeds for submitters and workers/workQueues work in basically 1457 * the same way but are initialized and updated using slightly 1458 * different mechanics. Both are initialized using the same 1459 * approach as in class ThreadLocal, where successive values are 1460 * unlikely to collide with previous values. 
Seeds are then 1461 * randomly modified upon collisions using xorshifts, which 1462 * requires a non-zero seed. 1463 */ 1464 static final class Submitter { 1465 int seed; 1466 Submitter(int s) { seed = s; } 1467 } 1468 1469 /** 1470 * Unless shutting down, adds the given task to a submission queue 1471 * at submitter's current queue index (modulo submission 1472 * range). Only the most common path is directly handled in this 1473 * method. All others are relayed to fullExternalPush. 1474 * 1475 * @param task the task. Caller must ensure non-null. 1476 */ 1477 final void externalPush(ForkJoinTask<?> task) { 1478 Submitter z = submitters.get(); 1479 WorkQueue q; int r, m, s, n, am; ForkJoinTask<?>[] a; 1480 int ps = plock; 1481 WorkQueue[] ws = workQueues; 1482 if (z != null && ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 && 1483 (q = ws[m & (r = z.seed) & SQMASK]) != null && r != 0 && 1484 U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock 1485 if ((a = q.array) != null && 1486 (am = a.length - 1) > (n = (s = q.top) - q.base)) { 1487 int j = ((am & s) << ASHIFT) + ABASE; 1488 U.putOrderedObject(a, j, task); 1489 q.top = s + 1; // push on to deque 1490 q.qlock = 0; 1491 if (n <= 1) 1492 signalWork(ws, q); 1493 return; 1494 } 1495 q.qlock = 0; 1496 } 1497 fullExternalPush(task); 1498 } 1499 1500 /** 1501 * Full version of externalPush. This method is called, among 1502 * other times, upon the first submission of the first task to the 1503 * pool, so must perform secondary initialization. It also 1504 * detects first submission by an external thread by looking up 1505 * its ThreadLocal, and creates a new shared queue if the one at 1506 * index if empty or contended. The plock lock body must be 1507 * exception-free (so no try/finally) so we optimistically 1508 * allocate new queues outside the lock and throw them away if 1509 * (very rarely) not needed. 1510 * 1511 * Secondary initialization occurs when plock is zero, to create 1512 * workQueue array and set plock to a valid value. This lock body 1513 * must also be exception-free. Because the plock seq value can 1514 * eventually wrap around zero, this method harmlessly fails to 1515 * reinitialize if workQueues exists, while still advancing plock. 1516 */ 1517 private void fullExternalPush(ForkJoinTask<?> task) { 1518 int r = 0; // random index seed 1519 for (Submitter z = submitters.get();;) { 1520 WorkQueue[] ws; WorkQueue q; int ps, m, k; 1521 if (z == null) { 1522 if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed, 1523 r += SEED_INCREMENT) && r != 0) 1524 submitters.set(z = new Submitter(r)); 1525 } 1526 else if (r == 0) { // move to a different index 1527 r = z.seed; 1528 r ^= r << 13; // same xorshift as WorkQueues 1529 r ^= r >>> 17; 1530 z.seed = r ^= (r << 5); 1531 } 1532 if ((ps = plock) < 0) 1533 throw new RejectedExecutionException(); 1534 else if (ps == 0 || (ws = workQueues) == null || 1535 (m = ws.length - 1) < 0) { // initialize workQueues 1536 int p = parallelism; // find power of two table size 1537 int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots 1538 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; 1539 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; 1540 WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ? 
1541 new WorkQueue[n] : null); 1542 if (((ps = plock) & PL_LOCK) != 0 || 1543 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 1544 ps = acquirePlock(); 1545 if (((ws = workQueues) == null || ws.length == 0) && nws != null) 1546 workQueues = nws; 1547 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); 1548 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1549 releasePlock(nps); 1550 } 1551 else if ((q = ws[k = r & m & SQMASK]) != null) { 1552 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { 1553 ForkJoinTask<?>[] a = q.array; 1554 int s = q.top; 1555 boolean submitted = false; 1556 try { // locked version of push 1557 if ((a != null && a.length > s + 1 - q.base) || 1558 (a = q.growArray()) != null) { // must presize 1559 int j = (((a.length - 1) & s) << ASHIFT) + ABASE; 1560 U.putOrderedObject(a, j, task); 1561 q.top = s + 1; 1562 submitted = true; 1563 } 1564 } finally { 1565 q.qlock = 0; // unlock 1566 } 1567 if (submitted) { 1568 signalWork(ws, q); 1569 return; 1570 } 1571 } 1572 r = 0; // move on failure 1573 } 1574 else if (((ps = plock) & PL_LOCK) == 0) { // create new queue 1575 q = new WorkQueue(this, null, SHARED_QUEUE, r); 1576 q.poolIndex = (short)k; 1577 if (((ps = plock) & PL_LOCK) != 0 || 1578 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 1579 ps = acquirePlock(); 1580 if ((ws = workQueues) != null && k < ws.length && ws[k] == null) 1581 ws[k] = q; 1582 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); 1583 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1584 releasePlock(nps); 1585 } 1586 else 1587 r = 0; 1588 } 1589 } 1590 1591 // Maintaining ctl counts 1592 1593 /** 1594 * Increments active count; mainly called upon return from blocking. 1595 */ 1596 final void incrementActiveCount() { 1597 long c; 1598 do {} while (!U.compareAndSwapLong 1599 (this, CTL, c = ctl, ((c & ~AC_MASK) | 1600 ((c & AC_MASK) + AC_UNIT)))); 1601 } 1602 1603 /** 1604 * Tries to create or activate a worker if too few are active. 1605 * 1606 * @param ws the worker array to use to find signallees 1607 * @param q if non-null, the queue holding tasks to be processed 1608 */ 1609 final void signalWork(WorkQueue[] ws, WorkQueue q) { 1610 for (;;) { 1611 long c; int e, u, i; WorkQueue w; Thread p; 1612 if ((u = (int)((c = ctl) >>> 32)) >= 0) 1613 break; 1614 if ((e = (int)c) <= 0) { 1615 if ((short)u < 0) 1616 tryAddWorker(); 1617 break; 1618 } 1619 if (ws == null || ws.length <= (i = e & SMASK) || 1620 (w = ws[i]) == null) 1621 break; 1622 long nc = (((long)(w.nextWait & E_MASK)) | 1623 ((long)(u + UAC_UNIT)) << 32); 1624 int ne = (e + E_SEQ) & E_MASK; 1625 if (w.eventCount == (e | INT_SIGN) && 1626 U.compareAndSwapLong(this, CTL, c, nc)) { 1627 w.eventCount = ne; 1628 if ((p = w.parker) != null) 1629 U.unpark(p); 1630 break; 1631 } 1632 if (q != null && q.base >= q.top) 1633 break; 1634 } 1635 } 1636 1637 // Scanning for tasks 1638 1639 /** 1640 * Top-level runloop for workers, called by ForkJoinWorkerThread.run. 1641 */ 1642 final void runWorker(WorkQueue w) { 1643 w.growArray(); // allocate queue 1644 for (int r = w.hint; scan(w, r) == 0; ) { 1645 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift 1646 } 1647 } 1648 1649 /** 1650 * Scans for and, if found, runs one task, else possibly 1651 * inactivates the worker. 
This method operates on single reads of 1652 * volatile state and is designed to be re-invoked continuously, 1653 * in part because it returns upon detecting inconsistencies, 1654 * contention, or state changes that indicate possible success on 1655 * re-invocation. 1656 * 1657 * The scan searches for tasks across queues starting at a random 1658 * index, checking each at least twice. The scan terminates upon 1659 * either finding a non-empty queue, or completing the sweep. If 1660 * the worker is not inactivated, it takes and runs a task from 1661 * this queue. Otherwise, if not activated, it tries to activate 1662 * itself or some other worker by signalling. On failure to find a 1663 * task, returns (for retry) if pool state may have changed during 1664 * an empty scan, or tries to inactivate if active, else possibly 1665 * blocks or terminates via method awaitWork. 1666 * 1667 * @param w the worker (via its WorkQueue) 1668 * @param r a random seed 1669 * @return worker qlock status if would have waited, else 0 1670 */ 1671 private final int scan(WorkQueue w, int r) { 1672 WorkQueue[] ws; int m; 1673 long c = ctl; // for consistency check 1674 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) { 1675 for (int j = m + m + 1, ec = w.eventCount;;) { 1676 WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t; 1677 if ((q = ws[(r - j) & m]) != null && 1678 (b = q.base) - q.top < 0 && (a = q.array) != null) { 1679 long i = (((a.length - 1) & b) << ASHIFT) + ABASE; 1680 if ((t = ((ForkJoinTask<?>) 1681 U.getObjectVolatile(a, i))) != null) { 1682 if (ec < 0) 1683 helpRelease(c, ws, w, q, b); 1684 else if (q.base == b && 1685 U.compareAndSwapObject(a, i, t, null)) { 1686 U.putOrderedInt(q, QBASE, b + 1); 1687 if ((b + 1) - q.top < 0) 1688 signalWork(ws, q); 1689 w.runTask(t); 1690 } 1691 } 1692 break; 1693 } 1694 else if (--j < 0) { 1695 if ((ec | (e = (int)c)) < 0) // inactive or terminating 1696 return awaitWork(w, c, ec); 1697 else if (ctl == c) { // try to inactivate and enqueue 1698 long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); 1699 w.nextWait = e; 1700 w.eventCount = ec | INT_SIGN; 1701 if (!U.compareAndSwapLong(this, CTL, c, nc)) 1702 w.eventCount = ec; // back out 1703 } 1704 break; 1705 } 1706 } 1707 } 1708 return 0; 1709 } 1710 1711 /** 1712 * A continuation of scan(), possibly blocking or terminating 1713 * worker w. Returns without blocking if pool state has apparently 1714 * changed since last invocation. Also, if inactivating w has 1715 * caused the pool to become quiescent, checks for pool 1716 * termination, and, so long as this is not the only worker, waits 1717 * for event for up to a given duration. On timeout, if ctl has 1718 * not changed, terminates the worker, which will in turn wake up 1719 * another worker to possibly repeat this process. 
1720 * 1721 * @param w the calling worker 1722 * @param c the ctl value on entry to scan 1723 * @param ec the worker's eventCount on entry to scan 1724 */ 1725 private final int awaitWork(WorkQueue w, long c, int ec) { 1726 int stat, ns; long parkTime, deadline; 1727 if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c && 1728 !Thread.interrupted()) { 1729 int e = (int)c; 1730 int u = (int)(c >>> 32); 1731 int d = (u >> UAC_SHIFT) + parallelism; // active count 1732 1733 if (e < 0 || (d <= 0 && tryTerminate(false, false))) 1734 stat = w.qlock = -1; // pool is terminating 1735 else if ((ns = w.nsteals) != 0) { // collect steals and retry 1736 long sc; 1737 w.nsteals = 0; 1738 do {} while (!U.compareAndSwapLong(this, STEALCOUNT, 1739 sc = stealCount, sc + ns)); 1740 } 1741 else { 1742 long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L : 1743 ((long)(w.nextWait & E_MASK)) | // ctl to restore 1744 ((long)(u + UAC_UNIT)) << 32); 1745 if (pc != 0L) { // timed wait if last waiter 1746 int dc = -(short)(c >>> TC_SHIFT); 1747 parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT: 1748 (dc + 1) * IDLE_TIMEOUT); 1749 deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; 1750 } 1751 else 1752 parkTime = deadline = 0L; 1753 if (w.eventCount == ec && ctl == c) { 1754 Thread wt = Thread.currentThread(); 1755 U.putObject(wt, PARKBLOCKER, this); 1756 w.parker = wt; // emulate LockSupport.park 1757 if (w.eventCount == ec && ctl == c) 1758 U.park(false, parkTime); // must recheck before park 1759 w.parker = null; 1760 U.putObject(wt, PARKBLOCKER, null); 1761 if (parkTime != 0L && ctl == c && 1762 deadline - System.nanoTime() <= 0L && 1763 U.compareAndSwapLong(this, CTL, c, pc)) 1764 stat = w.qlock = -1; // shrink pool 1765 } 1766 } 1767 } 1768 return stat; 1769 } 1770 1771 /** 1772 * Possibly releases (signals) a worker. Called only from scan() 1773 * when a worker with apparently inactive status finds a non-empty 1774 * queue. This requires revalidating all of the associated state 1775 * from caller. 1776 */ 1777 private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w, 1778 WorkQueue q, int b) { 1779 WorkQueue v; int e, i; Thread p; 1780 if (w != null && w.eventCount < 0 && (e = (int)c) > 0 && 1781 ws != null && ws.length > (i = e & SMASK) && 1782 (v = ws[i]) != null && ctl == c) { 1783 long nc = (((long)(v.nextWait & E_MASK)) | 1784 ((long)((int)(c >>> 32) + UAC_UNIT)) << 32); 1785 int ne = (e + E_SEQ) & E_MASK; 1786 if (q != null && q.base == b && w.eventCount < 0 && 1787 v.eventCount == (e | INT_SIGN) && 1788 U.compareAndSwapLong(this, CTL, c, nc)) { 1789 v.eventCount = ne; 1790 if ((p = v.parker) != null) 1791 U.unpark(p); 1792 } 1793 } 1794 } 1795 1796 /** 1797 * Tries to locate and execute tasks for a stealer of the given 1798 * task, or in turn one of its stealers, Traces currentSteal -> 1799 * currentJoin links looking for a thread working on a descendant 1800 * of the given task and with a non-empty queue to steal back and 1801 * execute tasks from. The first call to this method upon a 1802 * waiting join will often entail scanning/search, (which is OK 1803 * because the joiner has nothing better to do), but this method 1804 * leaves hints in workers to speed up subsequent calls. The 1805 * implementation is very branchy to cope with potential 1806 * inconsistencies or loops encountering chains that are stale, 1807 * unknown, or so long that they are likely cyclic. 
1808 * 1809 * @param joiner the joining worker 1810 * @param task the task to join 1811 * @return 0 if no progress can be made, negative if task 1812 * known complete, else positive 1813 */ 1814 private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) { 1815 int stat = 0, steps = 0; // bound to avoid cycles 1816 if (task != null && joiner != null && 1817 joiner.base - joiner.top >= 0) { // hoist checks 1818 restart: for (;;) { 1819 ForkJoinTask<?> subtask = task; // current target 1820 for (WorkQueue j = joiner, v;;) { // v is stealer of subtask 1821 WorkQueue[] ws; int m, s, h; 1822 if ((s = task.status) < 0) { 1823 stat = s; 1824 break restart; 1825 } 1826 if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) 1827 break restart; // shutting down 1828 if ((v = ws[h = (j.hint | 1) & m]) == null || 1829 v.currentSteal != subtask) { 1830 for (int origin = h;;) { // find stealer 1831 if (((h = (h + 2) & m) & 15) == 1 && 1832 (subtask.status < 0 || j.currentJoin != subtask)) 1833 continue restart; // occasional staleness check 1834 if ((v = ws[h]) != null && 1835 v.currentSteal == subtask) { 1836 j.hint = h; // save hint 1837 break; 1838 } 1839 if (h == origin) 1840 break restart; // cannot find stealer 1841 } 1842 } 1843 for (;;) { // help stealer or descend to its stealer 1844 ForkJoinTask[] a; int b; 1845 if (subtask.status < 0) // surround probes with 1846 continue restart; // consistency checks 1847 if ((b = v.base) - v.top < 0 && (a = v.array) != null) { 1848 int i = (((a.length - 1) & b) << ASHIFT) + ABASE; 1849 ForkJoinTask<?> t = 1850 (ForkJoinTask<?>)U.getObjectVolatile(a, i); 1851 if (subtask.status < 0 || j.currentJoin != subtask || 1852 v.currentSteal != subtask) 1853 continue restart; // stale 1854 stat = 1; // apparent progress 1855 if (v.base == b) { 1856 if (t == null) 1857 break restart; 1858 if (U.compareAndSwapObject(a, i, t, null)) { 1859 U.putOrderedInt(v, QBASE, b + 1); 1860 ForkJoinTask<?> ps = joiner.currentSteal; 1861 int jt = joiner.top; 1862 do { 1863 joiner.currentSteal = t; 1864 t.doExec(); // clear local tasks too 1865 } while (task.status >= 0 && 1866 joiner.top != jt && 1867 (t = joiner.pop()) != null); 1868 joiner.currentSteal = ps; 1869 break restart; 1870 } 1871 } 1872 } 1873 else { // empty -- try to descend 1874 ForkJoinTask<?> next = v.currentJoin; 1875 if (subtask.status < 0 || j.currentJoin != subtask || 1876 v.currentSteal != subtask) 1877 continue restart; // stale 1878 else if (next == null || ++steps == MAX_HELP) 1879 break restart; // dead-end or maybe cyclic 1880 else { 1881 subtask = next; 1882 j = v; 1883 break; 1884 } 1885 } 1886 } 1887 } 1888 } 1889 } 1890 return stat; 1891 } 1892 1893 /** 1894 * Analog of tryHelpStealer for CountedCompleters. Tries to steal 1895 * and run tasks within the target's computation. 
1896 * 1897 * @param task the task to join 1898 */ 1899 private int helpComplete(WorkQueue joiner, CountedCompleter<?> task) { 1900 WorkQueue[] ws; int m; 1901 int s = 0; 1902 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && 1903 joiner != null && task != null) { 1904 int j = joiner.poolIndex; 1905 int scans = m + m + 1; 1906 long c = 0L; // for stability check 1907 for (int k = scans; ; j += 2) { 1908 WorkQueue q; 1909 if ((s = task.status) < 0) 1910 break; 1911 else if (joiner.internalPopAndExecCC(task)) 1912 k = scans; 1913 else if ((s = task.status) < 0) 1914 break; 1915 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) 1916 k = scans; 1917 else if (--k < 0) { 1918 if (c == (c = ctl)) 1919 break; 1920 k = scans; 1921 } 1922 } 1923 } 1924 return s; 1925 } 1926 1927 /** 1928 * Tries to decrement active count (sometimes implicitly) and 1929 * possibly release or create a compensating worker in preparation 1930 * for blocking. Fails on contention or termination. Otherwise, 1931 * adds a new thread if no idle workers are available and pool 1932 * may become starved. 1933 * 1934 * @param c the assumed ctl value 1935 */ 1936 final boolean tryCompensate(long c) { 1937 WorkQueue[] ws = workQueues; 1938 int pc = parallelism, e = (int)c, m, tc; 1939 if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) { 1940 WorkQueue w = ws[e & m]; 1941 if (e != 0 && w != null) { 1942 Thread p; 1943 long nc = ((long)(w.nextWait & E_MASK) | 1944 (c & (AC_MASK|TC_MASK))); 1945 int ne = (e + E_SEQ) & E_MASK; 1946 if (w.eventCount == (e | INT_SIGN) && 1947 U.compareAndSwapLong(this, CTL, c, nc)) { 1948 w.eventCount = ne; 1949 if ((p = w.parker) != null) 1950 U.unpark(p); 1951 return true; // replace with idle worker 1952 } 1953 } 1954 else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 && 1955 (int)(c >> AC_SHIFT) + pc > 1) { 1956 long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); 1957 if (U.compareAndSwapLong(this, CTL, c, nc)) 1958 return true; // no compensation 1959 } 1960 else if (tc + pc < MAX_CAP) { 1961 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); 1962 if (U.compareAndSwapLong(this, CTL, c, nc)) { 1963 ForkJoinWorkerThreadFactory fac; 1964 Throwable ex = null; 1965 ForkJoinWorkerThread wt = null; 1966 try { 1967 if ((fac = factory) != null && 1968 (wt = fac.newThread(this)) != null) { 1969 wt.start(); 1970 return true; 1971 } 1972 } catch (Throwable rex) { 1973 ex = rex; 1974 } 1975 deregisterWorker(wt, ex); // clean up and return false 1976 } 1977 } 1978 } 1979 return false; 1980 } 1981 1982 /** 1983 * Helps and/or blocks until the given task is done. 
1984 * 1985 * @param joiner the joining worker 1986 * @param task the task 1987 * @return task status on exit 1988 */ 1989 final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) { 1990 int s = 0; 1991 if (task != null && (s = task.status) >= 0 && joiner != null) { 1992 ForkJoinTask<?> prevJoin = joiner.currentJoin; 1993 joiner.currentJoin = task; 1994 do {} while (joiner.tryRemoveAndExec(task) && // process local tasks 1995 (s = task.status) >= 0); 1996 if (s >= 0 && (task instanceof CountedCompleter)) 1997 s = helpComplete(joiner, (CountedCompleter<?>)task); 1998 long cc = 0; // for stability checks 1999 while (s >= 0 && (s = task.status) >= 0) { 2000 if ((s = tryHelpStealer(joiner, task)) == 0 && 2001 (s = task.status) >= 0) { 2002 if (!tryCompensate(cc)) 2003 cc = ctl; 2004 else { 2005 if (task.trySetSignal() && (s = task.status) >= 0) { 2006 synchronized (task) { 2007 if (task.status >= 0) { 2008 try { // see ForkJoinTask 2009 task.wait(); // for explanation 2010 } catch (InterruptedException ie) { 2011 } 2012 } 2013 else 2014 task.notifyAll(); 2015 } 2016 } 2017 long c; // reactivate 2018 do {} while (!U.compareAndSwapLong 2019 (this, CTL, c = ctl, 2020 ((c & ~AC_MASK) | 2021 ((c & AC_MASK) + AC_UNIT)))); 2022 } 2023 } 2024 } 2025 joiner.currentJoin = prevJoin; 2026 } 2027 return s; 2028 } 2029 2030 /** 2031 * Stripped-down variant of awaitJoin used by timed joins. Tries 2032 * to help join only while there is continuous progress. (Caller 2033 * will then enter a timed wait.) 2034 * 2035 * @param joiner the joining worker 2036 * @param task the task 2037 */ 2038 final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) { 2039 int s; 2040 if (joiner != null && task != null && (s = task.status) >= 0) { 2041 ForkJoinTask<?> prevJoin = joiner.currentJoin; 2042 joiner.currentJoin = task; 2043 do {} while (joiner.tryRemoveAndExec(task) && // process local tasks 2044 (s = task.status) >= 0); 2045 if (s >= 0) { 2046 if (task instanceof CountedCompleter) 2047 helpComplete(joiner, (CountedCompleter<?>)task); 2048 do {} while (task.status >= 0 && 2049 tryHelpStealer(joiner, task) > 0); 2050 } 2051 joiner.currentJoin = prevJoin; 2052 } 2053 } 2054 2055 /** 2056 * Returns a (probably) non-empty steal queue, if one is found 2057 * during a scan, else null. This method must be retried by 2058 * caller if, by the time it tries to use the queue, it is empty. 2059 */ 2060 private WorkQueue findNonEmptyStealQueue() { 2061 int r = ThreadLocalRandom.current().nextInt(); 2062 for (;;) { 2063 int ps = plock, m; WorkQueue[] ws; WorkQueue q; 2064 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { 2065 for (int j = (m + 1) << 2; j >= 0; --j) { 2066 if ((q = ws[(((r - j) << 1) | 1) & m]) != null && 2067 q.base - q.top < 0) 2068 return q; 2069 } 2070 } 2071 if (plock == ps) 2072 return null; 2073 } 2074 } 2075 2076 /** 2077 * Runs tasks until {@code isQuiescent()}. We piggyback on 2078 * active count ctl maintenance, but rather than blocking 2079 * when tasks cannot be found, we rescan until all others cannot 2080 * find tasks either. 
2081 */ 2082 final void helpQuiescePool(WorkQueue w) { 2083 ForkJoinTask<?> ps = w.currentSteal; 2084 for (boolean active = true;;) { 2085 long c; WorkQueue q; ForkJoinTask<?> t; int b; 2086 while ((t = w.nextLocalTask()) != null) 2087 t.doExec(); 2088 if ((q = findNonEmptyStealQueue()) != null) { 2089 if (!active) { // re-establish active count 2090 active = true; 2091 do {} while (!U.compareAndSwapLong 2092 (this, CTL, c = ctl, 2093 ((c & ~AC_MASK) | 2094 ((c & AC_MASK) + AC_UNIT)))); 2095 } 2096 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { 2097 (w.currentSteal = t).doExec(); 2098 w.currentSteal = ps; 2099 } 2100 } 2101 else if (active) { // decrement active count without queuing 2102 long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT); 2103 if ((int)(nc >> AC_SHIFT) + parallelism == 0) 2104 break; // bypass decrement-then-increment 2105 if (U.compareAndSwapLong(this, CTL, c, nc)) 2106 active = false; 2107 } 2108 else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 && 2109 U.compareAndSwapLong 2110 (this, CTL, c, ((c & ~AC_MASK) | 2111 ((c & AC_MASK) + AC_UNIT)))) 2112 break; 2113 } 2114 } 2115 2116 /** 2117 * Gets and removes a local or stolen task for the given worker. 2118 * 2119 * @return a task, if available 2120 */ 2121 final ForkJoinTask<?> nextTaskFor(WorkQueue w) { 2122 for (ForkJoinTask<?> t;;) { 2123 WorkQueue q; int b; 2124 if ((t = w.nextLocalTask()) != null) 2125 return t; 2126 if ((q = findNonEmptyStealQueue()) == null) 2127 return null; 2128 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) 2129 return t; 2130 } 2131 } 2132 2133 /** 2134 * Returns a cheap heuristic guide for task partitioning when 2135 * programmers, frameworks, tools, or languages have little or no 2136 * idea about task granularity. In essence by offering this 2137 * method, we ask users only about tradeoffs in overhead vs 2138 * expected throughput and its variance, rather than how finely to 2139 * partition tasks. 2140 * 2141 * In a steady state strict (tree-structured) computation, each 2142 * thread makes available for stealing enough tasks for other 2143 * threads to remain active. Inductively, if all threads play by 2144 * the same rules, each thread should make available only a 2145 * constant number of tasks. 2146 * 2147 * The minimum useful constant is just 1. But using a value of 1 2148 * would require immediate replenishment upon each steal to 2149 * maintain enough tasks, which is infeasible. Further, 2150 * partitionings/granularities of offered tasks should minimize 2151 * steal rates, which in general means that threads nearer the top 2152 * of computation tree should generate more than those nearer the 2153 * bottom. In perfect steady state, each thread is at 2154 * approximately the same level of computation tree. However, 2155 * producing extra tasks amortizes the uncertainty of progress and 2156 * diffusion assumptions. 2157 * 2158 * So, users will want to use values larger (but not much larger) 2159 * than 1 to both smooth over transient shortages and hedge 2160 * against uneven progress; as traded off against the cost of 2161 * extra task overhead. We leave the user to pick a threshold 2162 * value to compare with the results of this call to guide 2163 * decisions, but recommend values such as 3. 2164 * 2165 * When all threads are active, it is on average OK to estimate 2166 * surplus strictly locally. In steady-state, if one thread is 2167 * maintaining say 2 surplus tasks, then so are others. So we can 2168 * just use estimated queue length. 
However, this strategy alone 2169 * leads to serious mis-estimates in some non-steady-state 2170 * conditions (ramp-up, ramp-down, other stalls). We can detect 2171 * many of these by further considering the number of "idle" 2172 * threads, that are known to have zero queued tasks, so 2173 * compensate by a factor of (#idle/#active) threads. 2174 * 2175 * Note: The approximation of #busy workers as #active workers is 2176 * not very good under current signalling scheme, and should be 2177 * improved. 2178 */ 2179 static int getSurplusQueuedTaskCount() { 2180 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; 2181 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) { 2182 int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism; 2183 int n = (q = wt.workQueue).top - q.base; 2184 int a = (int)(pool.ctl >> AC_SHIFT) + p; 2185 return n - (a > (p >>>= 1) ? 0 : 2186 a > (p >>>= 1) ? 1 : 2187 a > (p >>>= 1) ? 2 : 2188 a > (p >>>= 1) ? 4 : 2189 8); 2190 } 2191 return 0; 2192 } 2193 2194 // Termination 2195 2196 /** 2197 * Possibly initiates and/or completes termination. The caller 2198 * triggering termination runs three passes through workQueues: 2199 * (0) Setting termination status, followed by wakeups of queued 2200 * workers; (1) cancelling all tasks; (2) interrupting lagging 2201 * threads (likely in external tasks, but possibly also blocked in 2202 * joins). Each pass repeats previous steps because of potential 2203 * lagging thread creation. 2204 * 2205 * @param now if true, unconditionally terminate, else only 2206 * if no work and no active workers 2207 * @param enable if true, enable shutdown when next possible 2208 * @return true if now terminating or terminated 2209 */ 2210 private boolean tryTerminate(boolean now, boolean enable) { 2211 int ps; 2212 if (this == common) // cannot shut down 2213 return false; 2214 if ((ps = plock) >= 0) { // enable by setting plock 2215 if (!enable) 2216 return false; 2217 if ((ps & PL_LOCK) != 0 || 2218 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 2219 ps = acquirePlock(); 2220 int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN; 2221 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 2222 releasePlock(nps); 2223 } 2224 for (long c;;) { 2225 if (((c = ctl) & STOP_BIT) != 0) { // already terminating 2226 if ((short)(c >>> TC_SHIFT) + parallelism <= 0) { 2227 synchronized (this) { 2228 notifyAll(); // signal when 0 workers 2229 } 2230 } 2231 return true; 2232 } 2233 if (!now) { // check if idle & no tasks 2234 WorkQueue[] ws; WorkQueue w; 2235 if ((int)(c >> AC_SHIFT) + parallelism > 0) 2236 return false; 2237 if ((ws = workQueues) != null) { 2238 for (int i = 0; i < ws.length; ++i) { 2239 if ((w = ws[i]) != null && 2240 (!w.isEmpty() || 2241 ((i & 1) != 0 && w.eventCount >= 0))) { 2242 signalWork(ws, w); 2243 return false; 2244 } 2245 } 2246 } 2247 } 2248 if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { 2249 for (int pass = 0; pass < 3; ++pass) { 2250 WorkQueue[] ws; WorkQueue w; Thread wt; 2251 if ((ws = workQueues) != null) { 2252 int n = ws.length; 2253 for (int i = 0; i < n; ++i) { 2254 if ((w = ws[i]) != null) { 2255 w.qlock = -1; 2256 if (pass > 0) { 2257 w.cancelAll(); 2258 if (pass > 1 && (wt = w.owner) != null) { 2259 if (!wt.isInterrupted()) { 2260 try { 2261 wt.interrupt(); 2262 } catch (Throwable ignore) { 2263 } 2264 } 2265 U.unpark(wt); 2266 } 2267 } 2268 } 2269 } 2270 // Wake up workers parked on event queue 2271 int i, e; long cc; Thread p; 2272 while ((e = (int)(cc = ctl) & E_MASK) != 
0 && 2273 (i = e & SMASK) < n && i >= 0 && 2274 (w = ws[i]) != null) { 2275 long nc = ((long)(w.nextWait & E_MASK) | 2276 ((cc + AC_UNIT) & AC_MASK) | 2277 (cc & (TC_MASK|STOP_BIT))); 2278 if (w.eventCount == (e | INT_SIGN) && 2279 U.compareAndSwapLong(this, CTL, cc, nc)) { 2280 w.eventCount = (e + E_SEQ) & E_MASK; 2281 w.qlock = -1; 2282 if ((p = w.parker) != null) 2283 U.unpark(p); 2284 } 2285 } 2286 } 2287 } 2288 } 2289 } 2290 } 2291 2292 // external operations on common pool 2293 2294 /** 2295 * Returns common pool queue for a thread that has submitted at 2296 * least one task. 2297 */ 2298 static WorkQueue commonSubmitterQueue() { 2299 Submitter z; ForkJoinPool p; WorkQueue[] ws; int m, r; 2300 return ((z = submitters.get()) != null && 2301 (p = common) != null && 2302 (ws = p.workQueues) != null && 2303 (m = ws.length - 1) >= 0) ? 2304 ws[m & z.seed & SQMASK] : null; 2305 } 2306 2307 /** 2308 * Tries to pop the given task from submitter's queue in common pool. 2309 */ 2310 final boolean tryExternalUnpush(ForkJoinTask<?> task) { 2311 WorkQueue joiner; ForkJoinTask<?>[] a; int m, s; 2312 Submitter z = submitters.get(); 2313 WorkQueue[] ws = workQueues; 2314 boolean popped = false; 2315 if (z != null && ws != null && (m = ws.length - 1) >= 0 && 2316 (joiner = ws[z.seed & m & SQMASK]) != null && 2317 joiner.base != (s = joiner.top) && 2318 (a = joiner.array) != null) { 2319 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; 2320 if (U.getObject(a, j) == task && 2321 U.compareAndSwapInt(joiner, QLOCK, 0, 1)) { 2322 if (joiner.top == s && joiner.array == a && 2323 U.compareAndSwapObject(a, j, task, null)) { 2324 joiner.top = s - 1; 2325 popped = true; 2326 } 2327 joiner.qlock = 0; 2328 } 2329 } 2330 return popped; 2331 } 2332 2333 final int externalHelpComplete(CountedCompleter<?> task) { 2334 WorkQueue joiner; int m, j; 2335 Submitter z = submitters.get(); 2336 WorkQueue[] ws = workQueues; 2337 int s = 0; 2338 if (z != null && ws != null && (m = ws.length - 1) >= 0 && 2339 (joiner = ws[(j = z.seed) & m & SQMASK]) != null && task != null) { 2340 int scans = m + m + 1; 2341 long c = 0L; // for stability check 2342 j |= 1; // poll odd queues 2343 for (int k = scans; ; j += 2) { 2344 WorkQueue q; 2345 if ((s = task.status) < 0) 2346 break; 2347 else if (joiner.externalPopAndExecCC(task)) 2348 k = scans; 2349 else if ((s = task.status) < 0) 2350 break; 2351 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) 2352 k = scans; 2353 else if (--k < 0) { 2354 if (c == (c = ctl)) 2355 break; 2356 k = scans; 2357 } 2358 } 2359 } 2360 return s; 2361 } 2362 2363 // Exported methods 2364 2365 // Constructors 2366 2367 /** 2368 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 2369 * java.lang.Runtime#availableProcessors}, using the {@linkplain 2370 * #defaultForkJoinWorkerThreadFactory default thread factory}, 2371 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 2372 */ 2373 public ForkJoinPool() { 2374 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), 2375 defaultForkJoinWorkerThreadFactory, null, false); 2376 } 2377 2378 /** 2379 * Creates a {@code ForkJoinPool} with the indicated parallelism 2380 * level, the {@linkplain 2381 * #defaultForkJoinWorkerThreadFactory default thread factory}, 2382 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 
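 *
 * <p>As a minimal usage sketch, a dedicated pool sized to half the
 * available processors could be created as follows:
 * <pre> {@code
 * int half = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
 * ForkJoinPool pool = new ForkJoinPool(half);}</pre>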
2383 * 2384 * @param parallelism the parallelism level 2385 * @throws IllegalArgumentException if parallelism less than or 2386 * equal to zero, or greater than implementation limit 2387 */ 2388 public ForkJoinPool(int parallelism) { 2389 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); 2390 } 2391 2392 /** 2393 * Creates a {@code ForkJoinPool} with the given parameters. 2394 * 2395 * @param parallelism the parallelism level. For default value, 2396 * use {@link java.lang.Runtime#availableProcessors}. 2397 * @param factory the factory for creating new threads. For default value, 2398 * use {@link #defaultForkJoinWorkerThreadFactory}. 2399 * @param handler the handler for internal worker threads that 2400 * terminate due to unrecoverable errors encountered while executing 2401 * tasks. For default value, use {@code null}. 2402 * @param asyncMode if true, 2403 * establishes local first-in-first-out scheduling mode for forked 2404 * tasks that are never joined. This mode may be more appropriate 2405 * than default locally stack-based mode in applications in which 2406 * worker threads only process event-style asynchronous tasks. 2407 * For default value, use {@code false}. 2408 * @throws IllegalArgumentException if parallelism less than or 2409 * equal to zero, or greater than implementation limit 2410 * @throws NullPointerException if the factory is null 2411 */ 2412 public ForkJoinPool(int parallelism, 2413 ForkJoinWorkerThreadFactory factory, 2414 UncaughtExceptionHandler handler, 2415 boolean asyncMode) { 2416 this(checkParallelism(parallelism), 2417 checkFactory(factory), 2418 handler, 2419 (asyncMode ? FIFO_QUEUE : LIFO_QUEUE), 2420 "ForkJoinPool-" + nextPoolId() + "-worker-"); 2421 checkPermission(); 2422 } 2423 2424 private static int checkParallelism(int parallelism) { 2425 if (parallelism <= 0 || parallelism > MAX_CAP) 2426 throw new IllegalArgumentException(); 2427 return parallelism; 2428 } 2429 2430 private static ForkJoinWorkerThreadFactory checkFactory 2431 (ForkJoinWorkerThreadFactory factory) { 2432 if (factory == null) 2433 throw new NullPointerException(); 2434 return factory; 2435 } 2436 2437 /** 2438 * Creates a {@code ForkJoinPool} with the given parameters, without 2439 * any security checks or parameter validation. Invoked directly by 2440 * makeCommonPool. 2441 */ 2442 private ForkJoinPool(int parallelism, 2443 ForkJoinWorkerThreadFactory factory, 2444 UncaughtExceptionHandler handler, 2445 int mode, 2446 String workerNamePrefix) { 2447 this.workerNamePrefix = workerNamePrefix; 2448 this.factory = factory; 2449 this.ueh = handler; 2450 this.mode = (short)mode; 2451 this.parallelism = (short)parallelism; 2452 long np = (long)(-parallelism); // offset ctl counts 2453 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 2454 } 2455 2456 /** 2457 * Returns the common pool instance. This pool is statically 2458 * constructed; its run state is unaffected by attempts to {@link 2459 * #shutdown} or {@link #shutdownNow}. However this pool and any 2460 * ongoing processing are automatically terminated upon program 2461 * {@link System#exit}. Any program that relies on asynchronous 2462 * task processing to complete before program termination should 2463 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 2464 * before exit. 
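 *
 * <p>As a minimal sketch of that recommendation (where {@code
 * doBackgroundWork} stands for arbitrary application code):
 * <pre> {@code
 * ForkJoinPool.commonPool().execute(new Runnable() {
 *   public void run() { doBackgroundWork(); }});
 * // ... later, shortly before program exit:
 * ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES);}</pre>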
2465 * 2466 * @return the common pool instance 2467 * @since 1.8 2468 * @hide 2469 */ 2470 public static ForkJoinPool commonPool() { 2471 // assert common != null : "static init error"; 2472 return common; 2473 } 2474 2475 // Execution methods 2476 2477 /** 2478 * Performs the given task, returning its result upon completion. 2479 * If the computation encounters an unchecked Exception or Error, 2480 * it is rethrown as the outcome of this invocation. Rethrown 2481 * exceptions behave in the same way as regular exceptions, but, 2482 * when possible, contain stack traces (as displayed for example 2483 * using {@code ex.printStackTrace()}) of both the current thread 2484 * as well as the thread actually encountering the exception; 2485 * minimally only the latter. 2486 * 2487 * @param task the task 2488 * @return the task's result 2489 * @throws NullPointerException if the task is null 2490 * @throws RejectedExecutionException if the task cannot be 2491 * scheduled for execution 2492 */ 2493 public <T> T invoke(ForkJoinTask<T> task) { 2494 if (task == null) 2495 throw new NullPointerException(); 2496 externalPush(task); 2497 return task.join(); 2498 } 2499 2500 /** 2501 * Arranges for (asynchronous) execution of the given task. 2502 * 2503 * @param task the task 2504 * @throws NullPointerException if the task is null 2505 * @throws RejectedExecutionException if the task cannot be 2506 * scheduled for execution 2507 */ 2508 public void execute(ForkJoinTask<?> task) { 2509 if (task == null) 2510 throw new NullPointerException(); 2511 externalPush(task); 2512 } 2513 2514 // AbstractExecutorService methods 2515 2516 /** 2517 * @throws NullPointerException if the task is null 2518 * @throws RejectedExecutionException if the task cannot be 2519 * scheduled for execution 2520 */ 2521 public void execute(Runnable task) { 2522 if (task == null) 2523 throw new NullPointerException(); 2524 ForkJoinTask<?> job; 2525 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2526 job = (ForkJoinTask<?>) task; 2527 else 2528 job = new ForkJoinTask.RunnableExecuteAction(task); 2529 externalPush(job); 2530 } 2531 2532 /** 2533 * Submits a ForkJoinTask for execution. 
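 *
 * <p>Because ForkJoinTasks are {@link Future}s, the returned task
 * may later be joined or queried for its result. A minimal sketch
 * (where {@code SumTask} stands for some user-defined
 * {@code RecursiveTask<Long>} and {@code pool} is this pool):
 * <pre> {@code
 * ForkJoinTask<Long> task = pool.submit(new SumTask(array, 0, array.length));
 * Long sum = task.join();}</pre>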
2534 * 2535 * @param task the task to submit 2536 * @return the task 2537 * @throws NullPointerException if the task is null 2538 * @throws RejectedExecutionException if the task cannot be 2539 * scheduled for execution 2540 */ 2541 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 2542 if (task == null) 2543 throw new NullPointerException(); 2544 externalPush(task); 2545 return task; 2546 } 2547 2548 /** 2549 * @throws NullPointerException if the task is null 2550 * @throws RejectedExecutionException if the task cannot be 2551 * scheduled for execution 2552 */ 2553 public <T> ForkJoinTask<T> submit(Callable<T> task) { 2554 ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task); 2555 externalPush(job); 2556 return job; 2557 } 2558 2559 /** 2560 * @throws NullPointerException if the task is null 2561 * @throws RejectedExecutionException if the task cannot be 2562 * scheduled for execution 2563 */ 2564 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 2565 ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result); 2566 externalPush(job); 2567 return job; 2568 } 2569 2570 /** 2571 * @throws NullPointerException if the task is null 2572 * @throws RejectedExecutionException if the task cannot be 2573 * scheduled for execution 2574 */ 2575 public ForkJoinTask<?> submit(Runnable task) { 2576 if (task == null) 2577 throw new NullPointerException(); 2578 ForkJoinTask<?> job; 2579 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2580 job = (ForkJoinTask<?>) task; 2581 else 2582 job = new ForkJoinTask.AdaptedRunnableAction(task); 2583 externalPush(job); 2584 return job; 2585 } 2586 2587 /** 2588 * @throws NullPointerException {@inheritDoc} 2589 * @throws RejectedExecutionException {@inheritDoc} 2590 */ 2591 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 2592 // In previous versions of this class, this method constructed 2593 // a task to run ForkJoinTask.invokeAll, but now external 2594 // invocation of multiple tasks is at least as efficient. 2595 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 2596 2597 boolean done = false; 2598 try { 2599 for (Callable<T> t : tasks) { 2600 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); 2601 futures.add(f); 2602 externalPush(f); 2603 } 2604 for (int i = 0, size = futures.size(); i < size; i++) 2605 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); 2606 done = true; 2607 return futures; 2608 } finally { 2609 if (!done) 2610 for (int i = 0, size = futures.size(); i < size; i++) 2611 futures.get(i).cancel(false); 2612 } 2613 } 2614 2615 /** 2616 * Returns the factory used for constructing new workers. 2617 * 2618 * @return the factory used for constructing new workers 2619 */ 2620 public ForkJoinWorkerThreadFactory getFactory() { 2621 return factory; 2622 } 2623 2624 /** 2625 * Returns the handler for internal worker threads that terminate 2626 * due to unrecoverable errors encountered while executing tasks. 2627 * 2628 * @return the handler, or {@code null} if none 2629 */ 2630 public UncaughtExceptionHandler getUncaughtExceptionHandler() { 2631 return ueh; 2632 } 2633 2634 /** 2635 * Returns the targeted parallelism level of this pool. 2636 * 2637 * @return the targeted parallelism level of this pool 2638 */ 2639 public int getParallelism() { 2640 int par; 2641 return ((par = parallelism) > 0) ? par : 1; 2642 } 2643 2644 /** 2645 * Returns the targeted parallelism level of the common pool. 
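 *
 * <p>A rough sketch of one common use: choosing a sequential cutoff
 * for splitting a problem of size {@code n} (the factor of 8 is an
 * arbitrary illustrative choice):
 * <pre> {@code
 * int cutoff = Math.max(1, n / (8 * ForkJoinPool.getCommonPoolParallelism()));}</pre>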
2646 * 2647 * @return the targeted parallelism level of the common pool 2648 * @since 1.8 2649 * @hide 2650 */ 2651 public static int getCommonPoolParallelism() { 2652 return commonParallelism; 2653 } 2654 2655 /** 2656 * Returns the number of worker threads that have started but not 2657 * yet terminated. The result returned by this method may differ 2658 * from {@link #getParallelism} when threads are created to 2659 * maintain parallelism when others are cooperatively blocked. 2660 * 2661 * @return the number of worker threads 2662 */ 2663 public int getPoolSize() { 2664 return parallelism + (short)(ctl >>> TC_SHIFT); 2665 } 2666 2667 /** 2668 * Returns {@code true} if this pool uses local first-in-first-out 2669 * scheduling mode for forked tasks that are never joined. 2670 * 2671 * @return {@code true} if this pool uses async mode 2672 */ 2673 public boolean getAsyncMode() { 2674 return mode == FIFO_QUEUE; 2675 } 2676 2677 /** 2678 * Returns an estimate of the number of worker threads that are 2679 * not blocked waiting to join tasks or for other managed 2680 * synchronization. This method may overestimate the 2681 * number of running threads. 2682 * 2683 * @return the number of worker threads 2684 */ 2685 public int getRunningThreadCount() { 2686 int rc = 0; 2687 WorkQueue[] ws; WorkQueue w; 2688 if ((ws = workQueues) != null) { 2689 for (int i = 1; i < ws.length; i += 2) { 2690 if ((w = ws[i]) != null && w.isApparentlyUnblocked()) 2691 ++rc; 2692 } 2693 } 2694 return rc; 2695 } 2696 2697 /** 2698 * Returns an estimate of the number of threads that are currently 2699 * stealing or executing tasks. This method may overestimate the 2700 * number of active threads. 2701 * 2702 * @return the number of active threads 2703 */ 2704 public int getActiveThreadCount() { 2705 int r = parallelism + (int)(ctl >> AC_SHIFT); 2706 return (r <= 0) ? 0 : r; // suppress momentarily negative values 2707 } 2708 2709 /** 2710 * Returns {@code true} if all worker threads are currently idle. 2711 * An idle worker is one that cannot obtain a task to execute 2712 * because none are available to steal from other threads, and 2713 * there are no pending submissions to the pool. This method is 2714 * conservative; it might not return {@code true} immediately upon 2715 * idleness of all threads, but will eventually become true if 2716 * threads remain inactive. 2717 * 2718 * @return {@code true} if all threads are currently idle 2719 */ 2720 public boolean isQuiescent() { 2721 return parallelism + (int)(ctl >> AC_SHIFT) <= 0; 2722 } 2723 2724 /** 2725 * Returns an estimate of the total number of tasks stolen from 2726 * one thread's work queue by another. The reported value 2727 * underestimates the actual total number of steals when the pool 2728 * is not quiescent. This value may be useful for monitoring and 2729 * tuning fork/join programs: in general, steal counts should be 2730 * high enough to keep threads busy, but low enough to avoid 2731 * overhead and contention across threads. 2732 * 2733 * @return the number of steals 2734 */ 2735 public long getStealCount() { 2736 long count = stealCount; 2737 WorkQueue[] ws; WorkQueue w; 2738 if ((ws = workQueues) != null) { 2739 for (int i = 1; i < ws.length; i += 2) { 2740 if ((w = ws[i]) != null) 2741 count += w.nsteals; 2742 } 2743 } 2744 return count; 2745 } 2746 2747 /** 2748 * Returns an estimate of the total number of tasks currently held 2749 * in queues by worker threads (but not including tasks submitted 2750 * to the pool that have not begun executing). 
This value is only 2751 * an approximation, obtained by iterating across all threads in 2752 * the pool. This method may be useful for tuning task 2753 * granularities. 2754 * 2755 * @return the number of queued tasks 2756 */ 2757 public long getQueuedTaskCount() { 2758 long count = 0; 2759 WorkQueue[] ws; WorkQueue w; 2760 if ((ws = workQueues) != null) { 2761 for (int i = 1; i < ws.length; i += 2) { 2762 if ((w = ws[i]) != null) 2763 count += w.queueSize(); 2764 } 2765 } 2766 return count; 2767 } 2768 2769 /** 2770 * Returns an estimate of the number of tasks submitted to this 2771 * pool that have not yet begun executing. This method may take 2772 * time proportional to the number of submissions. 2773 * 2774 * @return the number of queued submissions 2775 */ 2776 public int getQueuedSubmissionCount() { 2777 int count = 0; 2778 WorkQueue[] ws; WorkQueue w; 2779 if ((ws = workQueues) != null) { 2780 for (int i = 0; i < ws.length; i += 2) { 2781 if ((w = ws[i]) != null) 2782 count += w.queueSize(); 2783 } 2784 } 2785 return count; 2786 } 2787 2788 /** 2789 * Returns {@code true} if there are any tasks submitted to this 2790 * pool that have not yet begun executing. 2791 * 2792 * @return {@code true} if there are any queued submissions 2793 */ 2794 public boolean hasQueuedSubmissions() { 2795 WorkQueue[] ws; WorkQueue w; 2796 if ((ws = workQueues) != null) { 2797 for (int i = 0; i < ws.length; i += 2) { 2798 if ((w = ws[i]) != null && !w.isEmpty()) 2799 return true; 2800 } 2801 } 2802 return false; 2803 } 2804 2805 /** 2806 * Removes and returns the next unexecuted submission if one is 2807 * available. This method may be useful in extensions to this 2808 * class that re-assign work in systems with multiple pools. 2809 * 2810 * @return the next submission, or {@code null} if none 2811 */ 2812 protected ForkJoinTask<?> pollSubmission() { 2813 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2814 if ((ws = workQueues) != null) { 2815 for (int i = 0; i < ws.length; i += 2) { 2816 if ((w = ws[i]) != null && (t = w.poll()) != null) 2817 return t; 2818 } 2819 } 2820 return null; 2821 } 2822 2823 /** 2824 * Removes all available unexecuted submitted and forked tasks 2825 * from scheduling queues and adds them to the given collection, 2826 * without altering their execution status. These may include 2827 * artificially generated or wrapped tasks. This method is 2828 * designed to be invoked only when the pool is known to be 2829 * quiescent. Invocations at other times may not remove all 2830 * tasks. A failure encountered while attempting to add elements 2831 * to collection {@code c} may result in elements being in 2832 * neither, either or both collections when the associated 2833 * exception is thrown. The behavior of this operation is 2834 * undefined if the specified collection is modified while the 2835 * operation is in progress. 2836 * 2837 * @param c the collection to transfer elements into 2838 * @return the number of elements transferred 2839 */ 2840 protected int drainTasksTo(Collection<? 
super ForkJoinTask<?>> c) { 2841 int count = 0; 2842 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2843 if ((ws = workQueues) != null) { 2844 for (int i = 0; i < ws.length; ++i) { 2845 if ((w = ws[i]) != null) { 2846 while ((t = w.poll()) != null) { 2847 c.add(t); 2848 ++count; 2849 } 2850 } 2851 } 2852 } 2853 return count; 2854 } 2855 2856 /** 2857 * Returns a string identifying this pool, as well as its state, 2858 * including indications of run state, parallelism level, and 2859 * worker and task counts. 2860 * 2861 * @return a string identifying this pool, as well as its state 2862 */ 2863 public String toString() { 2864 // Use a single pass through workQueues to collect counts 2865 long qt = 0L, qs = 0L; int rc = 0; 2866 long st = stealCount; 2867 long c = ctl; 2868 WorkQueue[] ws; WorkQueue w; 2869 if ((ws = workQueues) != null) { 2870 for (int i = 0; i < ws.length; ++i) { 2871 if ((w = ws[i]) != null) { 2872 int size = w.queueSize(); 2873 if ((i & 1) == 0) 2874 qs += size; 2875 else { 2876 qt += size; 2877 st += w.nsteals; 2878 if (w.isApparentlyUnblocked()) 2879 ++rc; 2880 } 2881 } 2882 } 2883 } 2884 int pc = parallelism; 2885 int tc = pc + (short)(c >>> TC_SHIFT); 2886 int ac = pc + (int)(c >> AC_SHIFT); 2887 if (ac < 0) // ignore transient negative 2888 ac = 0; 2889 String level; 2890 if ((c & STOP_BIT) != 0) 2891 level = (tc == 0) ? "Terminated" : "Terminating"; 2892 else 2893 level = plock < 0 ? "Shutting down" : "Running"; 2894 return super.toString() + 2895 "[" + level + 2896 ", parallelism = " + pc + 2897 ", size = " + tc + 2898 ", active = " + ac + 2899 ", running = " + rc + 2900 ", steals = " + st + 2901 ", tasks = " + qt + 2902 ", submissions = " + qs + 2903 "]"; 2904 } 2905 2906 /** 2907 * Possibly initiates an orderly shutdown in which previously 2908 * submitted tasks are executed, but no new tasks will be 2909 * accepted. Invocation has no effect on execution state if this 2910 * is the {@link #commonPool()}, and no additional effect if 2911 * already shut down. Tasks that are in the process of being 2912 * submitted concurrently during the course of this method may or 2913 * may not be rejected. 2914 */ 2915 public void shutdown() { 2916 checkPermission(); 2917 tryTerminate(false, true); 2918 } 2919 2920 /** 2921 * Possibly attempts to cancel and/or stop all tasks, and reject 2922 * all subsequently submitted tasks. Invocation has no effect on 2923 * execution state if this is the {@link #commonPool()}, and no 2924 * additional effect if already shut down. Otherwise, tasks that 2925 * are in the process of being submitted or executed concurrently 2926 * during the course of this method may or may not be 2927 * rejected. This method cancels both existing and unexecuted 2928 * tasks, in order to permit termination in the presence of task 2929 * dependencies. So the method always returns an empty list 2930 * (unlike the case for some other Executors). 2931 * 2932 * @return an empty list 2933 */ 2934 public List<Runnable> shutdownNow() { 2935 checkPermission(); 2936 tryTerminate(true, true); 2937 return Collections.emptyList(); 2938 } 2939 2940 /** 2941 * Returns {@code true} if all tasks have completed following shut down. 
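 *
 * <p>A minimal sketch of an orderly shutdown of a private pool
 * (interrupt handling elided):
 * <pre> {@code
 * pool.shutdown();                      // stop accepting new tasks
 * if (!pool.awaitTermination(30, TimeUnit.SECONDS))
 *   pool.shutdownNow();                 // cancel remaining tasks}</pre>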
2942 * 2943 * @return {@code true} if all tasks have completed following shut down 2944 */ 2945 public boolean isTerminated() { 2946 long c = ctl; 2947 return ((c & STOP_BIT) != 0L && 2948 (short)(c >>> TC_SHIFT) + parallelism <= 0); 2949 } 2950 2951 /** 2952 * Returns {@code true} if the process of termination has 2953 * commenced but not yet completed. This method may be useful for 2954 * debugging. A return of {@code true} reported a sufficient 2955 * period after shutdown may indicate that submitted tasks have 2956 * ignored or suppressed interruption, or are waiting for I/O, 2957 * causing this executor not to properly terminate. (See the 2958 * advisory notes for class {@link ForkJoinTask} stating that 2959 * tasks should not normally entail blocking operations. But if 2960 * they do, they must abort them on interrupt.) 2961 * 2962 * @return {@code true} if terminating but not yet terminated 2963 */ 2964 public boolean isTerminating() { 2965 long c = ctl; 2966 return ((c & STOP_BIT) != 0L && 2967 (short)(c >>> TC_SHIFT) + parallelism > 0); 2968 } 2969 2970 /** 2971 * Returns {@code true} if this pool has been shut down. 2972 * 2973 * @return {@code true} if this pool has been shut down 2974 */ 2975 public boolean isShutdown() { 2976 return plock < 0; 2977 } 2978 2979 /** 2980 * Blocks until all tasks have completed execution after a 2981 * shutdown request, or the timeout occurs, or the current thread 2982 * is interrupted, whichever happens first. Because the {@link 2983 * #commonPool()} never terminates until program shutdown, when 2984 * applied to the common pool, this method is equivalent to {@link 2985 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 2986 * 2987 * @param timeout the maximum time to wait 2988 * @param unit the time unit of the timeout argument 2989 * @return {@code true} if this executor terminated and 2990 * {@code false} if the timeout elapsed before termination 2991 * @throws InterruptedException if interrupted while waiting 2992 */ 2993 public boolean awaitTermination(long timeout, TimeUnit unit) 2994 throws InterruptedException { 2995 if (Thread.interrupted()) 2996 throw new InterruptedException(); 2997 if (this == common) { 2998 awaitQuiescence(timeout, unit); 2999 return false; 3000 } 3001 long nanos = unit.toNanos(timeout); 3002 if (isTerminated()) 3003 return true; 3004 if (nanos <= 0L) 3005 return false; 3006 long deadline = System.nanoTime() + nanos; 3007 synchronized (this) { 3008 for (;;) { 3009 if (isTerminated()) 3010 return true; 3011 if (nanos <= 0L) 3012 return false; 3013 long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 3014 wait(millis > 0L ? millis : 1L); 3015 nanos = deadline - System.nanoTime(); 3016 } 3017 } 3018 } 3019 3020 /** 3021 * If called by a ForkJoinTask operating in this pool, equivalent 3022 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 3023 * waits and/or attempts to assist performing tasks until this 3024 * pool {@link #isQuiescent} or the indicated timeout elapses. 3025 * 3026 * @param timeout the maximum time to wait 3027 * @param unit the time unit of the timeout argument 3028 * @return {@code true} if quiescent; {@code false} if the 3029 * timeout elapsed. 
    /**
     * If called by a ForkJoinTask operating in this pool, equivalent
     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
     * waits and/or attempts to assist performing tasks until this
     * pool {@link #isQuiescent} or the indicated timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if quiescent; {@code false} if the
     * timeout elapsed.
     */
    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        ForkJoinWorkerThread wt;
        Thread thread = Thread.currentThread();
        if ((thread instanceof ForkJoinWorkerThread) &&
            (wt = (ForkJoinWorkerThread)thread).pool == this) {
            helpQuiescePool(wt.workQueue);
            return true;
        }
        long startTime = System.nanoTime();
        WorkQueue[] ws;
        int r = 0, m;
        boolean found = true;
        while (!isQuiescent() && (ws = workQueues) != null &&
               (m = ws.length - 1) >= 0) {
            if (!found) {
                if ((System.nanoTime() - startTime) > nanos)
                    return false;
                Thread.yield(); // cannot block
            }
            found = false;
            for (int j = (m + 1) << 2; j >= 0; --j) {
                ForkJoinTask<?> t; WorkQueue q; int b;
                if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
                    found = true;
                    if ((t = q.pollAt(b)) != null)
                        t.doExec();
                    break;
                }
            }
        }
        return true;
    }

    /**
     * Waits and/or attempts to assist performing tasks indefinitely
     * until the {@link #commonPool()} {@link #isQuiescent}.
     */
    static void quiesceCommonPool() {
        common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }
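    /*
     * Usage sketch (not part of the original source): awaitQuiescence suits
     * async event-style tasks that are never joined.  The asyncMode pool
     * configuration, the hypothetical "events" collection, and the
     * 30-second timeout are all assumptions for illustration.
     *
     *   ForkJoinPool pool = new ForkJoinPool
     *       (Runtime.getRuntime().availableProcessors(),
     *        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     *        null, true);                              // asyncMode = true
     *   for (Runnable event : events)
     *       pool.execute(event);
     *   boolean quiet = pool.awaitQuiescence(30L, TimeUnit.SECONDS);
     */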
    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods.  Method
     * {@code isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@code block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). These actions are performed by any
     * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
     * The unusual methods in this API accommodate synchronizers that
     * may, but don't usually, block for long periods. Similarly, they
     * allow more efficient internal handling of cases in which
     * additional workers may be, but usually are not, needed to
     * ensure sufficient parallelism.  Toward this end,
     * implementations of method {@code isReleasable} must be amenable
     * to repeated invocation.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     *  <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     *  <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }

    /**
     * Blocks in accord with the given blocker.  If the current thread
     * is a {@link ForkJoinWorkerThread}, this method possibly
     * arranges for a spare thread to be activated if necessary to
     * ensure sufficient parallelism while the current thread is blocked.
     *
     * <p>If the caller is not a {@link ForkJoinTask}, this method is
     * behaviorally equivalent to
     *  <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     return;
     * }</pre>
     *
     * If the caller is a {@code ForkJoinTask}, then the pool may
     * first be expanded to ensure parallelism, and later adjusted.
     *
     * @param blocker the blocker
     * @throws InterruptedException if blocker.block did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        Thread t = Thread.currentThread();
        if (t instanceof ForkJoinWorkerThread) {
            ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
            while (!blocker.isReleasable()) {
                if (p.tryCompensate(p.ctl)) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        p.incrementActiveCount();
                    }
                    break;
                }
            }
        }
        else {
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
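    /*
     * Usage sketch (not part of the original source): invoking managedBlock
     * with the QueueTaker example from the ManagedBlocker javadoc above.
     * The BlockingQueue "queue" is an assumption; InterruptedException
     * handling is omitted for brevity.
     *
     *   QueueTaker<String> taker = new QueueTaker<String>(queue);
     *   ForkJoinPool.managedBlock(taker);   // may activate a spare worker
     *   String item = taker.getItem();      // the element obtained, if any
     */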
    // AbstractExecutorService overrides.  These rely on undocumented
    // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
    // implement RunnableFuture.

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ForkJoinTask.AdaptedCallable<T>(callable);
    }
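    /*
     * Usage sketch (not part of the original source): because of the
     * adapters returned above, submitting a plain Callable yields a Future
     * that is also a ForkJoinTask.  The pool and callable below are
     * assumptions; exception handling for get() is omitted for brevity.
     *
     *   ForkJoinPool pool = new ForkJoinPool();
     *   Future<Integer> f = pool.submit(new Callable<Integer>() {
     *       public Integer call() { return 6 * 7; }
     *   });
     *   int answer = f.get();               // 42
     */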
    // Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final long CTL;
    private static final long PARKBLOCKER;
    private static final int ABASE;
    private static final int ASHIFT;
    private static final long STEALCOUNT;
    private static final long PLOCK;
    private static final long INDEXSEED;
    private static final long QBASE;
    private static final long QLOCK;

    static {
        // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            STEALCOUNT = U.objectFieldOffset
                (k.getDeclaredField("stealCount"));
            PLOCK = U.objectFieldOffset
                (k.getDeclaredField("plock"));
            INDEXSEED = U.objectFieldOffset
                (k.getDeclaredField("indexSeed"));
            Class<?> tk = Thread.class;
            PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            Class<?> wk = WorkQueue.class;
            QBASE = U.objectFieldOffset
                (wk.getDeclaredField("base"));
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            Class<?> ak = ForkJoinTask[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }

        submitters = new ThreadLocal<Submitter>();
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.parallelism; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }

    /**
     * Creates and returns the common pool, respecting user settings
     * specified via system properties.
     */
    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory
            = defaultForkJoinWorkerThreadFactory;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }

        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) < 0)
            parallelism = 0;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

}
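/*
 * Configuration sketch (not part of the original source): the common-pool
 * system properties read in makeCommonPool() can be supplied on the command
 * line, or set programmatically very early (before the ForkJoinPool class is
 * initialized).  The values and the factory class name below are illustrative
 * assumptions only.
 *
 *   -Djava.util.concurrent.ForkJoinPool.common.parallelism=4
 *   -Djava.util.concurrent.ForkJoinPool.common.threadFactory=com.example.MyFactory
 *
 * or, at the top of main(), before anything touches fork/join classes:
 *
 *   System.setProperty(
 *       "java.util.concurrent.ForkJoinPool.common.parallelism", "4");
 */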