1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37
38import java.lang.Thread.UncaughtExceptionHandler;
39import java.security.AccessControlContext;
40import java.security.Permissions;
41import java.security.ProtectionDomain;
42import java.util.ArrayList;
43import java.util.Arrays;
44import java.util.Collection;
45import java.util.Collections;
46import java.util.List;
47import java.util.concurrent.locks.ReentrantLock;
48import java.util.concurrent.locks.LockSupport;
49
50/**
51 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
52 * A {@code ForkJoinPool} provides the entry point for submissions
53 * from non-{@code ForkJoinTask} clients, as well as management and
54 * monitoring operations.
55 *
56 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
57 * ExecutorService} mainly by virtue of employing
58 * <em>work-stealing</em>: all threads in the pool attempt to find and
59 * execute tasks submitted to the pool and/or created by other active
60 * tasks (eventually blocking waiting for work if none exist). This
61 * enables efficient processing when most tasks spawn other subtasks
62 * (as do most {@code ForkJoinTask}s), as well as when many small
63 * tasks are submitted to the pool from external clients.  Especially
64 * when setting <em>asyncMode</em> to true in constructors, {@code
65 * ForkJoinPool}s may also be appropriate for use with event-style
66 * tasks that are never joined.
67 *
68 * <p>A static {@link #commonPool()} is available and appropriate for
69 * most applications. The common pool is used by any ForkJoinTask that
70 * is not explicitly submitted to a specified pool. Using the common
71 * pool normally reduces resource usage (its threads are slowly
72 * reclaimed during periods of non-use, and reinstated upon subsequent
73 * use).
74 *
75 * <p>For applications that require separate or custom pools, a {@code
76 * ForkJoinPool} may be constructed with a given target parallelism
77 * level; by default, equal to the number of available processors.
78 * The pool attempts to maintain enough active (or available) threads
79 * by dynamically adding, suspending, or resuming internal worker
80 * threads, even if some tasks are stalled waiting to join others.
81 * However, no such adjustments are guaranteed in the face of blocked
82 * I/O or other unmanaged synchronization. The nested {@link
83 * ManagedBlocker} interface enables extension of the kinds of
84 * synchronization accommodated.
85 *
86 * <p>In addition to execution and lifecycle control methods, this
87 * class provides status check methods (for example
88 * {@link #getStealCount}) that are intended to aid in developing,
89 * tuning, and monitoring fork/join applications. Also, method
90 * {@link #toString} returns indications of pool state in a
91 * convenient form for informal monitoring.
92 *
93 * <p>As is the case with other ExecutorServices, there are three
94 * main task execution methods summarized in the following table.
95 * These are designed to be used primarily by clients not already
96 * engaged in fork/join computations in the current pool.  The main
97 * forms of these methods accept instances of {@code ForkJoinTask},
98 * but overloaded forms also allow mixed execution of plain {@code
99 * Runnable}- or {@code Callable}- based activities as well.  However,
100 * tasks that are already executing in a pool should normally instead
101 * use the within-computation forms listed in the table unless using
102 * async event-style tasks that are not usually joined, in which case
103 * there is little difference among choice of methods.
104 *
105 * <table BORDER CELLPADDING=3 CELLSPACING=1>
106 * <caption>Summary of task execution methods</caption>
107 *  <tr>
108 *    <td></td>
109 *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
110 *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
111 *  </tr>
112 *  <tr>
113 *    <td> <b>Arrange async execution</b></td>
114 *    <td> {@link #execute(ForkJoinTask)}</td>
115 *    <td> {@link ForkJoinTask#fork}</td>
116 *  </tr>
117 *  <tr>
118 *    <td> <b>Await and obtain result</b></td>
119 *    <td> {@link #invoke(ForkJoinTask)}</td>
120 *    <td> {@link ForkJoinTask#invoke}</td>
121 *  </tr>
122 *  <tr>
123 *    <td> <b>Arrange exec and obtain Future</b></td>
124 *    <td> {@link #submit(ForkJoinTask)}</td>
125 *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
126 *  </tr>
127 * </table>
128 *
129 * <p>The common pool is by default constructed with default
130 * parameters, but these may be controlled by setting three
131 * {@linkplain System#getProperty system properties}:
132 * <ul>
133 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
134 * - the parallelism level, a non-negative integer
135 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
136 * - the class name of a {@link ForkJoinWorkerThreadFactory}
137 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
138 * - the class name of a {@link UncaughtExceptionHandler}
139 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
140 * - the maximum number of allowed extra threads to maintain target
141 * parallelism (default 256).
142 * </ul>
143 * If a {@link SecurityManager} is present and no factory is
144 * specified, then the default pool uses a factory supplying
145 * threads that have no {@link Permissions} enabled.
146 * The system class loader is used to load these classes.
147 * Upon any error in establishing these settings, default parameters
148 * are used. It is possible to disable or limit the use of threads in
149 * the common pool by setting the parallelism property to zero, and/or
150 * using a factory that may return {@code null}. However doing so may
151 * cause unjoined tasks to never be executed.
152 *
153 * <p><b>Implementation notes</b>: This implementation restricts the
154 * maximum number of running threads to 32767. Attempts to create
155 * pools with greater than the maximum number result in
156 * {@code IllegalArgumentException}.
157 *
158 * <p>This implementation rejects submitted tasks (that is, by throwing
159 * {@link RejectedExecutionException}) only when the pool is shut down
160 * or internal resources have been exhausted.
161 *
162 * @since 1.7
163 * @author Doug Lea
164 */
165//@jdk.internal.vm.annotation.Contended   // Android-removed
166public class ForkJoinPool extends AbstractExecutorService {
167
168    /*
169     * Implementation Overview
170     *
171     * This class and its nested classes provide the main
172     * functionality and control for a set of worker threads:
173     * Submissions from non-FJ threads enter into submission queues.
174     * Workers take these tasks and typically split them into subtasks
175     * that may be stolen by other workers.  Preference rules give
176     * first priority to processing tasks from their own queues (LIFO
177     * or FIFO, depending on mode), then to randomized FIFO steals of
178     * tasks in other queues.  This framework began as a vehicle for
179     * supporting tree-structured parallelism using work-stealing.
180     * Over time, its scalability advantages led to extensions and
181     * changes to better support more diverse usage contexts.  Because
182     * most internal methods and nested classes are interrelated,
183     * their main rationale and descriptions are presented here;
184     * individual methods and nested classes contain only brief
185     * comments about details.
186     *
187     * WorkQueues
188     * ==========
189     *
190     * Most operations occur within work-stealing queues (in nested
191     * class WorkQueue).  These are special forms of Deques that
192     * support only three of the four possible end-operations -- push,
193     * pop, and poll (aka steal), under the further constraints that
194     * push and pop are called only from the owning thread (or, as
195     * extended here, under a lock), while poll may be called from
196     * other threads.  (If you are unfamiliar with them, you probably
197     * want to read Herlihy and Shavit's book "The Art of
198     * Multiprocessor Programming", chapter 16 describing these in
199     * more detail before proceeding.)  The main work-stealing queue
200     * design is roughly similar to those in the papers "Dynamic
201     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
202     * (http://research.sun.com/scalable/pubs/index.html) and
203     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
204     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
205     * The main differences ultimately stem from GC requirements that
206     * we null out taken slots as soon as we can, to maintain as small
207     * a footprint as possible even in programs generating huge
208     * numbers of tasks. To accomplish this, we shift the CAS
209     * arbitrating pop vs poll (steal) from being on the indices
210     * ("base" and "top") to the slots themselves.
211     *
212     * Adding tasks then takes the form of a classic array push(task)
213     * in a circular buffer:
214     *    q.array[q.top++ % length] = task;
215     *
216     * (The actual code needs to null-check and size-check the array,
217     * uses masking, not mod, for indexing a power-of-two-sized array,
218     * properly fences accesses, and possibly signals waiting workers
219     * to start scanning -- see below.)  Both a successful pop and
220     * poll mainly entail a CAS of a slot from non-null to null.
221     *
222     * The pop operation (always performed by owner) is:
223     *   if ((the task at top slot is not null) and
224     *        (CAS slot to null))
225     *           decrement top and return task;
226     *
227     * And the poll operation (usually by a stealer) is
228     *    if ((the task at base slot is not null) and
229     *        (CAS slot to null))
230     *           increment base and return task;
231     *
232     * There are several variants of each of these; for example most
233     * versions of poll pre-screen the CAS by rechecking that the base
234     * has not changed since reading the slot, and most methods only
235     * attempt the CAS if base appears not to be equal to top.
236     *
237     * Memory ordering.  See "Correct and Efficient Work-Stealing for
238     * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
239     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
240     * analysis of memory ordering requirements in work-stealing
241     * algorithms similar to (but different than) the one used here.
242     * Extracting tasks in array slots via (fully fenced) CAS provides
243     * primary synchronization. The base and top indices imprecisely
244     * guide where to extract from. We do not always require strict
245     * orderings of array and index updates, so sometimes let them be
246     * subject to compiler and processor reorderings. However, the
247     * volatile "base" index also serves as a basis for memory
248     * ordering: Slot accesses are preceded by a read of base,
249     * ensuring happens-before ordering with respect to stealers (so
250     * the slots themselves can be read via plain array reads.)  The
251     * only other memory orderings relied on are maintained in the
252     * course of signalling and activation (see below).  A check that
253     * base == top indicates (momentary) emptiness, but otherwise may
254     * err on the side of possibly making the queue appear nonempty
255     * when a push, pop, or poll have not fully committed, or making
256     * it appear empty when an update of top has not yet been visibly
257     * written.  (Method isEmpty() checks the case of a partially
258     * completed removal of the last element.)  Because of this, the
259     * poll operation, considered individually, is not wait-free. One
260     * thief cannot successfully continue until another in-progress
261     * one (or, if previously empty, a push) visibly completes.
262     * However, in the aggregate, we ensure at least probabilistic
263     * non-blockingness.  If an attempted steal fails, a scanning
264     * thief chooses a different random victim target to try next. So,
265     * in order for one thief to progress, it suffices for any
266     * in-progress poll or new push on any empty queue to
267     * complete. (This is why we normally use method pollAt and its
268     * variants that try once at the apparent base index, else
269     * consider alternative actions, rather than method poll, which
270     * retries.)
271     *
272     * This approach also enables support of a user mode in which
273     * local task processing is in FIFO, not LIFO order, simply by
274     * using poll rather than pop.  This can be useful in
275     * message-passing frameworks in which tasks are never joined.
276     *
277     * WorkQueues are also used in a similar way for tasks submitted
278     * to the pool. We cannot mix these tasks in the same queues used
279     * by workers. Instead, we randomly associate submission queues
280     * with submitting threads, using a form of hashing.  The
281     * ThreadLocalRandom probe value serves as a hash code for
282     * choosing existing queues, and may be randomly repositioned upon
283     * contention with other submitters.  In essence, submitters act
284     * like workers except that they are restricted to executing local
285     * tasks that they submitted (or in the case of CountedCompleters,
286     * others with the same root task).  Insertion of tasks in shared
287     * mode requires a lock but we use only a simple spinlock (using
288     * field qlock), because submitters encountering a busy queue move
289     * on to try or create other queues -- they block only when
290     * creating and registering new queues. Because it is used only as
291     * a spinlock, unlocking requires only a "releasing" store (using
292     * putOrderedInt).  The qlock is also used during termination
293     * detection, in which case it is forced to a negative
294     * non-lockable value.
295     *
296     * Management
297     * ==========
298     *
299     * The main throughput advantages of work-stealing stem from
300     * decentralized control -- workers mostly take tasks from
301     * themselves or each other, at rates that can exceed a billion
302     * per second.  The pool itself creates, activates (enables
303     * scanning for and running tasks), deactivates, blocks, and
304     * terminates threads, all with minimal central information.
305     * There are only a few properties that we can globally track or
306     * maintain, so we pack them into a small number of variables,
307     * often maintaining atomicity without blocking or locking.
308     * Nearly all essentially atomic control state is held in two
309     * volatile variables that are by far most often read (not
310     * written) as status and consistency checks. (Also, field
311     * "config" holds unchanging configuration state.)
312     *
313     * Field "ctl" contains 64 bits holding information needed to
314     * atomically decide to add, inactivate, enqueue (on an event
315     * queue), dequeue, and/or re-activate workers.  To enable this
316     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
317     * far in excess of normal operating range) to allow ids, counts,
318     * and their negations (used for thresholding) to fit into 16bit
319     * subfields.
320     *
321     * Field "runState" holds lifetime status, atomically and
322     * monotonically setting STARTED, SHUTDOWN, STOP, and finally
323     * TERMINATED bits.
324     *
325     * Field "auxState" is a ReentrantLock subclass that also
326     * opportunistically holds some other bookkeeping fields accessed
327     * only when locked.  It is mainly used to lock (infrequent)
328     * updates to workQueues.  The auxState instance is itself lazily
329     * constructed (see tryInitialize), requiring a double-check-style
330     * bootstrapping use of field runState, and locking a private
331     * static.
332     *
333     * Field "workQueues" holds references to WorkQueues.  It is
334     * updated (only during worker creation and termination) under the
335     * lock, but is otherwise concurrently readable, and accessed
336     * directly. We also ensure that reads of the array reference
337     * itself never become too stale (for example, re-reading before
338     * each scan). To simplify index-based operations, the array size
339     * is always a power of two, and all readers must tolerate null
340     * slots. Worker queues are at odd indices. Shared (submission)
341     * queues are at even indices, up to a maximum of 64 slots, to
342     * limit growth even if array needs to expand to add more
343     * workers. Grouping them together in this way simplifies and
344     * speeds up task scanning.
345     *
346     * All worker thread creation is on-demand, triggered by task
347     * submissions, replacement of terminated workers, and/or
348     * compensation for blocked workers. However, all other support
349     * code is set up to work with other policies.  To ensure that we
350     * do not hold on to worker references that would prevent GC, all
351     * accesses to workQueues are via indices into the workQueues
352     * array (which is one source of some of the messy code
353     * constructions here). In essence, the workQueues array serves as
354     * a weak reference mechanism. Thus for example the stack top
355     * subfield of ctl stores indices, not references.
356     *
357     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
358     * cannot let workers spin indefinitely scanning for tasks when
359     * none can be found immediately, and we cannot start/resume
360     * workers unless there appear to be tasks available.  On the
361     * other hand, we must quickly prod them into action when new
362     * tasks are submitted or generated. In many usages, ramp-up time
363     * to activate workers is the main limiting factor in overall
364     * performance, which is compounded at program start-up by JIT
365     * compilation and allocation. So we streamline this as much as
366     * possible.
367     *
368     * The "ctl" field atomically maintains active and total worker
369     * counts as well as a queue to place waiting threads so they can
370     * be located for signalling. Active counts also play the role of
371     * quiescence indicators, so are decremented when workers believe
372     * that there are no more tasks to execute. The "queue" is
373     * actually a form of Treiber stack.  A stack is ideal for
374     * activating threads in most-recently used order. This improves
375     * performance and locality, outweighing the disadvantages of
376     * being prone to contention and inability to release a worker
377     * unless it is topmost on stack.  We block/unblock workers after
378     * pushing on the idle worker stack (represented by the lower
379     * 32bit subfield of ctl) when they cannot find work.  The top
380     * stack state holds the value of the "scanState" field of the
381     * worker: its index and status, plus a version counter that, in
382     * addition to the count subfields (also serving as version
383     * stamps) provide protection against Treiber stack ABA effects.
384     *
385     * Creating workers. To create a worker, we pre-increment total
386     * count (serving as a reservation), and attempt to construct a
387     * ForkJoinWorkerThread via its factory. Upon construction, the
388     * new thread invokes registerWorker, where it constructs a
389     * WorkQueue and is assigned an index in the workQueues array
390     * (expanding the array if necessary). The thread is then started.
391     * Upon any exception across these steps, or null return from
392     * factory, deregisterWorker adjusts counts and records
393     * accordingly.  If a null return, the pool continues running with
394     * fewer than the target number of workers. If exceptional, the
395     * exception is propagated, generally to some external caller.
396     * Worker index assignment avoids the bias in scanning that would
397     * occur if entries were sequentially packed starting at the front
398     * of the workQueues array. We treat the array as a simple
399     * power-of-two hash table, expanding as needed. The seedIndex
400     * increment ensures no collisions until a resize is needed or a
401     * worker is deregistered and replaced, and thereafter keeps
402     * probability of collision low. We cannot use
403     * ThreadLocalRandom.getProbe() for similar purposes here because
404     * the thread has not started yet, but do so for creating
405     * submission queues for existing external threads (see
406     * externalPush).
407     *
408     * WorkQueue field scanState is used by both workers and the pool
409     * to manage and track whether a worker is UNSIGNALLED (possibly
410     * blocked waiting for a signal).  When a worker is inactivated,
411     * its scanState field is set, and is prevented from executing
412     * tasks, even though it must scan once for them to avoid queuing
413     * races. Note that scanState updates lag queue CAS releases so
414     * usage requires care. When queued, the lower 16 bits of
415     * scanState must hold its pool index. So we place the index there
416     * upon initialization (see registerWorker) and otherwise keep it
417     * there or restore it when necessary.
418     *
419     * The ctl field also serves as the basis for memory
420     * synchronization surrounding activation. This uses a more
421     * efficient version of a Dekker-like rule that task producers and
422     * consumers sync with each other by both writing/CASing ctl (even
423     * if to its current value).  This would be extremely costly. So
424     * we relax it in several ways: (1) Producers only signal when
425     * their queue is empty. Other workers propagate this signal (in
426     * method scan) when they find tasks. (2) Workers only enqueue
427     * after scanning (see below) and not finding any tasks.  (3)
428     * Rather than CASing ctl to its current value in the common case
429     * where no action is required, we reduce write contention by
430     * equivalently prefacing signalWork when called by an external
431     * task producer using a memory access with full-volatile
432     * semantics or a "fullFence". (4) For internal task producers we
433     * rely on the fact that even if no other workers awaken, the
434     * producer itself will eventually see the task and execute it.
435     *
436     * Almost always, too many signals are issued. A task producer
437     * cannot in general tell if some existing worker is in the midst
438     * of finishing one task (or already scanning) and ready to take
439     * another without being signalled. So the producer might instead
440     * activate a different worker that does not find any work, and
441     * then inactivates. This scarcely matters in steady-state
442     * computations involving all workers, but can create contention
443     * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
444     * computations involving only a few workers.
445     *
446     * Scanning. Method scan() performs top-level scanning for tasks.
447     * Each scan traverses (and tries to poll from) each queue in
448     * pseudorandom permutation order by randomly selecting an origin
449     * index and a step value.  (The pseudorandom generator need not
450     * have high-quality statistical properties in the long term, but
451     * just within computations; we use 64bit and 32bit Marsaglia
452     * XorShifts, which are cheap and suffice here.)  Scanning also
453     * employs contention reduction: When scanning workers fail a CAS
454     * polling for work, they soon restart with a different
455     * pseudorandom scan order (thus likely retrying at different
456     * intervals). This improves throughput when many threads are
457     * trying to take tasks from few queues.  Scans do not otherwise
458     * explicitly take into account core affinities, loads, cache
459     * localities, etc. However, they do exploit temporal locality
460     * (which usually approximates these) by preferring to re-poll (up
461     * to POLL_LIMIT times) from the same queue after a successful
462     * poll before trying others.  Restricted forms of scanning occur
463     * in methods helpComplete and findNonEmptyStealQueue, and take
464     * similar but simpler forms.
465     *
466     * Deactivation and waiting. Queuing encounters several intrinsic
467     * races; most notably that an inactivating scanning worker can
468     * miss seeing a task produced during a scan.  So when a worker
469     * cannot find a task to steal, it inactivates and enqueues, and
470     * then rescans to ensure that it didn't miss one, reactivating
471     * upon seeing one with probability approximately proportional to
472     * probability of a miss.  (In most cases, the worker will be
473     * signalled before self-signalling, avoiding cascades of multiple
474     * signals for the same task).
475     *
476     * Workers block (in method awaitWork) using park/unpark;
477     * advertising the need for signallers to unpark by setting their
478     * "parker" fields.
479     *
480     * Trimming workers. To release resources after periods of lack of
481     * use, a worker starting to wait when the pool is quiescent will
482     * time out and terminate (see awaitWork) if the pool has remained
483     * quiescent for period given by IDLE_TIMEOUT_MS, increasing the
484     * period as the number of threads decreases, eventually removing
485     * all workers.
486     *
487     * Shutdown and Termination. A call to shutdownNow invokes
488     * tryTerminate to atomically set a runState bit. The calling
489     * thread, as well as every other worker thereafter terminating,
490     * helps terminate others by setting their (qlock) status,
491     * cancelling their unprocessed tasks, and waking them up, doing
492     * so repeatedly until stable. Calls to non-abrupt shutdown()
493     * preface this by checking whether termination should commence.
494     * This relies primarily on the active count bits of "ctl"
495     * maintaining consensus -- tryTerminate is called from awaitWork
496     * whenever quiescent. However, external submitters do not take
497     * part in this consensus.  So, tryTerminate sweeps through queues
498     * (until stable) to ensure lack of in-flight submissions and
499     * workers about to process them before triggering the "STOP"
500     * phase of termination. (Note: there is an intrinsic conflict if
501     * helpQuiescePool is called when shutdown is enabled. Both wait
502     * for quiescence, but tryTerminate is biased to not trigger until
503     * helpQuiescePool completes.)
504     *
505     * Joining Tasks
506     * =============
507     *
508     * Any of several actions may be taken when one worker is waiting
509     * to join a task stolen (or always held) by another.  Because we
510     * are multiplexing many tasks on to a pool of workers, we can't
511     * just let them block (as in Thread.join).  We also cannot just
512     * reassign the joiner's run-time stack with another and replace
513     * it later, which would be a form of "continuation", that even if
514     * possible is not necessarily a good idea since we may need both
515     * an unblocked task and its continuation to progress.  Instead we
516     * combine two tactics:
517     *
518     *   Helping: Arranging for the joiner to execute some task that it
519     *      would be running if the steal had not occurred.
520     *
521     *   Compensating: Unless there are already enough live threads,
522     *      method tryCompensate() may create or re-activate a spare
523     *      thread to compensate for blocked joiners until they unblock.
524     *
525     * A third form (implemented in tryRemoveAndExec) amounts to
526     * helping a hypothetical compensator: If we can readily tell that
527     * a possible action of a compensator is to steal and execute the
528     * task being joined, the joining thread can do so directly,
529     * without the need for a compensation thread (although at the
530     * expense of larger run-time stacks, but the tradeoff is
531     * typically worthwhile).
532     *
533     * The ManagedBlocker extension API can't use helping so relies
534     * only on compensation in method awaitBlocker.
535     *
536     * The algorithm in helpStealer entails a form of "linear
537     * helping".  Each worker records (in field currentSteal) the most
538     * recent task it stole from some other worker (or a submission).
539     * It also records (in field currentJoin) the task it is currently
540     * actively joining. Method helpStealer uses these markers to try
541     * to find a worker to help (i.e., steal back a task from and
542     * execute it) that could hasten completion of the actively joined
543     * task.  Thus, the joiner executes a task that would be on its
544     * own local deque had the to-be-joined task not been stolen. This
545     * is a conservative variant of the approach described in Wagner &
546     * Calder "Leapfrogging: a portable technique for implementing
547     * efficient futures" SIGPLAN Notices, 1993
548     * (http://portal.acm.org/citation.cfm?id=155354). It differs in
549     * that: (1) We only maintain dependency links across workers upon
550     * steals, rather than use per-task bookkeeping.  This sometimes
551     * requires a linear scan of workQueues array to locate stealers,
552     * but often doesn't because stealers leave hints (that may become
553     * stale/wrong) of where to locate them.  It is only a hint
554     * because a worker might have had multiple steals and the hint
555     * records only one of them (usually the most current).  Hinting
556     * isolates cost to when it is needed, rather than adding to
557     * per-task overhead.  (2) It is "shallow", ignoring nesting and
558     * potentially cyclic mutual steals.  (3) It is intentionally
559     * racy: field currentJoin is updated only while actively joining,
560     * which means that we miss links in the chain during long-lived
561     * tasks, GC stalls etc (which is OK since blocking in such cases
562     * is usually a good idea).  (4) We bound the number of attempts
563     * to find work using checksums and fall back to suspending the
564     * worker and if necessary replacing it with another.
565     *
566     * Helping actions for CountedCompleters do not require tracking
567     * currentJoins: Method helpComplete takes and executes any task
568     * with the same root as the task being waited on (preferring
569     * local pops to non-local polls). However, this still entails
570     * some traversal of completer chains, so is less efficient than
571     * using CountedCompleters without explicit joins.
572     *
573     * Compensation does not aim to keep exactly the target
574     * parallelism number of unblocked threads running at any given
575     * time. Some previous versions of this class employed immediate
576     * compensations for any blocked join. However, in practice, the
577     * vast majority of blockages are transient byproducts of GC and
578     * other JVM or OS activities that are made worse by replacement.
579     * Currently, compensation is attempted only after validating that
580     * all purportedly active threads are processing tasks by checking
581     * field WorkQueue.scanState, which eliminates most false
582     * positives.  Also, compensation is bypassed (tolerating fewer
583     * threads) in the most common case in which it is rarely
584     * beneficial: when a worker with an empty queue (thus no
585     * continuation tasks) blocks on a join and there still remain
586     * enough threads to ensure liveness.
587     *
588     * Spare threads are removed as soon as they notice that the
589     * target parallelism level has been exceeded, in method
590     * tryDropSpare. (Method scan arranges returns for rechecks upon
591     * each probe via the "bound" parameter.)
592     *
593     * The compensation mechanism may be bounded.  Bounds for the
594     * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
595     * with programming errors and abuse before running out of
596     * resources to do so. In other cases, users may supply factories
     * that limit thread construction. The effects of bounding in this
     * pool (like all others) are imprecise.  Total worker counts are
599     * decremented when threads deregister, not when they exit and
600     * resources are reclaimed by the JVM and OS. So the number of
601     * simultaneously live threads may transiently exceed bounds.
602     *
603     * Common Pool
604     * ===========
605     *
606     * The static common pool always exists after static
607     * initialization.  Since it (or any other created pool) need
608     * never be used, we minimize initial construction overhead and
609     * footprint to the setup of about a dozen fields, with no nested
610     * allocation. Most bootstrapping occurs within method
611     * externalSubmit during the first submission to the pool.
612     *
613     * When external threads submit to the common pool, they can
614     * perform subtask processing (see externalHelpComplete and
615     * related methods) upon joins.  This caller-helps policy makes it
616     * sensible to set common pool parallelism level to one (or more)
617     * less than the total number of available cores, or even zero for
618     * pure caller-runs.  We do not need to record whether external
619     * submissions are to the common pool -- if not, external help
620     * methods return quickly. These submitters would otherwise be
621     * blocked waiting for completion, so the extra effort (with
622     * liberally sprinkled task status checks) in inapplicable cases
623     * amounts to an odd form of limited spin-wait before blocking in
624     * ForkJoinTask.join.
625     *
626     * As a more appropriate default in managed environments, unless
627     * overridden by system properties, we use workers of subclass
628     * InnocuousForkJoinWorkerThread when there is a SecurityManager
629     * present. These workers have no permissions set, do not belong
630     * to any user-defined ThreadGroup, and erase all ThreadLocals
631     * after executing any top-level task (see WorkQueue.runTask).
632     * The associated mechanics (mainly in ForkJoinWorkerThread) may
633     * be JVM-dependent and must access particular Thread class fields
634     * to achieve this effect.
635     *
636     * Style notes
637     * ===========
638     *
639     * Memory ordering relies mainly on Unsafe intrinsics that carry
640     * the further responsibility of explicitly performing null- and
641     * bounds- checks otherwise carried out implicitly by JVMs.  This
642     * can be awkward and ugly, but also reflects the need to control
643     * outcomes across the unusual cases that arise in very racy code
644     * with very few invariants. So these explicit checks would exist
645     * in some form anyway.  All fields are read into locals before
646     * use, and null-checked if they are references.  This is usually
647     * done in a "C"-like style of listing declarations at the heads
648     * of methods or blocks, and using inline assignments on first
649     * encounter.  Array bounds-checks are usually performed by
650     * masking with array.length-1, which relies on the invariant that
651     * these arrays are created with positive lengths, which is itself
652     * paranoically checked. Nearly all explicit checks lead to
653     * bypass/return, not exception throws, because they may
654     * legitimately arise due to cancellation/revocation during
655     * shutdown.
656     *
657     * There is a lot of representation-level coupling among classes
658     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
659     * fields of WorkQueue maintain data structures managed by
660     * ForkJoinPool, so are directly accessed.  There is little point
661     * trying to reduce this, since any associated future changes in
662     * representations will need to be accompanied by algorithmic
663     * changes anyway. Several methods intrinsically sprawl because
664     * they must accumulate sets of consistent reads of fields held in
665     * local variables.  There are also other coding oddities
666     * (including several unnecessary-looking hoisted null checks)
667     * that help some methods perform reasonably even when interpreted
668     * (not compiled).
669     *
670     * The order of declarations in this file is (with a few exceptions):
671     * (1) Static utility functions
672     * (2) Nested (static) classes
673     * (3) Static fields
674     * (4) Fields, along with constants used when unpacking some of them
675     * (5) Internal control methods
676     * (6) Callbacks and other support for ForkJoinTask methods
677     * (7) Exported methods
678     * (8) Static block initializing statics in minimally dependent order
679     */
680
681    // Static utilities
682
683    /**
684     * If there is a security manager, makes sure caller has
685     * permission to modify threads.
686     */
687    private static void checkPermission() {
688        SecurityManager security = System.getSecurityManager();
689        if (security != null)
690            security.checkPermission(modifyThreadPermission);
691    }
692
693    // Nested classes
694
695    /**
696     * Factory for creating new {@link ForkJoinWorkerThread}s.
697     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
698     * for {@code ForkJoinWorkerThread} subclasses that extend base
699     * functionality or initialize threads with different contexts.
700     */
701    public static interface ForkJoinWorkerThreadFactory {
702        /**
703         * Returns a new worker thread operating in the given pool.
704         *
705         * @param pool the pool this thread works in
706         * @return the new worker thread, or {@code null} if the request
707         *         to create a thread is rejected
708         * @throws NullPointerException if the pool is null
709         */
710        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
711    }
712
713    /**
714     * Default ForkJoinWorkerThreadFactory implementation; creates a
715     * new ForkJoinWorkerThread.
716     */
717    private static final class DefaultForkJoinWorkerThreadFactory
718        implements ForkJoinWorkerThreadFactory {
719        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
720            return new ForkJoinWorkerThread(pool);
721        }
722    }
723
724    /**
725     * Class for artificial tasks that are used to replace the target
726     * of local joins if they are removed from an interior queue slot
727     * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
728     * actually do anything beyond having a unique identity.
729     */
730    private static final class EmptyTask extends ForkJoinTask<Void> {
731        private static final long serialVersionUID = -7721805057305804111L;
732        EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
733        public final Void getRawResult() { return null; }
734        public final void setRawResult(Void x) {}
735        public final boolean exec() { return true; }
736    }
737
738    /**
739     * Additional fields and lock created upon initialization.
740     */
741    private static final class AuxState extends ReentrantLock {
742        private static final long serialVersionUID = -6001602636862214147L;
743        volatile long stealCount;     // cumulative steal count
744        long indexSeed;               // index bits for registerWorker
745        AuxState() {}
746    }
747
    // Constants shared across ForkJoinPool and WorkQueue.
    // These are bit masks and units for subfields packed into the int
    // fields ForkJoinPool.config, WorkQueue.config, and WorkQueue.scanState.

    // Bounds
    static final int SMASK        = 0xffff;        // short bits == max index
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    static final int EVENMASK     = 0xfffe;        // even short bits
    static final int SQMASK       = 0x007e;        // max 64 (even) slots

    // Masks and units for WorkQueue.scanState and ctl sp subfield
    static final int UNSIGNALLED  = 1 << 31;       // must be negative
    static final int SS_SEQ       = 1 << 16;       // version count

    // Mode bits for ForkJoinPool.config and WorkQueue.config
    static final int MODE_MASK    = 0xffff << 16;  // top half of int
    static final int SPARE_WORKER = 1 << 17;       // set if tc > 0 on creation
    static final int UNREGISTERED = 1 << 18;       // to skip some of deregister
    static final int FIFO_QUEUE   = 1 << 31;       // must be negative
    static final int LIFO_QUEUE   = 0;             // for clarity
    static final int IS_OWNED     = 1;             // low bit 0 if shared

    /**
     * The maximum number of task executions from the same queue
     * before checking other queues, bounding unfairness and impact of
     * infinite user task recursion.  Must be a power of two minus 1.
     */
    static final int POLL_LIMIT = (1 << 10) - 1;
774
775    /**
776     * Queues supporting work-stealing as well as external task
777     * submission. See above for descriptions and algorithms.
778     * Performance on most platforms is very sensitive to placement of
779     * instances of both WorkQueues and their arrays -- we absolutely
780     * do not want multiple WorkQueue instances or multiple queue
781     * arrays sharing cache lines. The @Contended annotation alerts
782     * JVMs to try to keep instances apart.
783     */
784    //@jdk.internal.vm.annotation.Contended   // Android-removed
785    static final class WorkQueue {
786
787        /**
788         * Capacity of work-stealing queue array upon initialization.
789         * Must be a power of two; at least 4, but should be larger to
790         * reduce or eliminate cacheline sharing among queues.
791         * Currently, it is much larger, as a partial workaround for
792         * the fact that JVMs often place arrays in locations that
793         * share GC bookkeeping (especially cardmarks) such that
794         * per-write accesses encounter serious memory contention.
795         */
796        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
797
798        /**
799         * Maximum size for queue arrays. Must be a power of two less
800         * than or equal to 1 << (31 - width of array entry) to ensure
801         * lack of wraparound of index calculations, but defined to a
802         * value a bit less than this to help users trap runaway
803         * programs before saturating systems.
804         */
805        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
806
807        // Instance fields
808
809        volatile int scanState;    // versioned, negative if inactive
810        int stackPred;             // pool stack (ctl) predecessor
811        int nsteals;               // number of steals
812        int hint;                  // randomization and stealer index hint
813        int config;                // pool index and mode
814        volatile int qlock;        // 1: locked, < 0: terminate; else 0
815        volatile int base;         // index of next slot for poll
816        int top;                   // index of next slot for push
817        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
818        final ForkJoinPool pool;   // the containing pool (may be null)
819        final ForkJoinWorkerThread owner; // owning thread or null if shared
820        volatile Thread parker;    // == owner during call to park; else null
821        volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
822
823      // @jdk.internal.vm.annotation.Contended("group2") // segregate // Android-removed
824        volatile ForkJoinTask<?> currentSteal; // nonnull when running some task
825
        /**
         * Creates a queue for the given pool and owner thread (the
         * owner is null for shared/external-submission queues).
         */
        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }
832
833        /**
834         * Returns an exportable index (used by ForkJoinWorkerThread).
835         */
836        final int getPoolIndex() {
837            return (config & 0xffff) >>> 1; // ignore odd/even tag bit
838        }
839
840        /**
841         * Returns the approximate number of tasks in the queue.
842         */
843        final int queueSize() {
844            int n = base - top;       // read base first
845            return (n >= 0) ? 0 : -n; // ignore transient negative
846        }
847
848        /**
849         * Provides a more accurate estimate of whether this queue has
850         * any tasks than does queueSize, by checking whether a
851         * near-empty queue has at least one unclaimed task.
852         */
853        final boolean isEmpty() {
854            ForkJoinTask<?>[] a; int n, al, s;
855            return ((n = base - (s = top)) >= 0 || // possibly one task
856                    (n == -1 && ((a = array) == null ||
857                                 (al = a.length) == 0 ||
858                                 a[(al - 1) & (s - 1)] == null)));
859        }
860
861        /**
862         * Pushes a task. Call only by owner in unshared queues.
863         *
864         * @param task the task. Caller must ensure non-null.
865         * @throws RejectedExecutionException if array cannot be resized
866         */
867        final void push(ForkJoinTask<?> task) {
868            U.storeFence();              // ensure safe publication
869            int s = top, al, d; ForkJoinTask<?>[] a;
870            if ((a = array) != null && (al = a.length) > 0) {
871                a[(al - 1) & s] = task;  // relaxed writes OK
872                top = s + 1;
873                ForkJoinPool p = pool;
874                if ((d = base - s) == 0 && p != null) {
875                    U.fullFence();
876                    p.signalWork();
877                }
878                else if (al + d == 1)
879                    growArray();
880            }
881        }
882
883        /**
884         * Initializes or doubles the capacity of array. Call either
885         * by owner or with lock held -- it is OK for base, but not
886         * top, to move while resizings are in progress.
887         */
888        final ForkJoinTask<?>[] growArray() {
889            ForkJoinTask<?>[] oldA = array;
890            int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
891            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
892                throw new RejectedExecutionException("Queue capacity exceeded");
893            int oldMask, t, b;
894            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
895            if (oldA != null && (oldMask = oldA.length - 1) > 0 &&
896                (t = top) - (b = base) > 0) {
897                int mask = size - 1;
898                do { // emulate poll from old array, push to new array
899                    int index = b & oldMask;
900                    long offset = ((long)index << ASHIFT) + ABASE;
901                    ForkJoinTask<?> x = (ForkJoinTask<?>)
902                        U.getObjectVolatile(oldA, offset);
903                    if (x != null &&
904                        U.compareAndSwapObject(oldA, offset, x, null))
905                        a[b & mask] = x;
906                } while (++b != t);
907                U.storeFence();
908            }
909            return a;
910        }
911
912        /**
913         * Takes next task, if one exists, in LIFO order.  Call only
914         * by owner in unshared queues.
915         */
916        final ForkJoinTask<?> pop() {
917            int b = base, s = top, al, i; ForkJoinTask<?>[] a;
918            if ((a = array) != null && b != s && (al = a.length) > 0) {
919                int index = (al - 1) & --s;
920                long offset = ((long)index << ASHIFT) + ABASE;
921                ForkJoinTask<?> t = (ForkJoinTask<?>)
922                    U.getObject(a, offset);
923                if (t != null &&
924                    U.compareAndSwapObject(a, offset, t, null)) {
925                    top = s;
926                    return t;
927                }
928            }
929            return null;
930        }
931
932        /**
933         * Takes a task in FIFO order if b is base of queue and a task
934         * can be claimed without contention. Specialized versions
935         * appear in ForkJoinPool methods scan and helpStealer.
936         */
937        final ForkJoinTask<?> pollAt(int b) {
938            ForkJoinTask<?>[] a; int al;
939            if ((a = array) != null && (al = a.length) > 0) {
940                int index = (al - 1) & b;
941                long offset = ((long)index << ASHIFT) + ABASE;
942                ForkJoinTask<?> t = (ForkJoinTask<?>)
943                    U.getObjectVolatile(a, offset);
944                if (t != null && b++ == base &&
945                    U.compareAndSwapObject(a, offset, t, null)) {
946                    base = b;
947                    return t;
948                }
949            }
950            return null;
951        }
952
953        /**
954         * Takes next task, if one exists, in FIFO order.
955         */
956        final ForkJoinTask<?> poll() {
957            for (;;) {
958                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
959                if ((a = array) != null && (d = b - s) < 0 &&
960                    (al = a.length) > 0) {
961                    int index = (al - 1) & b;
962                    long offset = ((long)index << ASHIFT) + ABASE;
963                    ForkJoinTask<?> t = (ForkJoinTask<?>)
964                        U.getObjectVolatile(a, offset);
965                    if (b++ == base) {
966                        if (t != null) {
967                            if (U.compareAndSwapObject(a, offset, t, null)) {
968                                base = b;
969                                return t;
970                            }
971                        }
972                        else if (d == -1)
973                            break; // now empty
974                    }
975                }
976                else
977                    break;
978            }
979            return null;
980        }
981
982        /**
983         * Takes next task, if one exists, in order specified by mode.
984         */
985        final ForkJoinTask<?> nextLocalTask() {
986            return (config < 0) ? poll() : pop();
987        }
988
989        /**
990         * Returns next task, if one exists, in order specified by mode.
991         */
992        final ForkJoinTask<?> peek() {
993            int al; ForkJoinTask<?>[] a;
994            return ((a = array) != null && (al = a.length) > 0) ?
995                a[(al - 1) & (config < 0 ? base : top - 1)] : null;
996        }
997
998        /**
999         * Pops the given task only if it is at the current top.
1000         */
1001        final boolean tryUnpush(ForkJoinTask<?> task) {
1002            int b = base, s = top, al; ForkJoinTask<?>[] a;
1003            if ((a = array) != null && b != s && (al = a.length) > 0) {
1004                int index = (al - 1) & --s;
1005                long offset = ((long)index << ASHIFT) + ABASE;
1006                if (U.compareAndSwapObject(a, offset, task, null)) {
1007                    top = s;
1008                    return true;
1009                }
1010            }
1011            return false;
1012        }
1013
1014        /**
1015         * Shared version of push. Fails if already locked.
1016         *
1017         * @return status: > 0 locked, 0 possibly was empty, < 0 was nonempty
1018         */
1019        final int sharedPush(ForkJoinTask<?> task) {
1020            int stat;
1021            if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1022                int b = base, s = top, al, d; ForkJoinTask<?>[] a;
1023                if ((a = array) != null && (al = a.length) > 0 &&
1024                    al - 1 + (d = b - s) > 0) {
1025                    a[(al - 1) & s] = task;
1026                    top = s + 1;                 // relaxed writes OK here
1027                    qlock = 0;
1028                    stat = (d < 0 && b == base) ? d : 0;
1029                }
1030                else {
1031                    growAndSharedPush(task);
1032                    stat = 0;
1033                }
1034            }
1035            else
1036                stat = 1;
1037            return stat;
1038        }
1039
1040        /**
1041         * Helper for sharedPush; called only when locked and resize
1042         * needed.
1043         */
1044        private void growAndSharedPush(ForkJoinTask<?> task) {
1045            try {
1046                growArray();
1047                int s = top, al; ForkJoinTask<?>[] a;
1048                if ((a = array) != null && (al = a.length) > 0) {
1049                    a[(al - 1) & s] = task;
1050                    top = s + 1;
1051                }
1052            } finally {
1053                qlock = 0;
1054            }
1055        }
1056
1057        /**
1058         * Shared version of tryUnpush.
1059         */
1060        final boolean trySharedUnpush(ForkJoinTask<?> task) {
1061            boolean popped = false;
1062            int s = top - 1, al; ForkJoinTask<?>[] a;
1063            if ((a = array) != null && (al = a.length) > 0) {
1064                int index = (al - 1) & s;
1065                long offset = ((long)index << ASHIFT) + ABASE;
1066                ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
1067                if (t == task &&
1068                    U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1069                    if (top == s + 1 && array == a &&
1070                        U.compareAndSwapObject(a, offset, task, null)) {
1071                        popped = true;
1072                        top = s;
1073                    }
1074                    U.putOrderedInt(this, QLOCK, 0);
1075                }
1076            }
1077            return popped;
1078        }
1079
1080        /**
1081         * Removes and cancels all known tasks, ignoring any exceptions.
1082         */
1083        final void cancelAll() {
1084            ForkJoinTask<?> t;
1085            if ((t = currentJoin) != null) {
1086                currentJoin = null;
1087                ForkJoinTask.cancelIgnoringExceptions(t);
1088            }
1089            if ((t = currentSteal) != null) {
1090                currentSteal = null;
1091                ForkJoinTask.cancelIgnoringExceptions(t);
1092            }
1093            while ((t = poll()) != null)
1094                ForkJoinTask.cancelIgnoringExceptions(t);
1095        }
1096
        // Specialized execution methods

        /**
         * Pops and executes up to POLL_LIMIT tasks or until empty.
         */
        final void localPopAndExec() {
            for (int nexec = 0;;) {
                int b = base, s = top, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && b != s && (al = a.length) > 0) {
                    int index = (al - 1) & --s;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    // getAndSet atomically claims the top slot vs stealers
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getAndSetObject(a, offset, null);
                    if (t != null) {
                        top = s;
                        (currentSteal = t).doExec();
                        if (++nexec > POLL_LIMIT)
                            break;  // bound unfairness; let caller recheck
                    }
                    else
                        break;
                }
                else
                    break;
            }
        }
1123
1124        /**
1125         * Polls and executes up to POLL_LIMIT tasks or until empty.
1126         */
1127        final void localPollAndExec() {
1128            for (int nexec = 0;;) {
1129                int b = base, s = top, al; ForkJoinTask<?>[] a;
1130                if ((a = array) != null && b != s && (al = a.length) > 0) {
1131                    int index = (al - 1) & b++;
1132                    long offset = ((long)index << ASHIFT) + ABASE;
1133                    ForkJoinTask<?> t = (ForkJoinTask<?>)
1134                        U.getAndSetObject(a, offset, null);
1135                    if (t != null) {
1136                        base = b;
1137                        t.doExec();
1138                        if (++nexec > POLL_LIMIT)
1139                            break;
1140                    }
1141                }
1142                else
1143                    break;
1144            }
1145        }
1146
1147        /**
1148         * Executes the given task and (some) remaining local tasks.
1149         */
1150        final void runTask(ForkJoinTask<?> task) {
1151            if (task != null) {
1152                task.doExec();
1153                if (config < 0)
1154                    localPollAndExec();
1155                else
1156                    localPopAndExec();
1157                int ns = ++nsteals;
1158                ForkJoinWorkerThread thread = owner;
1159                currentSteal = null;
1160                if (ns < 0)           // collect on overflow
1161                    transferStealCount(pool);
1162                if (thread != null)
1163                    thread.afterTopLevelExec();
1164            }
1165        }
1166
1167        /**
1168         * Adds steal count to pool steal count if it exists, and resets.
1169         */
1170        final void transferStealCount(ForkJoinPool p) {
1171            AuxState aux;
1172            if (p != null && (aux = p.auxState) != null) {
1173                long s = nsteals;
1174                nsteals = 0;            // if negative, correct for overflow
1175                if (s < 0) s = Integer.MAX_VALUE;
1176                aux.lock();
1177                try {
1178                    aux.stealCount += s;
1179                } finally {
1180                    aux.unlock();
1181                }
1182            }
1183        }
1184
1185        /**
1186         * If present, removes from queue and executes the given task,
1187         * or any other cancelled task. Used only by awaitJoin.
1188         *
1189         * @return true if queue empty and task not known to be done
1190         */
1191        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
1192            if (task != null && task.status >= 0) {
1193                int b, s, d, al; ForkJoinTask<?>[] a;
1194                while ((d = (b = base) - (s = top)) < 0 &&
1195                       (a = array) != null && (al = a.length) > 0) {
1196                    for (;;) {      // traverse from s to b
1197                        int index = --s & (al - 1);
1198                        long offset = (index << ASHIFT) + ABASE;
1199                        ForkJoinTask<?> t = (ForkJoinTask<?>)
1200                            U.getObjectVolatile(a, offset);
1201                        if (t == null)
1202                            break;                   // restart
1203                        else if (t == task) {
1204                            boolean removed = false;
1205                            if (s + 1 == top) {      // pop
1206                                if (U.compareAndSwapObject(a, offset, t, null)) {
1207                                    top = s;
1208                                    removed = true;
1209                                }
1210                            }
1211                            else if (base == b)      // replace with proxy
1212                                removed = U.compareAndSwapObject(a, offset, t,
1213                                                                 new EmptyTask());
1214                            if (removed) {
1215                                ForkJoinTask<?> ps = currentSteal;
1216                                (currentSteal = task).doExec();
1217                                currentSteal = ps;
1218                            }
1219                            break;
1220                        }
1221                        else if (t.status < 0 && s + 1 == top) {
1222                            if (U.compareAndSwapObject(a, offset, t, null)) {
1223                                top = s;
1224                            }
1225                            break;                  // was cancelled
1226                        }
1227                        else if (++d == 0) {
1228                            if (base != b)          // rescan
1229                                break;
1230                            return false;
1231                        }
1232                    }
1233                    if (task.status < 0)
1234                        return false;
1235                }
1236            }
1237            return true;
1238        }
1239
1240        /**
1241         * Pops task if in the same CC computation as the given task,
1242         * in either shared or owned mode. Used only by helpComplete.
1243         */
1244        final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
1245            int b = base, s = top, al; ForkJoinTask<?>[] a;
1246            if ((a = array) != null && b != s && (al = a.length) > 0) {
1247                int index = (al - 1) & (s - 1);
1248                long offset = ((long)index << ASHIFT) + ABASE;
1249                ForkJoinTask<?> o = (ForkJoinTask<?>)
1250                    U.getObjectVolatile(a, offset);
1251                if (o instanceof CountedCompleter) {
1252                    CountedCompleter<?> t = (CountedCompleter<?>)o;
1253                    for (CountedCompleter<?> r = t;;) {
1254                        if (r == task) {
1255                            if ((mode & IS_OWNED) == 0) {
1256                                boolean popped = false;
1257                                if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1258                                    if (top == s && array == a &&
1259                                        U.compareAndSwapObject(a, offset,
1260                                                               t, null)) {
1261                                        popped = true;
1262                                        top = s - 1;
1263                                    }
1264                                    U.putOrderedInt(this, QLOCK, 0);
1265                                    if (popped)
1266                                        return t;
1267                                }
1268                            }
1269                            else if (U.compareAndSwapObject(a, offset,
1270                                                            t, null)) {
1271                                top = s - 1;
1272                                return t;
1273                            }
1274                            break;
1275                        }
1276                        else if ((r = r.completer) == null) // try parent
1277                            break;
1278                    }
1279                }
1280            }
1281            return null;
1282        }
1283
1284        /**
1285         * Steals and runs a task in the same CC computation as the
1286         * given task if one exists and can be taken without
1287         * contention. Otherwise returns a checksum/control value for
1288         * use by method helpComplete.
1289         *
1290         * @return 1 if successful, 2 if retryable (lost to another
1291         * stealer), -1 if non-empty but no matching task found, else
1292         * the base index, forced negative.
1293         */
1294        final int pollAndExecCC(CountedCompleter<?> task) {
1295            ForkJoinTask<?>[] a;
1296            int b = base, s = top, al, h;
1297            if ((a = array) != null && b != s && (al = a.length) > 0) {
1298                int index = (al - 1) & b;
1299                long offset = ((long)index << ASHIFT) + ABASE;
1300                ForkJoinTask<?> o = (ForkJoinTask<?>)
1301                    U.getObjectVolatile(a, offset);
1302                if (o == null)
1303                    h = 2;                      // retryable
1304                else if (!(o instanceof CountedCompleter))
1305                    h = -1;                     // unmatchable
1306                else {
1307                    CountedCompleter<?> t = (CountedCompleter<?>)o;
1308                    for (CountedCompleter<?> r = t;;) {
1309                        if (r == task) {
1310                            if (b++ == base &&
1311                                U.compareAndSwapObject(a, offset, t, null)) {
1312                                base = b;
1313                                t.doExec();
1314                                h = 1;          // success
1315                            }
1316                            else
1317                                h = 2;          // lost CAS
1318                            break;
1319                        }
1320                        else if ((r = r.completer) == null) {
1321                            h = -1;             // unmatched
1322                            break;
1323                        }
1324                    }
1325                }
1326            }
1327            else
1328                h = b | Integer.MIN_VALUE;      // to sense movement on re-poll
1329            return h;
1330        }
1331
1332        /**
1333         * Returns true if owned and not known to be blocked.
1334         */
1335        final boolean isApparentlyUnblocked() {
1336            Thread wt; Thread.State s;
1337            return (scanState >= 0 &&
1338                    (wt = owner) != null &&
1339                    (s = wt.getState()) != Thread.State.BLOCKED &&
1340                    s != Thread.State.WAITING &&
1341                    s != Thread.State.TIMED_WAITING);
1342        }
1343
        // Unsafe mechanics. Note that some are (and must be) the same as in FJP
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long QLOCK;   // field offset of qlock, for CAS
        private static final int ABASE;    // base offset of ForkJoinTask arrays
        private static final int ASHIFT;   // log2 of array element scale
        static {
            try {
                QLOCK = U.objectFieldOffset
                    (WorkQueue.class.getDeclaredField("qlock"));
                ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
                int scale = U.arrayIndexScale(ForkJoinTask[].class);
                if ((scale & (scale - 1)) != 0)
                    throw new Error("array index scale not a power of two");
                // power-of-two scale lets slot offsets use shifts, not multiply
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
1362    }
1363
    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.  Also used as a static lock in tryInitialize.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
     * Guarded by class-level synchronization in {@link #nextPoolId}.
     */
    private static int poolNumberSequence;
1404
1405    /**
1406     * Returns the next sequence number. We don't expect this to
1407     * ever contend, so use simple builtin sync.
1408     */
1409    private static final synchronized int nextPoolId() {
1410        return ++poolNumberSequence;
1411    }
1412
    // static configuration constants

    /**
     * Initial timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work. On timeout,
     * the thread will instead try to shrink the number of workers.
     * The value should be large enough to avoid overly aggressive
     * shrinkage during most transient stalls (long GCs etc).
     */
    private static final long IDLE_TIMEOUT_MS = 2000L; // 2sec

    /**
     * Tolerance for idle timeouts, to cope with timer undershoots.
     */
    private static final long TIMEOUT_SLOP_MS =   20L; // 20ms

    /**
     * The default value for COMMON_MAX_SPARES.  Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property.  The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation. (Weyl-sequence increment derived from the golden
     * ratio, giving well-spread successive hash values.)
     */
    private static final int SEED_INCREMENT = 0x9e3779b9;

    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * AC: Number of active running workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl.  The offsets of counts
     * by the target parallelism and the positionings of fields makes
     * it possible to perform the most common checks via sign tests of
     * fields: When ac is negative, there are not enough active
     * workers, when tc is negative, there are not enough total
     * workers.  When sp is non-zero, there are waiting workers.  To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one active count
     * using getAndAddLong of AC_UNIT, rather than CAS, when returning
     * from a blocked join.  Other updates entail multiple subfields
     * and masking, requiring CAS.
     */

    // Lower and upper word masks
    private static final long SP_MASK    = 0xffffffffL;
    private static final long UC_MASK    = ~SP_MASK;

    // Active counts (AC subfield: bits 48-63 of ctl)
    private static final int  AC_SHIFT   = 48;
    private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
    private static final long AC_MASK    = 0xffffL << AC_SHIFT;

    // Total counts (TC subfield: bits 32-47 of ctl)
    private static final int  TC_SHIFT   = 32;
    private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    private static final long TC_MASK    = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

    // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
    private static final int  STARTED    = 1;
    private static final int  STOP       = 1 << 1;
    private static final int  TERMINATED = 1 << 2;
    private static final int  SHUTDOWN   = 1 << 31;  // sign bit: runState < 0
1488
    // Instance fields
    volatile long ctl;                   // main pool control (see bit layout above)
    volatile int runState;               // STARTED|STOP|TERMINATED|SHUTDOWN bits
    final int config;                    // parallelism, mode
    AuxState auxState;                   // lock, steal counts; null until started
    volatile WorkQueue[] workQueues;     // main registry
    final String workerNamePrefix;       // to create worker name string
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
1498
1499    /**
1500     * Instantiates fields upon first submission, or upon shutdown if
1501     * no submissions. If checkTermination true, also responds to
1502     * termination by external calls submitting tasks.
1503     */
1504    private void tryInitialize(boolean checkTermination) {
1505        if (runState == 0) { // bootstrap by locking static field
1506            int p = config & SMASK;
1507            int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
1508            n |= n >>> 1;    // create workQueues array with size a power of two
1509            n |= n >>> 2;
1510            n |= n >>> 4;
1511            n |= n >>> 8;
1512            n |= n >>> 16;
1513            n = ((n + 1) << 1) & SMASK;
1514            AuxState aux = new AuxState();
1515            WorkQueue[] ws = new WorkQueue[n];
1516            synchronized (modifyThreadPermission) { // double-check
1517                if (runState == 0) {
1518                    workQueues = ws;
1519                    auxState = aux;
1520                    runState = STARTED;
1521                }
1522            }
1523        }
1524        if (checkTermination && runState < 0) {
1525            tryTerminate(false, false); // help terminate
1526            throw new RejectedExecutionException();
1527        }
1528    }
1529
1530    // Creating, registering and deregistering workers
1531
1532    /**
1533     * Tries to construct and start one worker. Assumes that total
1534     * count has already been incremented as a reservation.  Invokes
1535     * deregisterWorker on any failure.
1536     *
1537     * @param isSpare true if this is a spare thread
1538     * @return true if successful
1539     */
1540    private boolean createWorker(boolean isSpare) {
1541        ForkJoinWorkerThreadFactory fac = factory;
1542        Throwable ex = null;
1543        ForkJoinWorkerThread wt = null;
1544        WorkQueue q;
1545        try {
1546            if (fac != null && (wt = fac.newThread(this)) != null) {
1547                if (isSpare && (q = wt.workQueue) != null)
1548                    q.config |= SPARE_WORKER;
1549                wt.start();
1550                return true;
1551            }
1552        } catch (Throwable rex) {
1553            ex = rex;
1554        }
1555        deregisterWorker(wt, ex);
1556        return false;
1557    }
1558
1559    /**
1560     * Tries to add one worker, incrementing ctl counts before doing
1561     * so, relying on createWorker to back out on failure.
1562     *
1563     * @param c incoming ctl value, with total count negative and no
1564     * idle workers.  On CAS failure, c is refreshed and retried if
1565     * this holds (otherwise, a new worker is not needed).
1566     */
1567    private void tryAddWorker(long c) {
1568        do {
1569            long nc = ((AC_MASK & (c + AC_UNIT)) |
1570                       (TC_MASK & (c + TC_UNIT)));
1571            if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
1572                createWorker(false);
1573                break;
1574            }
1575        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1576    }
1577
1578    /**
1579     * Callback from ForkJoinWorkerThread constructor to establish and
1580     * record its WorkQueue.
1581     *
1582     * @param wt the worker thread
1583     * @return the worker's queue
1584     */
1585    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1586        UncaughtExceptionHandler handler;
1587        AuxState aux;
1588        wt.setDaemon(true);                           // configure thread
1589        if ((handler = ueh) != null)
1590            wt.setUncaughtExceptionHandler(handler);
1591        WorkQueue w = new WorkQueue(this, wt);
1592        int i = 0;                                    // assign a pool index
1593        int mode = config & MODE_MASK;
1594        if ((aux = auxState) != null) {
1595            aux.lock();
1596            try {
1597                int s = (int)(aux.indexSeed += SEED_INCREMENT), n, m;
1598                WorkQueue[] ws = workQueues;
1599                if (ws != null && (n = ws.length) > 0) {
1600                    i = (m = n - 1) & ((s << 1) | 1); // odd-numbered indices
1601                    if (ws[i] != null) {              // collision
1602                        int probes = 0;               // step by approx half n
1603                        int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1604                        while (ws[i = (i + step) & m] != null) {
1605                            if (++probes >= n) {
1606                                workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1607                                m = n - 1;
1608                                probes = 0;
1609                            }
1610                        }
1611                    }
1612                    w.hint = s;                       // use as random seed
1613                    w.config = i | mode;
1614                    w.scanState = i | (s & 0x7fff0000); // random seq bits
1615                    ws[i] = w;
1616                }
1617            } finally {
1618                aux.unlock();
1619            }
1620        }
1621        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
1622        return w;
1623    }
1624
1625    /**
1626     * Final callback from terminating worker, as well as upon failure
1627     * to construct or start a worker.  Removes record of worker from
1628     * array, and adjusts counts. If pool is shutting down, tries to
1629     * complete termination.
1630     *
1631     * @param wt the worker thread, or null if construction failed
1632     * @param ex the exception causing failure, or null if none
1633     */
1634    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1635        WorkQueue w = null;
1636        if (wt != null && (w = wt.workQueue) != null) {
1637            AuxState aux; WorkQueue[] ws;          // remove index from array
1638            int idx = w.config & SMASK;
1639            int ns = w.nsteals;
1640            if ((aux = auxState) != null) {
1641                aux.lock();
1642                try {
1643                    if ((ws = workQueues) != null && ws.length > idx &&
1644                        ws[idx] == w)
1645                        ws[idx] = null;
1646                    aux.stealCount += ns;
1647                } finally {
1648                    aux.unlock();
1649                }
1650            }
1651        }
1652        if (w == null || (w.config & UNREGISTERED) == 0) { // else pre-adjusted
1653            long c;                                   // decrement counts
1654            do {} while (!U.compareAndSwapLong
1655                         (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
1656                                               (TC_MASK & (c - TC_UNIT)) |
1657                                               (SP_MASK & c))));
1658        }
1659        if (w != null) {
1660            w.currentSteal = null;
1661            w.qlock = -1;                             // ensure set
1662            w.cancelAll();                            // cancel remaining tasks
1663        }
1664        while (tryTerminate(false, false) >= 0) {     // possibly replace
1665            WorkQueue[] ws; int wl, sp; long c;
1666            if (w == null || w.array == null ||
1667                (ws = workQueues) == null || (wl = ws.length) <= 0)
1668                break;
1669            else if ((sp = (int)(c = ctl)) != 0) {    // wake up replacement
1670                if (tryRelease(c, ws[(wl - 1) & sp], AC_UNIT))
1671                    break;
1672            }
1673            else if (ex != null && (c & ADD_WORKER) != 0L) {
1674                tryAddWorker(c);                      // create replacement
1675                break;
1676            }
1677            else                                      // don't need replacement
1678                break;
1679        }
1680        if (ex == null)                               // help clean on way out
1681            ForkJoinTask.helpExpungeStaleExceptions();
1682        else                                          // rethrow
1683            ForkJoinTask.rethrow(ex);
1684    }
1685
1686    // Signalling
1687
1688    /**
1689     * Tries to create or activate a worker if too few are active.
1690     */
1691    final void signalWork() {
1692        for (;;) {
1693            long c; int sp, i; WorkQueue v; WorkQueue[] ws;
1694            if ((c = ctl) >= 0L)                      // enough workers
1695                break;
1696            else if ((sp = (int)c) == 0) {            // no idle workers
1697                if ((c & ADD_WORKER) != 0L)           // too few workers
1698                    tryAddWorker(c);
1699                break;
1700            }
1701            else if ((ws = workQueues) == null)
1702                break;                                // unstarted/terminated
1703            else if (ws.length <= (i = sp & SMASK))
1704                break;                                // terminated
1705            else if ((v = ws[i]) == null)
1706                break;                                // terminating
1707            else {
1708                int ns = sp & ~UNSIGNALLED;
1709                int vs = v.scanState;
1710                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT));
1711                if (sp == vs && U.compareAndSwapLong(this, CTL, c, nc)) {
1712                    v.scanState = ns;
1713                    LockSupport.unpark(v.parker);
1714                    break;
1715                }
1716            }
1717        }
1718    }
1719
1720    /**
1721     * Signals and releases worker v if it is top of idle worker
1722     * stack.  This performs a one-shot version of signalWork only if
1723     * there is (apparently) at least one idle worker.
1724     *
1725     * @param c incoming ctl value
1726     * @param v if non-null, a worker
1727     * @param inc the increment to active count (zero when compensating)
1728     * @return true if successful
1729     */
1730    private boolean tryRelease(long c, WorkQueue v, long inc) {
1731        int sp = (int)c, ns = sp & ~UNSIGNALLED;
1732        if (v != null) {
1733            int vs = v.scanState;
1734            long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + inc));
1735            if (sp == vs && U.compareAndSwapLong(this, CTL, c, nc)) {
1736                v.scanState = ns;
1737                LockSupport.unpark(v.parker);
1738                return true;
1739            }
1740        }
1741        return false;
1742    }
1743
1744    /**
1745     * With approx probability of a missed signal, tries (once) to
1746     * reactivate worker w (or some other worker), failing if stale or
1747     * known to be already active.
1748     *
1749     * @param w the worker
1750     * @param ws the workQueue array to use
1751     * @param r random seed
1752     */
1753    private void tryReactivate(WorkQueue w, WorkQueue[] ws, int r) {
1754        long c; int sp, wl; WorkQueue v;
1755        if ((sp = (int)(c = ctl)) != 0 && w != null &&
1756            ws != null && (wl = ws.length) > 0 &&
1757            ((sp ^ r) & SS_SEQ) == 0 &&
1758            (v = ws[(wl - 1) & sp]) != null) {
1759            long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT));
1760            int ns = sp & ~UNSIGNALLED;
1761            if (w.scanState < 0 &&
1762                v.scanState == sp &&
1763                U.compareAndSwapLong(this, CTL, c, nc)) {
1764                v.scanState = ns;
1765                LockSupport.unpark(v.parker);
1766            }
1767        }
1768    }
1769
1770    /**
1771     * If worker w exists and is active, enqueues and sets status to inactive.
1772     *
1773     * @param w the worker
1774     * @param ss current (non-negative) scanState
1775     */
1776    private void inactivate(WorkQueue w, int ss) {
1777        int ns = (ss + SS_SEQ) | UNSIGNALLED;
1778        long lc = ns & SP_MASK, nc, c;
1779        if (w != null) {
1780            w.scanState = ns;
1781            do {
1782                nc = lc | (UC_MASK & ((c = ctl) - AC_UNIT));
1783                w.stackPred = (int)c;
1784            } while (!U.compareAndSwapLong(this, CTL, c, nc));
1785        }
1786    }
1787
1788    /**
1789     * Possibly blocks worker w waiting for signal, or returns
1790     * negative status if the worker should terminate. May return
1791     * without status change if multiple stale unparks and/or
1792     * interrupts occur.
1793     *
1794     * @param w the calling worker
1795     * @return negative if w should terminate
1796     */
1797    private int awaitWork(WorkQueue w) {
1798        int stat = 0;
1799        if (w != null && w.scanState < 0) {
1800            long c = ctl;
1801            if ((int)(c >> AC_SHIFT) + (config & SMASK) <= 0)
1802                stat = timedAwaitWork(w, c);     // possibly quiescent
1803            else if ((runState & STOP) != 0)
1804                stat = w.qlock = -1;             // pool terminating
1805            else if (w.scanState < 0) {
1806                w.parker = Thread.currentThread();
1807                if (w.scanState < 0)             // recheck after write
1808                    LockSupport.park(this);
1809                w.parker = null;
1810                if ((runState & STOP) != 0)
1811                    stat = w.qlock = -1;         // recheck
1812                else if (w.scanState < 0)
1813                    Thread.interrupted();        // clear status
1814            }
1815        }
1816        return stat;
1817    }
1818
1819    /**
1820     * Possibly triggers shutdown and tries (once) to block worker
1821     * when pool is (or may be) quiescent. Waits up to a duration
1822     * determined by number of workers.  On timeout, if ctl has not
1823     * changed, terminates the worker, which will in turn wake up
1824     * another worker to possibly repeat this process.
1825     *
1826     * @param w the calling worker
1827     * @return negative if w should terminate
1828     */
1829    private int timedAwaitWork(WorkQueue w, long c) {
1830        int stat = 0;
1831        int scale = 1 - (short)(c >>> TC_SHIFT);
1832        long deadline = (((scale <= 0) ? 1 : scale) * IDLE_TIMEOUT_MS +
1833                         System.currentTimeMillis());
1834        if ((runState >= 0 || (stat = tryTerminate(false, false)) > 0) &&
1835            w != null && w.scanState < 0) {
1836            int ss; AuxState aux;
1837            w.parker = Thread.currentThread();
1838            if (w.scanState < 0)
1839                LockSupport.parkUntil(this, deadline);
1840            w.parker = null;
1841            if ((runState & STOP) != 0)
1842                stat = w.qlock = -1;         // pool terminating
1843            else if ((ss = w.scanState) < 0 && !Thread.interrupted() &&
1844                     (int)c == ss && (aux = auxState) != null && ctl == c &&
1845                     deadline - System.currentTimeMillis() <= TIMEOUT_SLOP_MS) {
1846                aux.lock();
1847                try {                        // pre-deregister
1848                    WorkQueue[] ws;
1849                    int cfg = w.config, idx = cfg & SMASK;
1850                    long nc = ((UC_MASK & (c - TC_UNIT)) |
1851                               (SP_MASK & w.stackPred));
1852                    if ((runState & STOP) == 0 &&
1853                        (ws = workQueues) != null &&
1854                        idx < ws.length && idx >= 0 && ws[idx] == w &&
1855                        U.compareAndSwapLong(this, CTL, c, nc)) {
1856                        ws[idx] = null;
1857                        w.config = cfg | UNREGISTERED;
1858                        stat = w.qlock = -1;
1859                    }
1860                } finally {
1861                    aux.unlock();
1862                }
1863            }
1864        }
1865        return stat;
1866    }
1867
1868    /**
1869     * If the given worker is a spare with no queued tasks, and there
1870     * are enough existing workers, drops it from ctl counts and sets
1871     * its state to terminated.
1872     *
1873     * @param w the calling worker -- must be a spare
1874     * @return true if dropped (in which case it must not process more tasks)
1875     */
1876    private boolean tryDropSpare(WorkQueue w) {
1877        if (w != null && w.isEmpty()) {           // no local tasks
1878            long c; int sp, wl; WorkQueue[] ws; WorkQueue v;
1879            while ((short)((c = ctl) >> TC_SHIFT) > 0 &&
1880                   ((sp = (int)c) != 0 || (int)(c >> AC_SHIFT) > 0) &&
1881                   (ws = workQueues) != null && (wl = ws.length) > 0) {
1882                boolean dropped, canDrop;
1883                if (sp == 0) {                    // no queued workers
1884                    long nc = ((AC_MASK & (c - AC_UNIT)) |
1885                               (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c));
1886                    dropped = U.compareAndSwapLong(this, CTL, c, nc);
1887                }
1888                else if (
1889                    (v = ws[(wl - 1) & sp]) == null || v.scanState != sp)
1890                    dropped = false;              // stale; retry
1891                else {
1892                    long nc = v.stackPred & SP_MASK;
1893                    if (w == v || w.scanState >= 0) {
1894                        canDrop = true;           // w unqueued or topmost
1895                        nc |= ((AC_MASK & c) |    // ensure replacement
1896                               (TC_MASK & (c - TC_UNIT)));
1897                    }
1898                    else {                        // w may be queued
1899                        canDrop = false;          // help uncover
1900                        nc |= ((AC_MASK & (c + AC_UNIT)) |
1901                               (TC_MASK & c));
1902                    }
1903                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1904                        v.scanState = sp & ~UNSIGNALLED;
1905                        LockSupport.unpark(v.parker);
1906                        dropped = canDrop;
1907                    }
1908                    else
1909                        dropped = false;
1910                }
1911                if (dropped) {                    // pre-deregister
1912                    int cfg = w.config, idx = cfg & SMASK;
1913                    if (idx >= 0 && idx < ws.length && ws[idx] == w)
1914                        ws[idx] = null;
1915                    w.config = cfg | UNREGISTERED;
1916                    w.qlock = -1;
1917                    return true;
1918                }
1919            }
1920        }
1921        return false;
1922    }
1923
1924    /**
1925     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1926     */
1927    final void runWorker(WorkQueue w) {
1928        w.growArray();                                  // allocate queue
1929        int bound = (w.config & SPARE_WORKER) != 0 ? 0 : POLL_LIMIT;
1930        long seed = w.hint * 0xdaba0b6eb09322e3L;       // initial random seed
1931        if ((runState & STOP) == 0) {
1932            for (long r = (seed == 0L) ? 1L : seed;;) { // ensure nonzero
1933                if (bound == 0 && tryDropSpare(w))
1934                    break;
1935                // high bits of prev seed for step; current low bits for idx
1936                int step = (int)(r >>> 48) | 1;
1937                r ^= r >>> 12; r ^= r << 25; r ^= r >>> 27; // xorshift
1938                if (scan(w, bound, step, (int)r) < 0 && awaitWork(w) < 0)
1939                    break;
1940            }
1941        }
1942    }
1943
1944    // Scanning for tasks
1945
1946    /**
1947     * Repeatedly scans for and tries to steal and execute (via
1948     * workQueue.runTask) a queued task. Each scan traverses queues in
1949     * pseudorandom permutation. Upon finding a non-empty queue, makes
1950     * at most the given bound attempts to re-poll (fewer if
1951     * contended) on the same queue before returning (impossible
1952     * scanState value) 0 to restart scan. Else returns after at least
1953     * 1 and at most 32 full scans.
1954     *
1955     * @param w the worker (via its WorkQueue)
1956     * @param bound repoll bound as bitmask (0 if spare)
1957     * @param step (circular) index increment per iteration (must be odd)
1958     * @param r a random seed for origin index
1959     * @return negative if should await signal
1960     */
1961    private int scan(WorkQueue w, int bound, int step, int r) {
1962        int stat = 0, wl; WorkQueue[] ws;
1963        if ((ws = workQueues) != null && w != null && (wl = ws.length) > 0) {
1964            for (int m = wl - 1,
1965                     origin = m & r, idx = origin,
1966                     npolls = 0,
1967                     ss = w.scanState;;) {         // negative if inactive
1968                WorkQueue q; ForkJoinTask<?>[] a; int b, al;
1969                if ((q = ws[idx]) != null && (b = q.base) - q.top < 0 &&
1970                    (a = q.array) != null && (al = a.length) > 0) {
1971                    int index = (al - 1) & b;
1972                    long offset = ((long)index << ASHIFT) + ABASE;
1973                    ForkJoinTask<?> t = (ForkJoinTask<?>)
1974                        U.getObjectVolatile(a, offset);
1975                    if (t == null)
1976                        break;                     // empty or busy
1977                    else if (b++ != q.base)
1978                        break;                     // busy
1979                    else if (ss < 0) {
1980                        tryReactivate(w, ws, r);
1981                        break;                     // retry upon rescan
1982                    }
1983                    else if (!U.compareAndSwapObject(a, offset, t, null))
1984                        break;                     // contended
1985                    else {
1986                        q.base = b;
1987                        w.currentSteal = t;
1988                        if (b != q.top)            // propagate signal
1989                            signalWork();
1990                        w.runTask(t);
1991                        if (++npolls > bound)
1992                            break;
1993                    }
1994                }
1995                else if (npolls != 0)              // rescan
1996                    break;
1997                else if ((idx = (idx + step) & m) == origin) {
1998                    if (ss < 0) {                  // await signal
1999                        stat = ss;
2000                        break;
2001                    }
2002                    else if (r >= 0) {
2003                        inactivate(w, ss);
2004                        break;
2005                    }
2006                    else
2007                        r <<= 1;                   // at most 31 rescans
2008                }
2009            }
2010        }
2011        return stat;
2012    }
2013
2014    // Joining tasks
2015
2016    /**
2017     * Tries to steal and run tasks within the target's computation.
2018     * Uses a variant of the top-level algorithm, restricted to tasks
2019     * with the given task as ancestor: It prefers taking and running
2020     * eligible tasks popped from the worker's own queue (via
2021     * popCC). Otherwise it scans others, randomly moving on
2022     * contention or execution, deciding to give up based on a
2023     * checksum (via return codes from pollAndExecCC). The maxTasks
2024     * argument supports external usages; internal calls use zero,
2025     * allowing unbounded steps (external calls trap non-positive
2026     * values).
2027     *
2028     * @param w caller
2029     * @param maxTasks if non-zero, the maximum number of other tasks to run
2030     * @return task status on exit
2031     */
2032    final int helpComplete(WorkQueue w, CountedCompleter<?> task,
2033                           int maxTasks) {
2034        WorkQueue[] ws; int s = 0, wl;
2035        if ((ws = workQueues) != null && (wl = ws.length) > 1 &&
2036            task != null && w != null) {
2037            for (int m = wl - 1,
2038                     mode = w.config,
2039                     r = ~mode,                  // scanning seed
2040                     origin = r & m, k = origin, // first queue to scan
2041                     step = 3,                   // first scan step
2042                     h = 1,                      // 1:ran, >1:contended, <0:hash
2043                     oldSum = 0, checkSum = 0;;) {
2044                CountedCompleter<?> p; WorkQueue q; int i;
2045                if ((s = task.status) < 0)
2046                    break;
2047                if (h == 1 && (p = w.popCC(task, mode)) != null) {
2048                    p.doExec();                  // run local task
2049                    if (maxTasks != 0 && --maxTasks == 0)
2050                        break;
2051                    origin = k;                  // reset
2052                    oldSum = checkSum = 0;
2053                }
2054                else {                           // poll other worker queues
2055                    if ((i = k | 1) < 0 || i > m || (q = ws[i]) == null)
2056                        h = 0;
2057                    else if ((h = q.pollAndExecCC(task)) < 0)
2058                        checkSum += h;
2059                    if (h > 0) {
2060                        if (h == 1 && maxTasks != 0 && --maxTasks == 0)
2061                            break;
2062                        step = (r >>> 16) | 3;
2063                        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
2064                        k = origin = r & m;      // move and restart
2065                        oldSum = checkSum = 0;
2066                    }
2067                    else if ((k = (k + step) & m) == origin) {
2068                        if (oldSum == (oldSum = checkSum))
2069                            break;
2070                        checkSum = 0;
2071                    }
2072                }
2073            }
2074        }
2075        return s;
2076    }
2077
2078    /**
2079     * Tries to locate and execute tasks for a stealer of the given
2080     * task, or in turn one of its stealers. Traces currentSteal ->
2081     * currentJoin links looking for a thread working on a descendant
2082     * of the given task and with a non-empty queue to steal back and
2083     * execute tasks from. The first call to this method upon a
2084     * waiting join will often entail scanning/search, (which is OK
2085     * because the joiner has nothing better to do), but this method
2086     * leaves hints in workers to speed up subsequent calls.
2087     *
2088     * @param w caller
2089     * @param task the task to join
2090     */
2091    private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
2092        if (task != null && w != null) {
2093            ForkJoinTask<?> ps = w.currentSteal;
2094            WorkQueue[] ws; int wl, oldSum = 0;
2095            outer: while (w.tryRemoveAndExec(task) && task.status >= 0 &&
2096                          (ws = workQueues) != null && (wl = ws.length) > 0) {
2097                ForkJoinTask<?> subtask;
2098                int m = wl - 1, checkSum = 0;          // for stability check
2099                WorkQueue j = w, v;                    // v is subtask stealer
2100                descent: for (subtask = task; subtask.status >= 0; ) {
2101                    for (int h = j.hint | 1, k = 0, i;;) {
2102                        if ((v = ws[i = (h + (k << 1)) & m]) != null) {
2103                            if (v.currentSteal == subtask) {
2104                                j.hint = i;
2105                                break;
2106                            }
2107                            checkSum += v.base;
2108                        }
2109                        if (++k > m)                   // can't find stealer
2110                            break outer;
2111                    }
2112
2113                    for (;;) {                         // help v or descend
2114                        ForkJoinTask<?>[] a; int b, al;
2115                        if (subtask.status < 0)        // too late to help
2116                            break descent;
2117                        checkSum += (b = v.base);
2118                        ForkJoinTask<?> next = v.currentJoin;
2119                        ForkJoinTask<?> t = null;
2120                        if ((a = v.array) != null && (al = a.length) > 0) {
2121                            int index = (al - 1) & b;
2122                            long offset = ((long)index << ASHIFT) + ABASE;
2123                            t = (ForkJoinTask<?>)
2124                                U.getObjectVolatile(a, offset);
2125                            if (t != null && b++ == v.base) {
2126                                if (j.currentJoin != subtask ||
2127                                    v.currentSteal != subtask ||
2128                                    subtask.status < 0)
2129                                    break descent;     // stale
2130                                if (U.compareAndSwapObject(a, offset, t, null)) {
2131                                    v.base = b;
2132                                    w.currentSteal = t;
2133                                    for (int top = w.top;;) {
2134                                        t.doExec();    // help
2135                                        w.currentSteal = ps;
2136                                        if (task.status < 0)
2137                                            break outer;
2138                                        if (w.top == top)
2139                                            break;     // run local tasks
2140                                        if ((t = w.pop()) == null)
2141                                            break descent;
2142                                        w.currentSteal = t;
2143                                    }
2144                                }
2145                            }
2146                        }
2147                        if (t == null && b == v.base && b - v.top >= 0) {
2148                            if ((subtask = next) == null) {  // try to descend
2149                                if (next == v.currentJoin &&
2150                                    oldSum == (oldSum = checkSum))
2151                                    break outer;
2152                                break descent;
2153                            }
2154                            j = v;
2155                            break;
2156                        }
2157                    }
2158                }
2159            }
2160        }
2161    }
2162
2163    /**
2164     * Tries to decrement active count (sometimes implicitly) and
2165     * possibly release or create a compensating worker in preparation
2166     * for blocking. Returns false (retryable by caller), on
2167     * contention, detected staleness, instability, or termination.
2168     *
2169     * @param w caller
2170     */
2171    private boolean tryCompensate(WorkQueue w) {
2172        boolean canBlock; int wl;
2173        long c = ctl;
2174        WorkQueue[] ws = workQueues;
2175        int pc = config & SMASK;
2176        int ac = pc + (int)(c >> AC_SHIFT);
2177        int tc = pc + (short)(c >> TC_SHIFT);
2178        if (w == null || w.qlock < 0 || pc == 0 ||  // terminating or disabled
2179            ws == null || (wl = ws.length) <= 0)
2180            canBlock = false;
2181        else {
2182            int m = wl - 1, sp;
2183            boolean busy = true;                    // validate ac
2184            for (int i = 0; i <= m; ++i) {
2185                int k; WorkQueue v;
2186                if ((k = (i << 1) | 1) <= m && k >= 0 && (v = ws[k]) != null &&
2187                    v.scanState >= 0 && v.currentSteal == null) {
2188                    busy = false;
2189                    break;
2190                }
2191            }
2192            if (!busy || ctl != c)
2193                canBlock = false;                   // unstable or stale
2194            else if ((sp = (int)c) != 0)            // release idle worker
2195                canBlock = tryRelease(c, ws[m & sp], 0L);
2196            else if (tc >= pc && ac > 1 && w.isEmpty()) {
2197                long nc = ((AC_MASK & (c - AC_UNIT)) |
2198                           (~AC_MASK & c));         // uncompensated
2199                canBlock = U.compareAndSwapLong(this, CTL, c, nc);
2200            }
2201            else if (tc >= MAX_CAP ||
2202                     (this == common && tc >= pc + COMMON_MAX_SPARES))
2203                throw new RejectedExecutionException(
2204                    "Thread limit exceeded replacing blocked worker");
2205            else {                                  // similar to tryAddWorker
2206                boolean isSpare = (tc >= pc);
2207                long nc = (AC_MASK & c) | (TC_MASK & (c + TC_UNIT));
2208                canBlock = (U.compareAndSwapLong(this, CTL, c, nc) &&
2209                            createWorker(isSpare)); // throws on exception
2210            }
2211        }
2212        return canBlock;
2213    }
2214
2215    /**
2216     * Helps and/or blocks until the given task is done or timeout.
2217     *
2218     * @param w caller
2219     * @param task the task
2220     * @param deadline for timed waits, if nonzero
2221     * @return task status on exit
2222     */
2223    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
2224        int s = 0;
2225        if (w != null) {
2226            ForkJoinTask<?> prevJoin = w.currentJoin;
2227            if (task != null && (s = task.status) >= 0) {
2228                w.currentJoin = task;
2229                CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
2230                    (CountedCompleter<?>)task : null;
2231                for (;;) {
2232                    if (cc != null)
2233                        helpComplete(w, cc, 0);
2234                    else
2235                        helpStealer(w, task);
2236                    if ((s = task.status) < 0)
2237                        break;
2238                    long ms, ns;
2239                    if (deadline == 0L)
2240                        ms = 0L;
2241                    else if ((ns = deadline - System.nanoTime()) <= 0L)
2242                        break;
2243                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
2244                        ms = 1L;
2245                    if (tryCompensate(w)) {
2246                        task.internalWait(ms);
2247                        U.getAndAddLong(this, CTL, AC_UNIT);
2248                    }
2249                    if ((s = task.status) < 0)
2250                        break;
2251                }
2252                w.currentJoin = prevJoin;
2253            }
2254        }
2255        return s;
2256    }
2257
2258    // Specialized scanning
2259
2260    /**
2261     * Returns a (probably) non-empty steal queue, if one is found
2262     * during a scan, else null.  This method must be retried by
2263     * caller if, by the time it tries to use the queue, it is empty.
2264     */
2265    private WorkQueue findNonEmptyStealQueue() {
2266        WorkQueue[] ws; int wl;  // one-shot version of scan loop
2267        int r = ThreadLocalRandom.nextSecondarySeed();
2268        if ((ws = workQueues) != null && (wl = ws.length) > 0) {
2269            int m = wl - 1, origin = r & m;
2270            for (int k = origin, oldSum = 0, checkSum = 0;;) {
2271                WorkQueue q; int b;
2272                if ((q = ws[k]) != null) {
2273                    if ((b = q.base) - q.top < 0)
2274                        return q;
2275                    checkSum += b;
2276                }
2277                if ((k = (k + 1) & m) == origin) {
2278                    if (oldSum == (oldSum = checkSum))
2279                        break;
2280                    checkSum = 0;
2281                }
2282            }
2283        }
2284        return null;
2285    }
2286
2287    /**
2288     * Runs tasks until {@code isQuiescent()}. We piggyback on
2289     * active count ctl maintenance, but rather than blocking
2290     * when tasks cannot be found, we rescan until all others cannot
2291     * find tasks either.
2292     */
2293    final void helpQuiescePool(WorkQueue w) {
2294        ForkJoinTask<?> ps = w.currentSteal; // save context
2295        int wc = w.config;
2296        for (boolean active = true;;) {
2297            long c; WorkQueue q; ForkJoinTask<?> t;
2298            if (wc >= 0 && (t = w.pop()) != null) { // run locals if LIFO
2299                (w.currentSteal = t).doExec();
2300                w.currentSteal = ps;
2301            }
2302            else if ((q = findNonEmptyStealQueue()) != null) {
2303                if (!active) {      // re-establish active count
2304                    active = true;
2305                    U.getAndAddLong(this, CTL, AC_UNIT);
2306                }
2307                if ((t = q.pollAt(q.base)) != null) {
2308                    (w.currentSteal = t).doExec();
2309                    w.currentSteal = ps;
2310                    if (++w.nsteals < 0)
2311                        w.transferStealCount(this);
2312                }
2313            }
2314            else if (active) {      // decrement active count without queuing
2315                long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
2316                if (U.compareAndSwapLong(this, CTL, c, nc))
2317                    active = false;
2318            }
2319            else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
2320                     U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
2321                break;
2322        }
2323    }
2324
2325    /**
2326     * Gets and removes a local or stolen task for the given worker.
2327     *
2328     * @return a task, if available
2329     */
2330    final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2331        for (ForkJoinTask<?> t;;) {
2332            WorkQueue q;
2333            if ((t = w.nextLocalTask()) != null)
2334                return t;
2335            if ((q = findNonEmptyStealQueue()) == null)
2336                return null;
2337            if ((t = q.pollAt(q.base)) != null)
2338                return t;
2339        }
2340    }
2341
2342    /**
2343     * Returns a cheap heuristic guide for task partitioning when
2344     * programmers, frameworks, tools, or languages have little or no
2345     * idea about task granularity.  In essence, by offering this
2346     * method, we ask users only about tradeoffs in overhead vs
2347     * expected throughput and its variance, rather than how finely to
2348     * partition tasks.
2349     *
2350     * In a steady state strict (tree-structured) computation, each
2351     * thread makes available for stealing enough tasks for other
2352     * threads to remain active. Inductively, if all threads play by
2353     * the same rules, each thread should make available only a
2354     * constant number of tasks.
2355     *
2356     * The minimum useful constant is just 1. But using a value of 1
2357     * would require immediate replenishment upon each steal to
2358     * maintain enough tasks, which is infeasible.  Further,
2359     * partitionings/granularities of offered tasks should minimize
2360     * steal rates, which in general means that threads nearer the top
2361     * of computation tree should generate more than those nearer the
2362     * bottom. In perfect steady state, each thread is at
2363     * approximately the same level of computation tree. However,
2364     * producing extra tasks amortizes the uncertainty of progress and
2365     * diffusion assumptions.
2366     *
2367     * So, users will want to use values larger (but not much larger)
2368     * than 1 to both smooth over transient shortages and hedge
2369     * against uneven progress; as traded off against the cost of
2370     * extra task overhead. We leave the user to pick a threshold
2371     * value to compare with the results of this call to guide
2372     * decisions, but recommend values such as 3.
2373     *
2374     * When all threads are active, it is on average OK to estimate
2375     * surplus strictly locally. In steady-state, if one thread is
2376     * maintaining say 2 surplus tasks, then so are others. So we can
2377     * just use estimated queue length.  However, this strategy alone
2378     * leads to serious mis-estimates in some non-steady-state
2379     * conditions (ramp-up, ramp-down, other stalls). We can detect
2380     * many of these by further considering the number of "idle"
2381     * threads, that are known to have zero queued tasks, so
2382     * compensate by a factor of (#idle/#active) threads.
2383     */
2384    static int getSurplusQueuedTaskCount() {
2385        Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2386        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
2387            int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
2388            int n = (q = wt.workQueue).top - q.base;
2389            int a = (int)(pool.ctl >> AC_SHIFT) + p;
2390            return n - (a > (p >>>= 1) ? 0 :
2391                        a > (p >>>= 1) ? 1 :
2392                        a > (p >>>= 1) ? 2 :
2393                        a > (p >>>= 1) ? 4 :
2394                        8);
2395        }
2396        return 0;
2397    }
2398
2399    //  Termination
2400
2401    /**
2402     * Possibly initiates and/or completes termination.
2403     *
2404     * @param now if true, unconditionally terminate, else only
2405     * if no work and no active workers
2406     * @param enable if true, terminate when next possible
2407     * @return -1: terminating/terminated, 0: retry if internal caller, else 1
2408     */
2409    private int tryTerminate(boolean now, boolean enable) {
2410        int rs; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2411
2412        while ((rs = runState) >= 0) {
2413            if (!enable || this == common)        // cannot shutdown
2414                return 1;
2415            else if (rs == 0)
2416                tryInitialize(false);             // ensure initialized
2417            else
2418                U.compareAndSwapInt(this, RUNSTATE, rs, rs | SHUTDOWN);
2419        }
2420
2421        if ((rs & STOP) == 0) {                   // try to initiate termination
2422            if (!now) {                           // check quiescence
2423                for (long oldSum = 0L;;) {        // repeat until stable
2424                    WorkQueue[] ws; WorkQueue w; int b;
2425                    long checkSum = ctl;
2426                    if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
2427                        return 0;                 // still active workers
2428                    if ((ws = workQueues) != null) {
2429                        for (int i = 0; i < ws.length; ++i) {
2430                            if ((w = ws[i]) != null) {
2431                                checkSum += (b = w.base);
2432                                if (w.currentSteal != null || b != w.top)
2433                                    return 0;     // retry if internal caller
2434                            }
2435                        }
2436                    }
2437                    if (oldSum == (oldSum = checkSum))
2438                        break;
2439                }
2440            }
2441            do {} while (!U.compareAndSwapInt(this, RUNSTATE,
2442                                              rs = runState, rs | STOP));
2443        }
2444
2445        for (long oldSum = 0L;;) {                // repeat until stable
2446            WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt;
2447            long checkSum = ctl;
2448            if ((ws = workQueues) != null) {      // help terminate others
2449                for (int i = 0; i < ws.length; ++i) {
2450                    if ((w = ws[i]) != null) {
2451                        w.cancelAll();            // clear queues
2452                        checkSum += w.base;
2453                        if (w.qlock >= 0) {
2454                            w.qlock = -1;         // racy set OK
2455                            if ((wt = w.owner) != null) {
2456                                try {             // unblock join or park
2457                                    wt.interrupt();
2458                                } catch (Throwable ignore) {
2459                                }
2460                            }
2461                        }
2462                    }
2463                }
2464            }
2465            if (oldSum == (oldSum = checkSum))
2466                break;
2467        }
2468
2469        if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) {
2470            runState = (STARTED | SHUTDOWN | STOP | TERMINATED); // final write
2471            synchronized (this) {
2472                notifyAll();                      // for awaitTermination
2473            }
2474        }
2475
2476        return -1;
2477    }
2478
2479    // External operations
2480
2481    /**
2482     * Constructs and tries to install a new external queue,
2483     * failing if the workQueues array already has a queue at
2484     * the given index.
2485     *
2486     * @param index the index of the new queue
2487     */
2488    private void tryCreateExternalQueue(int index) {
2489        AuxState aux;
2490        if ((aux = auxState) != null && index >= 0) {
2491            WorkQueue q = new WorkQueue(this, null);
2492            q.config = index;
2493            q.scanState = ~UNSIGNALLED;
2494            q.qlock = 1;                   // lock queue
2495            boolean installed = false;
2496            aux.lock();
2497            try {                          // lock pool to install
2498                WorkQueue[] ws;
2499                if ((ws = workQueues) != null && index < ws.length &&
2500                    ws[index] == null) {
2501                    ws[index] = q;         // else throw away
2502                    installed = true;
2503                }
2504            } finally {
2505                aux.unlock();
2506            }
2507            if (installed) {
2508                try {
2509                    q.growArray();
2510                } finally {
2511                    q.qlock = 0;
2512                }
2513            }
2514        }
2515    }
2516
2517    /**
2518     * Adds the given task to a submission queue at submitter's
2519     * current queue. Also performs secondary initialization upon the
2520     * first submission of the first task to the pool, and detects
2521     * first submission by an external thread and creates a new shared
2522     * queue if the one at index if empty or contended.
2523     *
2524     * @param task the task. Caller must ensure non-null.
2525     */
2526    final void externalPush(ForkJoinTask<?> task) {
2527        int r;                            // initialize caller's probe
2528        if ((r = ThreadLocalRandom.getProbe()) == 0) {
2529            ThreadLocalRandom.localInit();
2530            r = ThreadLocalRandom.getProbe();
2531        }
2532        for (;;) {
2533            WorkQueue q; int wl, k, stat;
2534            int rs = runState;
2535            WorkQueue[] ws = workQueues;
2536            if (rs <= 0 || ws == null || (wl = ws.length) <= 0)
2537                tryInitialize(true);
2538            else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null)
2539                tryCreateExternalQueue(k);
2540            else if ((stat = q.sharedPush(task)) < 0)
2541                break;
2542            else if (stat == 0) {
2543                signalWork();
2544                break;
2545            }
2546            else                          // move if busy
2547                r = ThreadLocalRandom.advanceProbe(r);
2548        }
2549    }
2550
2551    /**
2552     * Pushes a possibly-external submission.
2553     */
2554    private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2555        Thread t; ForkJoinWorkerThread w; WorkQueue q;
2556        if (task == null)
2557            throw new NullPointerException();
2558        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2559            (w = (ForkJoinWorkerThread)t).pool == this &&
2560            (q = w.workQueue) != null)
2561            q.push(task);
2562        else
2563            externalPush(task);
2564        return task;
2565    }
2566
2567    /**
2568     * Returns common pool queue for an external thread.
2569     */
2570    static WorkQueue commonSubmitterQueue() {
2571        ForkJoinPool p = common;
2572        int r = ThreadLocalRandom.getProbe();
2573        WorkQueue[] ws; int wl;
2574        return (p != null && (ws = p.workQueues) != null &&
2575                (wl = ws.length) > 0) ?
2576            ws[(wl - 1) & r & SQMASK] : null;
2577    }
2578
2579    /**
2580     * Performs tryUnpush for an external submitter.
2581     */
2582    final boolean tryExternalUnpush(ForkJoinTask<?> task) {
2583        int r = ThreadLocalRandom.getProbe();
2584        WorkQueue[] ws; WorkQueue w; int wl;
2585        return ((ws = workQueues) != null &&
2586                (wl = ws.length) > 0 &&
2587                (w = ws[(wl - 1) & r & SQMASK]) != null &&
2588                w.trySharedUnpush(task));
2589    }
2590
2591    /**
2592     * Performs helpComplete for an external submitter.
2593     */
2594    final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2595        WorkQueue[] ws; int wl;
2596        int r = ThreadLocalRandom.getProbe();
2597        return ((ws = workQueues) != null && (wl = ws.length) > 0) ?
2598            helpComplete(ws[(wl - 1) & r & SQMASK], task, maxTasks) : 0;
2599    }
2600
2601    // Exported methods
2602
2603    // Constructors
2604
2605    /**
2606     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2607     * java.lang.Runtime#availableProcessors}, using the {@linkplain
2608     * #defaultForkJoinWorkerThreadFactory default thread factory},
2609     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2610     *
2611     * @throws SecurityException if a security manager exists and
2612     *         the caller is not permitted to modify threads
2613     *         because it does not hold {@link
2614     *         java.lang.RuntimePermission}{@code ("modifyThread")}
2615     */
2616    public ForkJoinPool() {
2617        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2618             defaultForkJoinWorkerThreadFactory, null, false);
2619    }
2620
2621    /**
2622     * Creates a {@code ForkJoinPool} with the indicated parallelism
2623     * level, the {@linkplain
2624     * #defaultForkJoinWorkerThreadFactory default thread factory},
2625     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2626     *
2627     * @param parallelism the parallelism level
2628     * @throws IllegalArgumentException if parallelism less than or
2629     *         equal to zero, or greater than implementation limit
2630     * @throws SecurityException if a security manager exists and
2631     *         the caller is not permitted to modify threads
2632     *         because it does not hold {@link
2633     *         java.lang.RuntimePermission}{@code ("modifyThread")}
2634     */
2635    public ForkJoinPool(int parallelism) {
2636        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
2637    }
2638
2639    /**
2640     * Creates a {@code ForkJoinPool} with the given parameters.
2641     *
2642     * @param parallelism the parallelism level. For default value,
2643     * use {@link java.lang.Runtime#availableProcessors}.
2644     * @param factory the factory for creating new threads. For default value,
2645     * use {@link #defaultForkJoinWorkerThreadFactory}.
2646     * @param handler the handler for internal worker threads that
2647     * terminate due to unrecoverable errors encountered while executing
2648     * tasks. For default value, use {@code null}.
2649     * @param asyncMode if true,
2650     * establishes local first-in-first-out scheduling mode for forked
2651     * tasks that are never joined. This mode may be more appropriate
2652     * than default locally stack-based mode in applications in which
2653     * worker threads only process event-style asynchronous tasks.
2654     * For default value, use {@code false}.
2655     * @throws IllegalArgumentException if parallelism less than or
2656     *         equal to zero, or greater than implementation limit
2657     * @throws NullPointerException if the factory is null
2658     * @throws SecurityException if a security manager exists and
2659     *         the caller is not permitted to modify threads
2660     *         because it does not hold {@link
2661     *         java.lang.RuntimePermission}{@code ("modifyThread")}
2662     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        // Arguments are validated by the helper calls below, which execute
        // left-to-right before the private constructor body runs (an
        // explicit this(...) invocation must be the first statement).
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        // Security check runs last, after parameter validation.
        checkPermission();
    }
2674
2675    private static int checkParallelism(int parallelism) {
2676        if (parallelism <= 0 || parallelism > MAX_CAP)
2677            throw new IllegalArgumentException();
2678        return parallelism;
2679    }
2680
2681    private static ForkJoinWorkerThreadFactory checkFactory
2682        (ForkJoinWorkerThreadFactory factory) {
2683        if (factory == null)
2684            throw new NullPointerException();
2685        return factory;
2686    }
2687
2688    /**
2689     * Creates a {@code ForkJoinPool} with the given parameters, without
2690     * any security checks or parameter validation.  Invoked directly by
2691     * makeCommonPool.
2692     */
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        // Pack the target parallelism (low SMASK bits) together with the
        // queue mode (FIFO_QUEUE or LIFO_QUEUE) into a single config word.
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        // Seed ctl with the negated parallelism in both the AC (active
        // count) and TC (total count) fields, so both counts read as
        // deficits relative to the target until workers are created.
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
2705
2706    /**
2707     * Returns the common pool instance. This pool is statically
2708     * constructed; its run state is unaffected by attempts to {@link
2709     * #shutdown} or {@link #shutdownNow}. However this pool and any
2710     * ongoing processing are automatically terminated upon program
2711     * {@link System#exit}.  Any program that relies on asynchronous
2712     * task processing to complete before program termination should
2713     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2714     * before exit.
2715     *
2716     * @return the common pool instance
2717     * @since 1.8
2718     */
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        // Per the javadoc above, the common pool is statically constructed,
        // so 'common' is non-null once this class has initialized.
        return common;
    }
2723
2724    // Execution methods
2725
2726    /**
2727     * Performs the given task, returning its result upon completion.
2728     * If the computation encounters an unchecked Exception or Error,
2729     * it is rethrown as the outcome of this invocation.  Rethrown
2730     * exceptions behave in the same way as regular exceptions, but,
2731     * when possible, contain stack traces (as displayed for example
2732     * using {@code ex.printStackTrace()}) of both the current thread
2733     * as well as the thread actually encountering the exception;
2734     * minimally only the latter.
2735     *
2736     * @param task the task
2737     * @param <T> the type of the task's result
2738     * @return the task's result
2739     * @throws NullPointerException if the task is null
2740     * @throws RejectedExecutionException if the task cannot be
2741     *         scheduled for execution
2742     */
2743    public <T> T invoke(ForkJoinTask<T> task) {
2744        if (task == null)
2745            throw new NullPointerException();
2746        externalSubmit(task);
2747        return task.join();
2748    }
2749
2750    /**
2751     * Arranges for (asynchronous) execution of the given task.
2752     *
2753     * @param task the task
2754     * @throws NullPointerException if the task is null
2755     * @throws RejectedExecutionException if the task cannot be
2756     *         scheduled for execution
2757     */
    public void execute(ForkJoinTask<?> task) {
        // Fire-and-forget: enqueue the task and return without joining.
        externalSubmit(task);
    }
2761
2762    // AbstractExecutorService methods
2763
2764    /**
2765     * @throws NullPointerException if the task is null
2766     * @throws RejectedExecutionException if the task cannot be
2767     *         scheduled for execution
2768     */
2769    public void execute(Runnable task) {
2770        if (task == null)
2771            throw new NullPointerException();
2772        ForkJoinTask<?> job;
2773        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2774            job = (ForkJoinTask<?>) task;
2775        else
2776            job = new ForkJoinTask.RunnableExecuteAction(task);
2777        externalSubmit(job);
2778    }
2779
2780    /**
2781     * Submits a ForkJoinTask for execution.
2782     *
2783     * @param task the task to submit
2784     * @param <T> the type of the task's result
2785     * @return the task
2786     * @throws NullPointerException if the task is null
2787     * @throws RejectedExecutionException if the task cannot be
2788     *         scheduled for execution
2789     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        // externalSubmit enqueues the task and returns it (per the
        // @return contract above), so callers can join it later.
        return externalSubmit(task);
    }
2793
2794    /**
2795     * @throws NullPointerException if the task is null
2796     * @throws RejectedExecutionException if the task cannot be
2797     *         scheduled for execution
2798     */
2799    public <T> ForkJoinTask<T> submit(Callable<T> task) {
2800        return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2801    }
2802
2803    /**
2804     * @throws NullPointerException if the task is null
2805     * @throws RejectedExecutionException if the task cannot be
2806     *         scheduled for execution
2807     */
2808    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2809        return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2810    }
2811
2812    /**
2813     * @throws NullPointerException if the task is null
2814     * @throws RejectedExecutionException if the task cannot be
2815     *         scheduled for execution
2816     */
2817    public ForkJoinTask<?> submit(Runnable task) {
2818        if (task == null)
2819            throw new NullPointerException();
2820        ForkJoinTask<?> job;
2821        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2822            job = (ForkJoinTask<?>) task;
2823        else
2824            job = new ForkJoinTask.AdaptedRunnableAction(task);
2825        return externalSubmit(job);
2826    }
2827
2828    /**
2829     * @throws NullPointerException       {@inheritDoc}
2830     * @throws RejectedExecutionException {@inheritDoc}
2831     */
2832    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2833        // In previous versions of this class, this method constructed
2834        // a task to run ForkJoinTask.invokeAll, but now external
2835        // invocation of multiple tasks is at least as efficient.
2836        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2837
2838        try {
2839            for (Callable<T> t : tasks) {
2840                ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2841                futures.add(f);
2842                externalSubmit(f);
2843            }
2844            for (int i = 0, size = futures.size(); i < size; i++)
2845                ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2846            return futures;
2847        } catch (Throwable t) {
2848            for (int i = 0, size = futures.size(); i < size; i++)
2849                futures.get(i).cancel(false);
2850            throw t;
2851        }
2852    }
2853
2854    /**
2855     * Returns the factory used for constructing new workers.
2856     *
2857     * @return the factory used for constructing new workers
2858     */
    public ForkJoinWorkerThreadFactory getFactory() {
        return factory; // assigned once in the constructor
    }
2862
2863    /**
2864     * Returns the handler for internal worker threads that terminate
2865     * due to unrecoverable errors encountered while executing tasks.
2866     *
2867     * @return the handler, or {@code null} if none
2868     */
    public UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return ueh; // assigned once in the constructor; may be null
    }
2872
2873    /**
2874     * Returns the targeted parallelism level of this pool.
2875     *
2876     * @return the targeted parallelism level of this pool
2877     */
2878    public int getParallelism() {
2879        int par;
2880        return ((par = config & SMASK) > 0) ? par : 1;
2881    }
2882
2883    /**
2884     * Returns the targeted parallelism level of the common pool.
2885     *
2886     * @return the targeted parallelism level of the common pool
2887     * @since 1.8
2888     */
    public static int getCommonPoolParallelism() {
        // Served from a static constant rather than the pool instance.
        return COMMON_PARALLELISM;
    }
2892
2893    /**
2894     * Returns the number of worker threads that have started but not
2895     * yet terminated.  The result returned by this method may differ
2896     * from {@link #getParallelism} when threads are created to
2897     * maintain parallelism when others are cooperatively blocked.
2898     *
2899     * @return the number of worker threads
2900     */
    public int getPoolSize() {
        // Target parallelism plus the signed TC (total worker count) delta
        // held in the high bits of ctl.
        return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
    }
2904
2905    /**
2906     * Returns {@code true} if this pool uses local first-in-first-out
2907     * scheduling mode for forked tasks that are never joined.
2908     *
2909     * @return {@code true} if this pool uses async mode
2910     */
    public boolean getAsyncMode() {
        // The FIFO_QUEUE mode bit is recorded in config at construction.
        return (config & FIFO_QUEUE) != 0;
    }
2914
2915    /**
2916     * Returns an estimate of the number of worker threads that are
2917     * not blocked waiting to join tasks or for other managed
2918     * synchronization. This method may overestimate the
2919     * number of running threads.
2920     *
2921     * @return the number of worker threads
2922     */
2923    public int getRunningThreadCount() {
2924        int rc = 0;
2925        WorkQueue[] ws; WorkQueue w;
2926        if ((ws = workQueues) != null) {
2927            for (int i = 1; i < ws.length; i += 2) {
2928                if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2929                    ++rc;
2930            }
2931        }
2932        return rc;
2933    }
2934
2935    /**
2936     * Returns an estimate of the number of threads that are currently
2937     * stealing or executing tasks. This method may overestimate the
2938     * number of active threads.
2939     *
2940     * @return the number of active threads
2941     */
2942    public int getActiveThreadCount() {
2943        int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2944        return (r <= 0) ? 0 : r; // suppress momentarily negative values
2945    }
2946
2947    /**
2948     * Returns {@code true} if all worker threads are currently idle.
2949     * An idle worker is one that cannot obtain a task to execute
2950     * because none are available to steal from other threads, and
2951     * there are no pending submissions to the pool. This method is
2952     * conservative; it might not return {@code true} immediately upon
2953     * idleness of all threads, but will eventually become true if
2954     * threads remain inactive.
2955     *
2956     * @return {@code true} if all threads are currently idle
2957     */
    public boolean isQuiescent() {
        // The AC field of ctl is stored as a delta from target parallelism;
        // the pool is quiescent when no workers are counted active.
        return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
    }
2961
2962    /**
2963     * Returns an estimate of the total number of tasks stolen from
2964     * one thread's work queue by another. The reported value
2965     * underestimates the actual total number of steals when the pool
2966     * is not quiescent. This value may be useful for monitoring and
2967     * tuning fork/join programs: in general, steal counts should be
2968     * high enough to keep threads busy, but low enough to avoid
2969     * overhead and contention across threads.
2970     *
2971     * @return the number of steals
2972     */
2973    public long getStealCount() {
2974        AuxState sc = auxState;
2975        long count = (sc == null) ? 0L : sc.stealCount;
2976        WorkQueue[] ws; WorkQueue w;
2977        if ((ws = workQueues) != null) {
2978            for (int i = 1; i < ws.length; i += 2) {
2979                if ((w = ws[i]) != null)
2980                    count += w.nsteals;
2981            }
2982        }
2983        return count;
2984    }
2985
2986    /**
2987     * Returns an estimate of the total number of tasks currently held
2988     * in queues by worker threads (but not including tasks submitted
2989     * to the pool that have not begun executing). This value is only
2990     * an approximation, obtained by iterating across all threads in
2991     * the pool. This method may be useful for tuning task
2992     * granularities.
2993     *
2994     * @return the number of queued tasks
2995     */
2996    public long getQueuedTaskCount() {
2997        long count = 0;
2998        WorkQueue[] ws; WorkQueue w;
2999        if ((ws = workQueues) != null) {
3000            for (int i = 1; i < ws.length; i += 2) {
3001                if ((w = ws[i]) != null)
3002                    count += w.queueSize();
3003            }
3004        }
3005        return count;
3006    }
3007
3008    /**
3009     * Returns an estimate of the number of tasks submitted to this
3010     * pool that have not yet begun executing.  This method may take
3011     * time proportional to the number of submissions.
3012     *
3013     * @return the number of queued submissions
3014     */
3015    public int getQueuedSubmissionCount() {
3016        int count = 0;
3017        WorkQueue[] ws; WorkQueue w;
3018        if ((ws = workQueues) != null) {
3019            for (int i = 0; i < ws.length; i += 2) {
3020                if ((w = ws[i]) != null)
3021                    count += w.queueSize();
3022            }
3023        }
3024        return count;
3025    }
3026
3027    /**
3028     * Returns {@code true} if there are any tasks submitted to this
3029     * pool that have not yet begun executing.
3030     *
3031     * @return {@code true} if there are any queued submissions
3032     */
3033    public boolean hasQueuedSubmissions() {
3034        WorkQueue[] ws; WorkQueue w;
3035        if ((ws = workQueues) != null) {
3036            for (int i = 0; i < ws.length; i += 2) {
3037                if ((w = ws[i]) != null && !w.isEmpty())
3038                    return true;
3039            }
3040        }
3041        return false;
3042    }
3043
3044    /**
3045     * Removes and returns the next unexecuted submission if one is
3046     * available.  This method may be useful in extensions to this
3047     * class that re-assign work in systems with multiple pools.
3048     *
3049     * @return the next submission, or {@code null} if none
3050     */
3051    protected ForkJoinTask<?> pollSubmission() {
3052        WorkQueue[] ws; int wl; WorkQueue w; ForkJoinTask<?> t;
3053        int r = ThreadLocalRandom.nextSecondarySeed();
3054        if ((ws = workQueues) != null && (wl = ws.length) > 0) {
3055            for (int m = wl - 1, i = 0; i < wl; ++i) {
3056                if ((w = ws[(i << 1) & m]) != null && (t = w.poll()) != null)
3057                    return t;
3058            }
3059        }
3060        return null;
3061    }
3062
3063    /**
3064     * Removes all available unexecuted submitted and forked tasks
3065     * from scheduling queues and adds them to the given collection,
3066     * without altering their execution status. These may include
3067     * artificially generated or wrapped tasks. This method is
3068     * designed to be invoked only when the pool is known to be
3069     * quiescent. Invocations at other times may not remove all
3070     * tasks. A failure encountered while attempting to add elements
3071     * to collection {@code c} may result in elements being in
3072     * neither, either or both collections when the associated
3073     * exception is thrown.  The behavior of this operation is
3074     * undefined if the specified collection is modified while the
3075     * operation is in progress.
3076     *
3077     * @param c the collection to transfer elements into
3078     * @return the number of elements transferred
3079     */
3080    protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
3081        int count = 0;
3082        WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
3083        if ((ws = workQueues) != null) {
3084            for (int i = 0; i < ws.length; ++i) {
3085                if ((w = ws[i]) != null) {
3086                    while ((t = w.poll()) != null) {
3087                        c.add(t);
3088                        ++count;
3089                    }
3090                }
3091            }
3092        }
3093        return count;
3094    }
3095
3096    /**
3097     * Returns a string identifying this pool, as well as its state,
3098     * including indications of run state, parallelism level, and
3099     * worker and task counts.
3100     *
3101     * @return a string identifying this pool, as well as its state
3102     */
    public String toString() {
        // Use a single pass through workQueues to collect counts
        long qt = 0L, qs = 0L; int rc = 0;
        AuxState sc = auxState;
        long st = (sc == null) ? 0L : sc.stealCount;
        long c = ctl; // snapshot; fields below are decoded from this copy
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 0; i < ws.length; ++i) {
                if ((w = ws[i]) != null) {
                    int size = w.queueSize();
                    if ((i & 1) == 0)
                        qs += size; // even slots: external submissions
                    else {
                        qt += size; // odd slots: worker task queues
                        st += w.nsteals;
                        if (w.isApparentlyUnblocked())
                            ++rc;
                    }
                }
            }
        }
        int pc = (config & SMASK);             // target parallelism
        int tc = pc + (short)(c >>> TC_SHIFT); // total workers (TC delta)
        int ac = pc + (int)(c >> AC_SHIFT);    // active workers (AC delta)
        if (ac < 0) // ignore transient negative
            ac = 0;
        int rs = runState;
        String level = ((rs & TERMINATED) != 0 ? "Terminated" :
                        (rs & STOP)       != 0 ? "Terminating" :
                        (rs & SHUTDOWN)   != 0 ? "Shutting down" :
                        "Running");
        return super.toString() +
            "[" + level +
            ", parallelism = " + pc +
            ", size = " + tc +
            ", active = " + ac +
            ", running = " + rc +
            ", steals = " + st +
            ", tasks = " + qt +
            ", submissions = " + qs +
            "]";
    }
3146
3147    /**
3148     * Possibly initiates an orderly shutdown in which previously
3149     * submitted tasks are executed, but no new tasks will be
3150     * accepted. Invocation has no effect on execution state if this
3151     * is the {@link #commonPool()}, and no additional effect if
3152     * already shut down.  Tasks that are in the process of being
3153     * submitted concurrently during the course of this method may or
3154     * may not be rejected.
3155     *
3156     * @throws SecurityException if a security manager exists and
3157     *         the caller is not permitted to modify threads
3158     *         because it does not hold {@link
3159     *         java.lang.RuntimePermission}{@code ("modifyThread")}
3160     */
    public void shutdown() {
        checkPermission();
        // now=false: allow already-submitted tasks to complete; per the
        // javadoc above, this has no effect on the common pool.
        tryTerminate(false, true);
    }
3165
3166    /**
3167     * Possibly attempts to cancel and/or stop all tasks, and reject
3168     * all subsequently submitted tasks.  Invocation has no effect on
3169     * execution state if this is the {@link #commonPool()}, and no
3170     * additional effect if already shut down. Otherwise, tasks that
3171     * are in the process of being submitted or executed concurrently
3172     * during the course of this method may or may not be
3173     * rejected. This method cancels both existing and unexecuted
3174     * tasks, in order to permit termination in the presence of task
3175     * dependencies. So the method always returns an empty list
3176     * (unlike the case for some other Executors).
3177     *
3178     * @return an empty list
3179     * @throws SecurityException if a security manager exists and
3180     *         the caller is not permitted to modify threads
3181     *         because it does not hold {@link
3182     *         java.lang.RuntimePermission}{@code ("modifyThread")}
3183     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        // now=true: attempt to cancel/stop executing tasks as well.
        tryTerminate(true, true);
        // Tasks are cancelled rather than returned, so the list is empty
        // by contract (unlike some other Executors).
        return Collections.emptyList();
    }
3189
3190    /**
3191     * Returns {@code true} if all tasks have completed following shut down.
3192     *
3193     * @return {@code true} if all tasks have completed following shut down
3194     */
    public boolean isTerminated() {
        // TERMINATED is a runState bit set once termination has completed.
        return (runState & TERMINATED) != 0;
    }
3198
3199    /**
3200     * Returns {@code true} if the process of termination has
3201     * commenced but not yet completed.  This method may be useful for
3202     * debugging. A return of {@code true} reported a sufficient
3203     * period after shutdown may indicate that submitted tasks have
3204     * ignored or suppressed interruption, or are waiting for I/O,
3205     * causing this executor not to properly terminate. (See the
3206     * advisory notes for class {@link ForkJoinTask} stating that
3207     * tasks should not normally entail blocking operations.  But if
3208     * they do, they must abort them on interrupt.)
3209     *
3210     * @return {@code true} if terminating but not yet terminated
3211     */
3212    public boolean isTerminating() {
3213        int rs = runState;
3214        return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
3215    }
3216
3217    /**
3218     * Returns {@code true} if this pool has been shut down.
3219     *
3220     * @return {@code true} if this pool has been shut down
3221     */
    public boolean isShutdown() {
        // SHUTDOWN is a runState bit set when new submissions are refused.
        return (runState & SHUTDOWN) != 0;
    }
3225
3226    /**
3227     * Blocks until all tasks have completed execution after a
3228     * shutdown request, or the timeout occurs, or the current thread
3229     * is interrupted, whichever happens first. Because the {@link
3230     * #commonPool()} never terminates until program shutdown, when
3231     * applied to the common pool, this method is equivalent to {@link
3232     * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3233     *
3234     * @param timeout the maximum time to wait
3235     * @param unit the time unit of the timeout argument
3236     * @return {@code true} if this executor terminated and
3237     *         {@code false} if the timeout elapsed before termination
3238     * @throws InterruptedException if interrupted while waiting
3239     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (this == common) {
            // The common pool never terminates; honor the timeout by
            // waiting for quiescence instead, then report non-termination.
            awaitQuiescence(timeout, unit);
            return false;
        }
        long nanos = unit.toNanos(timeout);
        if (isTerminated())
            return true;
        if (nanos <= 0L)
            return false;
        long deadline = System.nanoTime() + nanos;
        synchronized (this) {
            // Standard timed-wait loop; termination notifies waiters on
            // this pool's monitor.
            for (;;) {
                if (isTerminated())
                    return true;
                if (nanos <= 0L)
                    return false;
                long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
                // wait(0) would block indefinitely, so wait at least 1 ms
                // per round; spurious wakeups are handled by re-looping.
                wait(millis > 0L ? millis : 1L);
                nanos = deadline - System.nanoTime();
            }
        }
    }
3266
3267    /**
3268     * If called by a ForkJoinTask operating in this pool, equivalent
3269     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3270     * waits and/or attempts to assist performing tasks until this
3271     * pool {@link #isQuiescent} or the indicated timeout elapses.
3272     *
3273     * @param timeout the maximum time to wait
3274     * @param unit the time unit of the timeout argument
3275     * @return {@code true} if quiescent; {@code false} if the
3276     * timeout elapsed.
3277     */
    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        ForkJoinWorkerThread wt;
        Thread thread = Thread.currentThread();
        if ((thread instanceof ForkJoinWorkerThread) &&
            (wt = (ForkJoinWorkerThread)thread).pool == this) {
            // Called from one of this pool's own workers: help execute
            // tasks until quiescent (equivalent to ForkJoinTask.helpQuiesce),
            // ignoring the timeout.
            helpQuiescePool(wt.workQueue);
            return true;
        }
        long startTime = System.nanoTime();
        WorkQueue[] ws;
        int r = 0, wl;
        boolean found = true;
        // External caller: repeatedly scan queues, executing any task
        // found; the deadline is checked (and the CPU yielded) only after
        // a scan that found no work.
        while (!isQuiescent() && (ws = workQueues) != null &&
               (wl = ws.length) > 0) {
            if (!found) {
                if ((System.nanoTime() - startTime) > nanos)
                    return false;
                Thread.yield(); // cannot block
            }
            found = false;
            // Probe up to 4*(m+1) slots using the advancing index r,
            // masked into the queue array.
            for (int m = wl - 1, j = (m + 1) << 2; j >= 0; --j) {
                ForkJoinTask<?> t; WorkQueue q; int b, k;
                if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
                    (b = q.base) - q.top < 0) { // queue apparently non-empty
                    found = true;
                    if ((t = q.pollAt(b)) != null)
                        t.doExec();
                    break;
                }
            }
        }
        return true;
    }
3312
3313    /**
3314     * Waits and/or attempts to assist performing tasks indefinitely
3315     * until the {@link #commonPool()} {@link #isQuiescent}.
3316     */
    static void quiesceCommonPool() {
        // Effectively waits forever (Long.MAX_VALUE nanos) for the common
        // pool to reach quiescence.
        common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }
3320
3321    /**
3322     * Interface for extending managed parallelism for tasks running
3323     * in {@link ForkJoinPool}s.
3324     *
3325     * <p>A {@code ManagedBlocker} provides two methods.  Method
3326     * {@link #isReleasable} must return {@code true} if blocking is
3327     * not necessary. Method {@link #block} blocks the current thread
3328     * if necessary (perhaps internally invoking {@code isReleasable}
3329     * before actually blocking). These actions are performed by any
3330     * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3331     * The unusual methods in this API accommodate synchronizers that
3332     * may, but don't usually, block for long periods. Similarly, they
3333     * allow more efficient internal handling of cases in which
3334     * additional workers may be, but usually are not, needed to
3335     * ensure sufficient parallelism.  Toward this end,
3336     * implementations of method {@code isReleasable} must be amenable
3337     * to repeated invocation.
3338     *
3339     * <p>For example, here is a ManagedBlocker based on a
3340     * ReentrantLock:
3341     * <pre> {@code
3342     * class ManagedLocker implements ManagedBlocker {
3343     *   final ReentrantLock lock;
3344     *   boolean hasLock = false;
3345     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3346     *   public boolean block() {
3347     *     if (!hasLock)
3348     *       lock.lock();
3349     *     return true;
3350     *   }
3351     *   public boolean isReleasable() {
3352     *     return hasLock || (hasLock = lock.tryLock());
3353     *   }
3354     * }}</pre>
3355     *
3356     * <p>Here is a class that possibly blocks waiting for an
3357     * item on a given queue:
3358     * <pre> {@code
3359     * class QueueTaker<E> implements ManagedBlocker {
3360     *   final BlockingQueue<E> queue;
3361     *   volatile E item = null;
3362     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3363     *   public boolean block() throws InterruptedException {
3364     *     if (item == null)
3365     *       item = queue.take();
3366     *     return true;
3367     *   }
3368     *   public boolean isReleasable() {
3369     *     return item != null || (item = queue.poll()) != null;
3370     *   }
3371     *   public E getItem() { // call after pool.managedBlock completes
3372     *     return item;
3373     *   }
3374     * }}</pre>
3375     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         *
         * <p>As noted in the interface description, this method may be
         * invoked repeatedly, so implementations must be amenable to
         * repeated invocation.
         *
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
3394
3395    /**
3396     * Runs the given possibly blocking task.  When {@linkplain
3397     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
3398     * method possibly arranges for a spare thread to be activated if
3399     * necessary to ensure sufficient parallelism while the current
3400     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
3401     *
3402     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
3403     * {@code blocker.block()} until either method returns {@code true}.
3404     * Every call to {@code blocker.block()} is preceded by a call to
3405     * {@code blocker.isReleasable()} that returned {@code false}.
3406     *
3407     * <p>If not running in a ForkJoinPool, this method is
3408     * behaviorally equivalent to
3409     * <pre> {@code
3410     * while (!blocker.isReleasable())
3411     *   if (blocker.block())
3412     *     break;}</pre>
3413     *
3414     * If running in a ForkJoinPool, the pool may first be expanded to
3415     * ensure sufficient parallelism available during the call to
3416     * {@code blocker.block()}.
3417     *
3418     * @param blocker the blocker task
3419     * @throws InterruptedException if {@code blocker.block()} did so
3420     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        Thread t = Thread.currentThread();
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
            WorkQueue w = wt.workQueue;
            // Worker thread: before blocking, try to compensate via
            // tryCompensate so pool parallelism is preserved while this
            // thread is blocked.
            while (!blocker.isReleasable()) {
                if (p.tryCompensate(w)) {
                    try {
                        // isReleasable is re-checked before every block()
                        // call, per the documented protocol.
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Undo tryCompensate's ctl active-count adjustment
                        // now that this thread is runnable again.
                        U.getAndAddLong(p, CTL, AC_UNIT);
                    }
                    break;
                }
            }
        }
        else {
            // Non-worker thread: plain loop per the documented equivalence
            // in the javadoc above.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
3446
3447    // AbstractExecutorService overrides.  These rely on undocumented
3448    // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3449    // implement RunnableFuture.
3450
3451    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3452        return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3453    }
3454
3455    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3456        return new ForkJoinTask.AdaptedCallable<T>(callable);
3457    }
3458
    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long CTL;      // offset of the "ctl" field
    private static final long RUNSTATE; // offset of the "runState" field
    private static final int ABASE;     // base offset of ForkJoinTask[] elements
    private static final int ASHIFT;    // log2 of ForkJoinTask[] element scale

    static {
        try {
            // Resolve field offsets and array geometry for Unsafe-based
            // CAS/volatile access used throughout this class.
            CTL = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("ctl"));
            RUNSTATE = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("runState"));
            ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
            int scale = U.arrayIndexScale(ForkJoinTask[].class);
            if ((scale & (scale - 1)) != 0)
                throw new Error("array index scale not a power of two");
            // Shift form so element address = ABASE + (i << ASHIFT).
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Honor a user-supplied cap on compensation spare threads;
        // parse failures silently fall back to the built-in default.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Build the common pool with full privileges so worker creation
        // is not restricted by the caller's access control context.
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});

        // report 1 even if threads disabled
        COMMON_PARALLELISM = Math.max(common.config & SMASK, 1);
    }
3505
3506    /**
3507     * Creates and returns the common pool, respecting user settings
3508     * specified via system properties.
3509     */
3510    static ForkJoinPool makeCommonPool() {
3511        int parallelism = -1;
3512        ForkJoinWorkerThreadFactory factory = null;
3513        UncaughtExceptionHandler handler = null;
3514        try {  // ignore exceptions in accessing/parsing properties
3515            String pp = System.getProperty
3516                ("java.util.concurrent.ForkJoinPool.common.parallelism");
3517            String fp = System.getProperty
3518                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3519            String hp = System.getProperty
3520                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3521            if (pp != null)
3522                parallelism = Integer.parseInt(pp);
3523            if (fp != null)
3524                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
3525                           getSystemClassLoader().loadClass(fp).newInstance());
3526            if (hp != null)
3527                handler = ((UncaughtExceptionHandler)ClassLoader.
3528                           getSystemClassLoader().loadClass(hp).newInstance());
3529        } catch (Exception ignore) {
3530        }
3531        if (factory == null) {
3532            if (System.getSecurityManager() == null)
3533                factory = defaultForkJoinWorkerThreadFactory;
3534            else // use security-managed default
3535                factory = new InnocuousForkJoinWorkerThreadFactory();
3536        }
3537        if (parallelism < 0 && // default 1 less than #cores
3538            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
3539            parallelism = 1;
3540        if (parallelism > MAX_CAP)
3541            parallelism = MAX_CAP;
3542        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
3543                                "ForkJoinPool.commonPool-worker-");
3544    }
3545
3546    /**
3547     * Factory for innocuous worker threads.
3548     */
3549    private static final class InnocuousForkJoinWorkerThreadFactory
3550        implements ForkJoinWorkerThreadFactory {
3551
3552        /**
3553         * An ACC to restrict permissions for the factory itself.
3554         * The constructed workers have no permissions set.
3555         */
3556        private static final AccessControlContext innocuousAcc;
3557        static {
3558            Permissions innocuousPerms = new Permissions();
3559            innocuousPerms.add(modifyThreadPermission);
3560            innocuousPerms.add(new RuntimePermission(
3561                                   "enableContextClassLoaderOverride"));
3562            innocuousPerms.add(new RuntimePermission(
3563                                   "modifyThreadGroup"));
3564            innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
3565                    new ProtectionDomain(null, innocuousPerms)
3566                });
3567        }
3568
3569        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3570            return java.security.AccessController.doPrivileged(
3571                new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
3572                    public ForkJoinWorkerThread run() {
3573                        return new ForkJoinWorkerThread.
3574                            InnocuousForkJoinWorkerThread(pool);
3575                    }}, innocuousAcc);
3576        }
3577    }
3578
3579}
3580