1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7package java.util.concurrent;
8
9import java.util.concurrent.atomic.AtomicReference;
10import java.util.concurrent.locks.LockSupport;
11
12/**
13 * A reusable synchronization barrier, similar in functionality to
14 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
15 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
16 * but supporting more flexible usage.
17 *
18 * <p><b>Registration.</b> Unlike the case for other barriers, the
19 * number of parties <em>registered</em> to synchronize on a phaser
20 * may vary over time.  Tasks may be registered at any time (using
21 * methods {@link #register}, {@link #bulkRegister}, or forms of
22 * constructors establishing initial numbers of parties), and
23 * optionally deregistered upon any arrival (using {@link
24 * #arriveAndDeregister}).  As is the case with most basic
25 * synchronization constructs, registration and deregistration affect
26 * only internal counts; they do not establish any further internal
27 * bookkeeping, so tasks cannot query whether they are registered.
28 * (However, you can introduce such bookkeeping by subclassing this
29 * class.)
30 *
31 * <p><b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
32 * Phaser} may be repeatedly awaited.  Method {@link
33 * #arriveAndAwaitAdvance} has effect analogous to {@link
34 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
35 * generation of a phaser has an associated phase number. The phase
36 * number starts at zero, and advances when all parties arrive at the
37 * phaser, wrapping around to zero after reaching {@code
38 * Integer.MAX_VALUE}. The use of phase numbers enables independent
39 * control of actions upon arrival at a phaser and upon awaiting
40 * others, via two kinds of methods that may be invoked by any
41 * registered party:
42 *
43 * <ul>
44 *
45 *   <li><b>Arrival.</b> Methods {@link #arrive} and
46 *       {@link #arriveAndDeregister} record arrival.  These methods
47 *       do not block, but return an associated <em>arrival phase
48 *       number</em>; that is, the phase number of the phaser to which
49 *       the arrival applied. When the final party for a given phase
50 *       arrives, an optional action is performed and the phase
51 *       advances.  These actions are performed by the party
52 *       triggering a phase advance, and are arranged by overriding
53 *       method {@link #onAdvance(int, int)}, which also controls
54 *       termination. Overriding this method is similar to, but more
55 *       flexible than, providing a barrier action to a {@code
56 *       CyclicBarrier}.
57 *
58 *   <li><b>Waiting.</b> Method {@link #awaitAdvance} requires an
59 *       argument indicating an arrival phase number, and returns when
60 *       the phaser advances to (or is already at) a different phase.
61 *       Unlike similar constructions using {@code CyclicBarrier},
62 *       method {@code awaitAdvance} continues to wait even if the
63 *       waiting thread is interrupted. Interruptible and timeout
64 *       versions are also available, but exceptions encountered while
65 *       tasks wait interruptibly or with timeout do not change the
66 *       state of the phaser. If necessary, you can perform any
67 *       associated recovery within handlers of those exceptions,
68 *       often after invoking {@code forceTermination}.  Phasers may
69 *       also be used by tasks executing in a {@link ForkJoinPool}.
70 *       Progress is ensured if the pool's parallelismLevel can
71 *       accommodate the maximum number of simultaneously blocked
72 *       parties.
73 *
74 * </ul>
75 *
76 * <p><b>Termination.</b> A phaser may enter a <em>termination</em>
77 * state, that may be checked using method {@link #isTerminated}. Upon
78 * termination, all synchronization methods immediately return without
79 * waiting for advance, as indicated by a negative return value.
80 * Similarly, attempts to register upon termination have no effect.
81 * Termination is triggered when an invocation of {@code onAdvance}
82 * returns {@code true}. The default implementation returns {@code
83 * true} if a deregistration has caused the number of registered
84 * parties to become zero.  As illustrated below, when phasers control
85 * actions with a fixed number of iterations, it is often convenient
86 * to override this method to cause termination when the current phase
87 * number reaches a threshold. Method {@link #forceTermination} is
88 * also available to abruptly release waiting threads and allow them
89 * to terminate.
90 *
91 * <p><b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
92 * constructed in tree structures) to reduce contention. Phasers with
93 * large numbers of parties that would otherwise experience heavy
94 * synchronization contention costs may instead be set up so that
95 * groups of sub-phasers share a common parent.  This may greatly
96 * increase throughput even though it incurs greater per-operation
97 * overhead.
98 *
99 * <p>In a tree of tiered phasers, registration and deregistration of
100 * child phasers with their parent are managed automatically.
101 * Whenever the number of registered parties of a child phaser becomes
102 * non-zero (as established in the {@link #Phaser(Phaser,int)}
103 * constructor, {@link #register}, or {@link #bulkRegister}), the
104 * child phaser is registered with its parent.  Whenever the number of
105 * registered parties becomes zero as the result of an invocation of
106 * {@link #arriveAndDeregister}, the child phaser is deregistered
107 * from its parent.
108 *
109 * <p><b>Monitoring.</b> While synchronization methods may be invoked
110 * only by registered parties, the current state of a phaser may be
111 * monitored by any caller.  At any given moment there are {@link
112 * #getRegisteredParties} parties in total, of which {@link
113 * #getArrivedParties} have arrived at the current phase ({@link
114 * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
115 * parties arrive, the phase advances.  The values returned by these
116 * methods may reflect transient states and so are not in general
117 * useful for synchronization control.  Method {@link #toString}
118 * returns snapshots of these state queries in a form convenient for
119 * informal monitoring.
120 *
121 * <p><b>Sample usages:</b>
122 *
123 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
124 * to control a one-shot action serving a variable number of parties.
125 * The typical idiom is for the method setting this up to first
126 * register, then start the actions, then deregister, as in:
127 *
128 * <pre> {@code
129 * void runTasks(List<Runnable> tasks) {
130 *   final Phaser phaser = new Phaser(1); // "1" to register self
131 *   // create and start threads
132 *   for (final Runnable task : tasks) {
133 *     phaser.register();
134 *     new Thread() {
135 *       public void run() {
136 *         phaser.arriveAndAwaitAdvance(); // await all creation
137 *         task.run();
138 *       }
139 *     }.start();
140 *   }
141 *
142 *   // allow threads to start and deregister self
143 *   phaser.arriveAndDeregister();
144 * }}</pre>
145 *
146 * <p>One way to cause a set of threads to repeatedly perform actions
147 * for a given number of iterations is to override {@code onAdvance}:
148 *
149 * <pre> {@code
150 * void startTasks(List<Runnable> tasks, final int iterations) {
151 *   final Phaser phaser = new Phaser() {
152 *     protected boolean onAdvance(int phase, int registeredParties) {
153 *       return phase >= iterations || registeredParties == 0;
154 *     }
155 *   };
156 *   phaser.register();
157 *   for (final Runnable task : tasks) {
158 *     phaser.register();
159 *     new Thread() {
160 *       public void run() {
161 *         do {
162 *           task.run();
163 *           phaser.arriveAndAwaitAdvance();
164 *         } while (!phaser.isTerminated());
165 *       }
166 *     }.start();
167 *   }
168 *   phaser.arriveAndDeregister(); // deregister self, don't wait
169 * }}</pre>
170 *
171 * If the main task must later await termination, it
172 * may re-register and then execute a similar loop:
173 * <pre> {@code
174 *   // ...
175 *   phaser.register();
176 *   while (!phaser.isTerminated())
177 *     phaser.arriveAndAwaitAdvance();}</pre>
178 *
179 * <p>Related constructions may be used to await particular phase numbers
180 * in contexts where you are sure that the phase will never wrap around
181 * {@code Integer.MAX_VALUE}. For example:
182 *
183 * <pre> {@code
184 * void awaitPhase(Phaser phaser, int phase) {
185 *   int p = phaser.register(); // assumes caller not already registered
186 *   while (p < phase) {
187 *     if (phaser.isTerminated())
188 *       // ... deal with unexpected termination
189 *     else
190 *       p = phaser.arriveAndAwaitAdvance();
191 *   }
192 *   phaser.arriveAndDeregister();
193 * }}</pre>
194 *
195 *
196 * <p>To create a set of {@code n} tasks using a tree of phasers, you
197 * could use code of the following form, assuming a Task class with a
198 * constructor accepting a {@code Phaser} that it registers with upon
199 * construction. After invocation of {@code build(new Task[n], 0, n,
200 * new Phaser())}, these tasks could then be started, for example by
201 * submitting to a pool:
202 *
203 * <pre> {@code
204 * void build(Task[] tasks, int lo, int hi, Phaser ph) {
205 *   if (hi - lo > TASKS_PER_PHASER) {
206 *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
207 *       int j = Math.min(i + TASKS_PER_PHASER, hi);
208 *       build(tasks, i, j, new Phaser(ph));
209 *     }
210 *   } else {
211 *     for (int i = lo; i < hi; ++i)
212 *       tasks[i] = new Task(ph);
213 *       // assumes new Task(ph) performs ph.register()
214 *   }
215 * }}</pre>
216 *
217 * The best value of {@code TASKS_PER_PHASER} depends mainly on
218 * expected synchronization rates. A value as low as four may
219 * be appropriate for extremely small per-phase task bodies (thus
220 * high rates), or up to hundreds for extremely large ones.
221 *
222 * <p><b>Implementation notes</b>: This implementation restricts the
223 * maximum number of parties to 65535. Attempts to register additional
224 * parties result in {@code IllegalStateException}. However, you can and
225 * should create tiered phasers to accommodate arbitrarily large sets
226 * of participants.
227 *
228 * @since 1.7
229 * @author Doug Lea
230 */
231public class Phaser {
232    /*
233     * This class implements an extension of X10 "clocks".  Thanks to
234     * Vijay Saraswat for the idea, and to Vivek Sarkar for
235     * enhancements to extend functionality.
236     */
237
238    /**
239     * Primary state representation, holding four bit-fields:
240     *
241     * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
242     * parties    -- the number of parties to wait            (bits 16-31)
243     * phase      -- the generation of the barrier            (bits 32-62)
244     * terminated -- set if barrier is terminated             (bit  63 / sign)
245     *
     * Except that a phaser with no registered parties is
     * distinguished by the otherwise illegal state of having zero
     * parties and one unarrived party (encoded as EMPTY below).
249     *
250     * To efficiently maintain atomicity, these values are packed into
251     * a single (atomic) long. Good performance relies on keeping
252     * state decoding and encoding simple, and keeping race windows
253     * short.
254     *
255     * All state updates are performed via CAS except initial
256     * registration of a sub-phaser (i.e., one with a non-null
257     * parent).  In this (relatively rare) case, we use built-in
258     * synchronization to lock while first registering with its
259     * parent.
260     *
261     * The phase of a subphaser is allowed to lag that of its
262     * ancestors until it is actually accessed -- see method
263     * reconcileState.
264     */
265    private volatile long state;
266
    // Upper bound on registered parties; doRegister rejects registrations
    // that would push the parties field beyond this value.
    private static final int  MAX_PARTIES     = 0xffff;
    // Largest phase number; also applied as a mask when computing the next
    // phase so that the increment wraps around to zero.
    private static final int  MAX_PHASE       = Integer.MAX_VALUE;
    // Bit offset of the parties field within the state word.
    private static final int  PARTIES_SHIFT   = 16;
    // Bit offset of the phase field within the state word.
    private static final int  PHASE_SHIFT     = 32;
    private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
    private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
    // Selects the low 32 bits of state, i.e. both count fields together.
    private static final long COUNTS_MASK     = 0xffffffffL;
    // Sign bit of state; once set, the phase extracted from state is negative.
    private static final long TERMINATION_BIT = 1L << 63;

    // some special values
    // Subtracted from state by a plain arrival: unarrived decreases by one.
    private static final int  ONE_ARRIVAL     = 1;
    // A single unit in the parties field.
    private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
    // Subtracted from state by arriveAndDeregister: both unarrived and
    // parties decrease by one.
    private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
    // Counts value (zero parties, one unarrived) marking a phaser with no
    // registered parties.
    private static final int  EMPTY           = 1;
281
282    // The following unpacking methods are usually manually inlined
283
284    private static int unarrivedOf(long s) {
285        int counts = (int)s;
286        return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
287    }
288
289    private static int partiesOf(long s) {
290        return (int)s >>> PARTIES_SHIFT;
291    }
292
293    private static int phaseOf(long s) {
294        return (int)(s >>> PHASE_SHIFT);
295    }
296
297    private static int arrivedOf(long s) {
298        int counts = (int)s;
299        return (counts == EMPTY) ? 0 :
300            (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
301    }
302
303    /**
304     * The parent of this phaser, or null if none.
305     */
306    private final Phaser parent;
307
308    /**
309     * The root of phaser tree. Equals this if not in a tree.
310     */
311    private final Phaser root;
312
313    /**
314     * Heads of Treiber stacks for waiting threads. To eliminate
315     * contention when releasing some threads while adding others, we
316     * use two of them, alternating across even and odd phases.
317     * Subphasers share queues with root to speed up releases.
318     */
319    private final AtomicReference<QNode> evenQ;
320    private final AtomicReference<QNode> oddQ;
321
322    private AtomicReference<QNode> queueFor(int phase) {
323        return ((phase & 1) == 0) ? evenQ : oddQ;
324    }
325
326    /**
327     * Returns message string for bounds exceptions on arrival.
328     */
329    private String badArrive(long s) {
330        return "Attempted arrival of unregistered party for " +
331            stateToString(s);
332    }
333
334    /**
335     * Returns message string for bounds exceptions on registration.
336     */
337    private String badRegister(long s) {
338        return "Attempt to register more than " +
339            MAX_PARTIES + " parties for " + stateToString(s);
340    }
341
342    /**
343     * Main implementation for methods arrive and arriveAndDeregister.
344     * Manually tuned to speed up and minimize race windows for the
345     * common case of just decrementing unarrived field.
346     *
347     * @param adjust value to subtract from state;
348     *               ONE_ARRIVAL for arrive,
349     *               ONE_DEREGISTER for arriveAndDeregister
350     */
    private int doArrive(int adjust) {
        // Records one arrival (optionally with deregistration) by CASing the
        // state word, and performs the phase advance if this was the last
        // unarrived party. Returns the arrival phase, or a negative value if
        // the phaser is terminated.
        final Phaser root = this.root;
        for (;;) {
            // A root reads its own state directly; a subphaser first catches
            // up with the root's phase through reconcileState().
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            // The termination bit is the sign bit of state, so a terminated
            // phaser yields a negative phase here.
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            // Nothing left to arrive for this phase: there is no outstanding
            // arrival for the caller to consume.
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            // Arguments are evaluated left to right: the expected value is
            // the old s, and the embedded "s-=adjust" both computes the new
            // value and leaves it in s for the code below. On CAS failure the
            // loop restarts and s is re-read.
            if (U.compareAndSwapLong(this, STATE, s, s-=adjust)) {
                if (unarrived == 1) {   // this arrival was the last one for the phase
                    long n = s & PARTIES_MASK;  // base of next state
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        // Root phaser: run the advance action and build the
                        // next state (terminated, EMPTY, or unarrived reset
                        // to the number of parties).
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        // Masking with MAX_PHASE wraps the phase to zero after
                        // Integer.MAX_VALUE instead of going negative.
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase << PHASE_SHIFT;
                        // The CAS result is deliberately ignored here.
                        // NOTE(review): presumably it can only fail if the
                        // state was changed concurrently by termination (cf.
                        // the "terminated" comment on the analogous CAS in
                        // arriveAndAwaitAdvance) -- confirm.
                        U.compareAndSwapLong(this, STATE, s, n);
                        releaseWaiters(phase);
                    }
                    else if (nextUnarrived == 0) { // propagate deregistration
                        // This subphaser has no parties left: deregister it
                        // from the parent, then mark the local state EMPTY.
                        phase = parent.doArrive(ONE_DEREGISTER);
                        U.compareAndSwapLong(this, STATE, s, s | EMPTY);
                    }
                    else
                        // Subphaser with remaining parties: its completion
                        // counts as a single arrival at the parent.
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }
389
390    /**
391     * Implementation of register, bulkRegister.
392     *
393     * @param registrations number to add to both parties and
394     * unarrived fields. Must be greater than zero.
395     */
    private int doRegister(int registrations) {
        // Adds the given number of parties to both the parties and unarrived
        // fields, registering this phaser with its parent on the first
        // registration of a subphaser. Returns the phase the registration
        // applied to, or a negative value if terminated.
        // adjustment to state
        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        int phase;
        for (;;) {
            // Without a parent the state is read directly; otherwise it is
            // first reconciled with the root's phase.
            long s = (parent == null) ? state : reconcileState();
            int counts = (int)s;
            int parties = counts >>> PARTIES_SHIFT;
            int unarrived = counts & UNARRIVED_MASK;
            // Capacity is checked before termination, so an over-limit
            // request throws even if the phaser is already terminated.
            if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            phase = (int)(s >>> PHASE_SHIFT);
            // Negative phase means terminated: registration has no effect.
            if (phase < 0)
                break;
            if (counts != EMPTY) {                  // not 1st registration
                // For a subphaser, only proceed if the state is still the
                // reconciled value just read; otherwise retry from the top.
                if (parent == null || reconcileState() == s) {
                    if (unarrived == 0)             // wait out advance
                        root.internalAwaitAdvance(phase, null);
                    else if (U.compareAndSwapLong(this, STATE, s, s + adjust))
                        break;
                }
            }
            else if (parent == null) {              // 1st root registration
                // The EMPTY marker occupies the unarrived field, so the new
                // state is built from the phase and adjust rather than by
                // adding adjust to s.
                long next = ((long)phase << PHASE_SHIFT) | adjust;
                if (U.compareAndSwapLong(this, STATE, s, next))
                    break;
            }
            else {
                synchronized (this) {               // 1st sub registration
                    if (state == s) {               // recheck under lock
                        // Register this subphaser as one party of its parent
                        // before publishing the local counts.
                        phase = parent.doRegister(1);
                        if (phase < 0)
                            break;
                        // finish registration whenever parent registration
                        // succeeded, even when racing with termination,
                        // since these are part of the same "transaction".
                        while (!U.compareAndSwapLong
                               (this, STATE, s,
                                ((long)phase << PHASE_SHIFT) | adjust)) {
                            // CAS failed: refresh the expected state and take
                            // the phase from the root before retrying.
                            s = state;
                            phase = (int)(root.state >>> PHASE_SHIFT);
                            // assert (int)s == EMPTY;
                        }
                        break;
                    }
                }
            }
        }
        return phase;
    }
447
448    /**
449     * Resolves lagged phase propagation from root if necessary.
450     * Reconciliation normally occurs when root has advanced but
451     * subphasers have not yet done so, in which case they must finish
452     * their own advance by setting unarrived to parties (or if
453     * parties is zero, resetting to unregistered EMPTY state).
454     *
455     * @return reconciled state
456     */
    private long reconcileState() {
        // Brings a subphaser's phase in line with the root's phase and
        // returns the resulting state; for a root phaser this is simply the
        // current state.
        final Phaser root = this.root;
        long s = state;
        if (root != this) {
            int phase, p;
            // CAS to root phase with current parties, tripping unarrived
            //
            // The loop continues while the root's phase differs from the
            // phase in s AND the CAS installing the root's phase fails.
            // The replacement state combines the root's phase with:
            //   - the existing counts unchanged, if the root phase is
            //     negative (terminated);
            //   - EMPTY, if this phaser has zero parties;
            //   - the same parties with unarrived reset to parties, otherwise.
            // The assignment embedded in the CAS arguments leaves s holding
            // the new state when the CAS succeeds; after a failed CAS the
            // loop body re-reads s from state. If the phases already match,
            // the condition short-circuits and s is returned untouched.
            while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
                   (int)(s >>> PHASE_SHIFT) &&
                   !U.compareAndSwapLong
                   (this, STATE, s,
                    s = (((long)phase << PHASE_SHIFT) |
                         ((phase < 0) ? (s & COUNTS_MASK) :
                          (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                           ((s & PARTIES_MASK) | p))))))
                s = state;
        }
        return s;
    }
475
476    /**
477     * Creates a new phaser with no initially registered parties, no
478     * parent, and initial phase number 0. Any thread using this
479     * phaser will need to first register for it.
480     */
    public Phaser() {
        // Delegates to the general constructor: no parent, zero parties.
        this(null, 0);
    }
484
485    /**
486     * Creates a new phaser with the given number of registered
487     * unarrived parties, no parent, and initial phase number 0.
488     *
489     * @param parties the number of parties required to advance to the
490     * next phase
491     * @throws IllegalArgumentException if parties less than zero
492     * or greater than the maximum number of parties supported
493     */
    public Phaser(int parties) {
        // Delegates to the general constructor with no parent; argument
        // validation happens there.
        this(null, parties);
    }
497
498    /**
499     * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
500     *
501     * @param parent the parent phaser
502     */
    public Phaser(Phaser parent) {
        // Delegates to the general constructor with zero parties, so the
        // new phaser is not registered with the parent at this point.
        this(parent, 0);
    }
506
507    /**
508     * Creates a new phaser with the given parent and number of
509     * registered unarrived parties.  When the given parent is non-null
510     * and the given number of parties is greater than zero, this
511     * child phaser is registered with its parent.
512     *
513     * @param parent the parent phaser
514     * @param parties the number of parties required to advance to the
515     * next phase
516     * @throws IllegalArgumentException if parties less than zero
517     * or greater than the maximum number of parties supported
518     */
519    public Phaser(Phaser parent, int parties) {
520        if (parties >>> PARTIES_SHIFT != 0)
521            throw new IllegalArgumentException("Illegal number of parties");
522        int phase = 0;
523        this.parent = parent;
524        if (parent != null) {
525            final Phaser root = parent.root;
526            this.root = root;
527            this.evenQ = root.evenQ;
528            this.oddQ = root.oddQ;
529            if (parties != 0)
530                phase = parent.doRegister(1);
531        }
532        else {
533            this.root = this;
534            this.evenQ = new AtomicReference<QNode>();
535            this.oddQ = new AtomicReference<QNode>();
536        }
537        this.state = (parties == 0) ? (long)EMPTY :
538            ((long)phase << PHASE_SHIFT) |
539            ((long)parties << PARTIES_SHIFT) |
540            ((long)parties);
541    }
542
543    /**
544     * Adds a new unarrived party to this phaser.  If an ongoing
545     * invocation of {@link #onAdvance} is in progress, this method
546     * may await its completion before returning.  If this phaser has
547     * a parent, and this phaser previously had no registered parties,
548     * this child phaser is also registered with its parent. If
549     * this phaser is terminated, the attempt to register has
550     * no effect, and a negative value is returned.
551     *
552     * @return the arrival phase number to which this registration
553     * applied.  If this value is negative, then this phaser has
554     * terminated, in which case registration has no effect.
555     * @throws IllegalStateException if attempting to register more
556     * than the maximum supported number of parties
557     */
558    public int register() {
559        return doRegister(1);
560    }
561
562    /**
563     * Adds the given number of new unarrived parties to this phaser.
564     * If an ongoing invocation of {@link #onAdvance} is in progress,
565     * this method may await its completion before returning.  If this
566     * phaser has a parent, and the given number of parties is greater
567     * than zero, and this phaser previously had no registered
568     * parties, this child phaser is also registered with its parent.
569     * If this phaser is terminated, the attempt to register has no
570     * effect, and a negative value is returned.
571     *
572     * @param parties the number of additional parties required to
573     * advance to the next phase
574     * @return the arrival phase number to which this registration
575     * applied.  If this value is negative, then this phaser has
576     * terminated, in which case registration has no effect.
577     * @throws IllegalStateException if attempting to register more
578     * than the maximum supported number of parties
579     * @throws IllegalArgumentException if {@code parties < 0}
580     */
581    public int bulkRegister(int parties) {
582        if (parties < 0)
583            throw new IllegalArgumentException();
584        if (parties == 0)
585            return getPhase();
586        return doRegister(parties);
587    }
588
589    /**
590     * Arrives at this phaser, without waiting for others to arrive.
591     *
592     * <p>It is a usage error for an unregistered party to invoke this
593     * method.  However, this error may result in an {@code
594     * IllegalStateException} only upon some subsequent operation on
595     * this phaser, if ever.
596     *
597     * @return the arrival phase number, or a negative value if terminated
598     * @throws IllegalStateException if not terminated and the number
599     * of unarrived parties would become negative
600     */
601    public int arrive() {
602        return doArrive(ONE_ARRIVAL);
603    }
604
605    /**
606     * Arrives at this phaser and deregisters from it without waiting
607     * for others to arrive. Deregistration reduces the number of
608     * parties required to advance in future phases.  If this phaser
609     * has a parent, and deregistration causes this phaser to have
610     * zero parties, this phaser is also deregistered from its parent.
611     *
612     * <p>It is a usage error for an unregistered party to invoke this
613     * method.  However, this error may result in an {@code
614     * IllegalStateException} only upon some subsequent operation on
615     * this phaser, if ever.
616     *
617     * @return the arrival phase number, or a negative value if terminated
618     * @throws IllegalStateException if not terminated and the number
619     * of registered or unarrived parties would become negative
620     */
621    public int arriveAndDeregister() {
622        return doArrive(ONE_DEREGISTER);
623    }
624
625    /**
626     * Arrives at this phaser and awaits others. Equivalent in effect
627     * to {@code awaitAdvance(arrive())}.  If you need to await with
628     * interruption or timeout, you can arrange this with an analogous
629     * construction using one of the other forms of the {@code
630     * awaitAdvance} method.  If instead you need to deregister upon
631     * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
632     *
633     * <p>It is a usage error for an unregistered party to invoke this
634     * method.  However, this error may result in an {@code
635     * IllegalStateException} only upon some subsequent operation on
636     * this phaser, if ever.
637     *
638     * @return the arrival phase number, or the (negative)
639     * {@linkplain #getPhase() current phase} if terminated
640     * @throws IllegalStateException if not terminated and the number
641     * of unarrived parties would become negative
642     */
    public int arriveAndAwaitAdvance() {
        // Specialization of doArrive+awaitAdvance eliminating some reads/paths
        final Phaser root = this.root;
        for (;;) {
            // A root reads its own state; a subphaser first reconciles its
            // phase with the root's.
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            // Negative phase means terminated: return without waiting.
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            // No outstanding arrival exists for the caller to consume.
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            // The expected value is the old s; the embedded "s -= ONE_ARRIVAL"
            // computes the new value and leaves it in s for the code below.
            // On CAS failure the loop restarts and s is re-read.
            if (U.compareAndSwapLong(this, STATE, s, s -= ONE_ARRIVAL)) {
                // Other parties are still outstanding: wait for the advance.
                if (unarrived > 1)
                    return root.internalAwaitAdvance(phase, null);
                // Last arrival at a subphaser: pass the arrival (and the
                // wait) up to the parent.
                if (root != this)
                    return parent.arriveAndAwaitAdvance();
                // Last arrival at the root: run the advance action and build
                // the next state (terminated, EMPTY, or unarrived reset to
                // the number of parties).
                long n = s & PARTIES_MASK;  // base of next state
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived;
                // Masking with MAX_PHASE wraps the phase to zero after
                // Integer.MAX_VALUE instead of going negative.
                int nextPhase = (phase + 1) & MAX_PHASE;
                n |= (long)nextPhase << PHASE_SHIFT;
                // If the state changed underneath us, report the phase now
                // stored in state instead of the one computed above.
                if (!U.compareAndSwapLong(this, STATE, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                releaseWaiters(phase);
                return nextPhase;
            }
        }
    }
677
678    /**
679     * Awaits the phase of this phaser to advance from the given phase
680     * value, returning immediately if the current phase is not equal
681     * to the given phase value or this phaser is terminated.
682     *
683     * @param phase an arrival phase number, or negative value if
684     * terminated; this argument is normally the value returned by a
685     * previous call to {@code arrive} or {@code arriveAndDeregister}.
686     * @return the next arrival phase number, or the argument if it is
687     * negative, or the (negative) {@linkplain #getPhase() current phase}
688     * if terminated
689     */
690    public int awaitAdvance(int phase) {
691        final Phaser root = this.root;
692        long s = (root == this) ? state : reconcileState();
693        int p = (int)(s >>> PHASE_SHIFT);
694        if (phase < 0)
695            return phase;
696        if (p == phase)
697            return root.internalAwaitAdvance(phase, null);
698        return p;
699    }
700
701    /**
702     * Awaits the phase of this phaser to advance from the given phase
703     * value, throwing {@code InterruptedException} if interrupted
704     * while waiting, or returning immediately if the current phase is
705     * not equal to the given phase value or this phaser is
706     * terminated.
707     *
708     * @param phase an arrival phase number, or negative value if
709     * terminated; this argument is normally the value returned by a
710     * previous call to {@code arrive} or {@code arriveAndDeregister}.
711     * @return the next arrival phase number, or the argument if it is
712     * negative, or the (negative) {@linkplain #getPhase() current phase}
713     * if terminated
714     * @throws InterruptedException if thread interrupted while waiting
715     */
716    public int awaitAdvanceInterruptibly(int phase)
717        throws InterruptedException {
718        final Phaser root = this.root;
719        long s = (root == this) ? state : reconcileState();
720        int p = (int)(s >>> PHASE_SHIFT);
721        if (phase < 0)
722            return phase;
723        if (p == phase) {
724            QNode node = new QNode(this, phase, true, false, 0L);
725            p = root.internalAwaitAdvance(phase, node);
726            if (node.wasInterrupted)
727                throw new InterruptedException();
728        }
729        return p;
730    }
731
732    /**
733     * Awaits the phase of this phaser to advance from the given phase
734     * value or the given timeout to elapse, throwing {@code
735     * InterruptedException} if interrupted while waiting, or
736     * returning immediately if the current phase is not equal to the
737     * given phase value or this phaser is terminated.
738     *
739     * @param phase an arrival phase number, or negative value if
740     * terminated; this argument is normally the value returned by a
741     * previous call to {@code arrive} or {@code arriveAndDeregister}.
742     * @param timeout how long to wait before giving up, in units of
743     *        {@code unit}
744     * @param unit a {@code TimeUnit} determining how to interpret the
745     *        {@code timeout} parameter
746     * @return the next arrival phase number, or the argument if it is
747     * negative, or the (negative) {@linkplain #getPhase() current phase}
748     * if terminated
749     * @throws InterruptedException if thread interrupted while waiting
750     * @throws TimeoutException if timed out while waiting
751     */
752    public int awaitAdvanceInterruptibly(int phase,
753                                         long timeout, TimeUnit unit)
754        throws InterruptedException, TimeoutException {
755        long nanos = unit.toNanos(timeout);
756        final Phaser root = this.root;
757        long s = (root == this) ? state : reconcileState();
758        int p = (int)(s >>> PHASE_SHIFT);
759        if (phase < 0)
760            return phase;
761        if (p == phase) {
762            QNode node = new QNode(this, phase, true, true, nanos);
763            p = root.internalAwaitAdvance(phase, node);
764            if (node.wasInterrupted)
765                throw new InterruptedException();
766            else if (p == phase)
767                throw new TimeoutException();
768        }
769        return p;
770    }
771
772    /**
773     * Forces this phaser to enter termination state.  Counts of
774     * registered parties are unaffected.  If this phaser is a member
775     * of a tiered set of phasers, then all of the phasers in the set
776     * are terminated.  If this phaser is already terminated, this
777     * method has no effect.  This method may be useful for
778     * coordinating recovery after one or more tasks encounter
779     * unexpected exceptions.
780     */
781    public void forceTermination() {
782        // Only need to change root state
783        final Phaser root = this.root;
784        long s;
785        while ((s = root.state) >= 0) {
786            if (U.compareAndSwapLong(root, STATE, s, s | TERMINATION_BIT)) {
787                // signal all threads
788                releaseWaiters(0); // Waiters on evenQ
789                releaseWaiters(1); // Waiters on oddQ
790                return;
791            }
792        }
793    }
794
795    /**
796     * Returns the current phase number. The maximum phase number is
797     * {@code Integer.MAX_VALUE}, after which it restarts at
798     * zero. Upon termination, the phase number is negative,
799     * in which case the prevailing phase prior to termination
800     * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
801     *
802     * @return the phase number, or a negative value if terminated
803     */
804    public final int getPhase() {
805        return (int)(root.state >>> PHASE_SHIFT);
806    }
807
808    /**
809     * Returns the number of parties registered at this phaser.
810     *
811     * @return the number of parties
812     */
813    public int getRegisteredParties() {
814        return partiesOf(state);
815    }
816
817    /**
818     * Returns the number of registered parties that have arrived at
819     * the current phase of this phaser. If this phaser has terminated,
820     * the returned value is meaningless and arbitrary.
821     *
822     * @return the number of arrived parties
823     */
824    public int getArrivedParties() {
825        return arrivedOf(reconcileState());
826    }
827
828    /**
829     * Returns the number of registered parties that have not yet
830     * arrived at the current phase of this phaser. If this phaser has
831     * terminated, the returned value is meaningless and arbitrary.
832     *
833     * @return the number of unarrived parties
834     */
835    public int getUnarrivedParties() {
836        return unarrivedOf(reconcileState());
837    }
838
839    /**
840     * Returns the parent of this phaser, or {@code null} if none.
841     *
842     * @return the parent of this phaser, or {@code null} if none
843     */
844    public Phaser getParent() {
845        return parent;
846    }
847
848    /**
849     * Returns the root ancestor of this phaser, which is the same as
850     * this phaser if it has no parent.
851     *
852     * @return the root ancestor of this phaser
853     */
854    public Phaser getRoot() {
855        return root;
856    }
857
858    /**
859     * Returns {@code true} if this phaser has been terminated.
860     *
861     * @return {@code true} if this phaser has been terminated
862     */
863    public boolean isTerminated() {
864        return root.state < 0L;
865    }
866
867    /**
868     * Overridable method to perform an action upon impending phase
869     * advance, and to control termination. This method is invoked
870     * upon arrival of the party advancing this phaser (when all other
871     * waiting parties are dormant).  If this method returns {@code
872     * true}, this phaser will be set to a final termination state
873     * upon advance, and subsequent calls to {@link #isTerminated}
874     * will return true. Any (unchecked) Exception or Error thrown by
875     * an invocation of this method is propagated to the party
876     * attempting to advance this phaser, in which case no advance
877     * occurs.
878     *
879     * <p>The arguments to this method provide the state of the phaser
880     * prevailing for the current transition.  The effects of invoking
881     * arrival, registration, and waiting methods on this phaser from
882     * within {@code onAdvance} are unspecified and should not be
883     * relied on.
884     *
885     * <p>If this phaser is a member of a tiered set of phasers, then
886     * {@code onAdvance} is invoked only for its root phaser on each
887     * advance.
888     *
889     * <p>To support the most common use cases, the default
890     * implementation of this method returns {@code true} when the
891     * number of registered parties has become zero as the result of a
892     * party invoking {@code arriveAndDeregister}.  You can disable
893     * this behavior, thus enabling continuation upon future
894     * registrations, by overriding this method to always return
895     * {@code false}:
896     *
897     * <pre> {@code
898     * Phaser phaser = new Phaser() {
899     *   protected boolean onAdvance(int phase, int parties) { return false; }
900     * }}</pre>
901     *
902     * @param phase the current phase number on entry to this method,
903     * before this phaser is advanced
904     * @param registeredParties the current number of registered parties
905     * @return {@code true} if this phaser should terminate
906     */
907    protected boolean onAdvance(int phase, int registeredParties) {
908        return registeredParties == 0;
909    }
910
911    /**
912     * Returns a string identifying this phaser, as well as its
913     * state.  The state, in brackets, includes the String {@code
914     * "phase = "} followed by the phase number, {@code "parties = "}
915     * followed by the number of registered parties, and {@code
916     * "arrived = "} followed by the number of arrived parties.
917     *
918     * @return a string identifying this phaser, as well as its state
919     */
920    public String toString() {
921        return stateToString(reconcileState());
922    }
923
924    /**
925     * Implementation of toString and string-based error messages.
926     */
927    private String stateToString(long s) {
928        return super.toString() +
929            "[phase = " + phaseOf(s) +
930            " parties = " + partiesOf(s) +
931            " arrived = " + arrivedOf(s) + "]";
932    }
933
934    // Waiting mechanics
935
    /**
     * Removes and signals threads from queue for phase.
     *
     * <p>The queue is chosen by the parity of {@code phase}: even
     * values select {@code evenQ}, odd values select {@code oddQ}.
     * Nodes are popped from the head for as long as the head node's
     * recorded phase differs from the root's current phase; each
     * popped node that still holds a thread has its {@code thread}
     * field cleared and that thread unparked.
     *
     * @param phase value whose low bit selects the queue to drain
     */
    private void releaseWaiters(int phase) {
        QNode q;   // first element of queue
        Thread t;  // its thread
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        // Stop when the queue is empty or its head was enqueued for the
        // phase currently in progress; the root phase is re-read on
        // every iteration.
        while ((q = head.get()) != null &&
               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
            // Only the caller whose CAS unlinks q goes on to wake q's
            // thread; a failed CAS simply re-reads the head and retries.
            if (head.compareAndSet(q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;          // mark the wait as finished
                LockSupport.unpark(t);
            }
        }
    }
952
    /**
     * Variant of releaseWaiters that additionally tries to remove any
     * nodes no longer waiting for advance due to timeout or
     * interrupt. Currently, nodes are removed only if they are at
     * head of queue, which suffices to reduce memory footprint in
     * most usages.
     *
     * <p>The loop exits once the selected queue is empty, or once its
     * head node both still holds a thread and was enqueued for the
     * root's current phase. Any other head node is popped, and its
     * thread (if it still has one) is unparked.
     *
     * @param phase value whose low bit selects the queue
     *        ({@code evenQ} for even, {@code oddQ} for odd)
     * @return current phase on exit
     */
    private int abortWait(int phase) {
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        for (;;) {
            Thread t;
            QNode q = head.get();
            // Root phase is re-read on each pass so the value returned
            // is the one observed together with this head.
            int p = (int)(root.state >>> PHASE_SHIFT);
            // Done if nothing is queued, or the head is a live waiter
            // (thread still set) for the current phase.
            if (q == null || ((t = q.thread) != null && q.phase == p))
                return p;
            // Head is either cancelled (thread already null) or belongs
            // to another phase: unlink it, waking its thread if any.
            if (head.compareAndSet(q, q.next) && t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }
976
977    /** The number of CPUs, for spin control */
978    private static final int NCPU = Runtime.getRuntime().availableProcessors();
979
980    /**
981     * The number of times to spin before blocking while waiting for
982     * advance, per arrival while waiting. On multiprocessors, fully
983     * blocking and waking up a large number of threads all at once is
984     * usually a very slow process, so we use rechargeable spins to
985     * avoid it when threads regularly arrive: When a thread in
986     * internalAwaitAdvance notices another arrival before blocking,
987     * and there appear to be enough CPUs available, it spins
988     * SPINS_PER_ARRIVAL more times before blocking. The value trades
989     * off good-citizenship vs big unnecessary slowdowns.
990     */
991    static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
992
    /**
     * Possibly blocks and waits for phase to advance unless aborted.
     * Call only on root phaser.
     *
     * <p>The wait proceeds in stages while the phase read from
     * {@code state} still equals {@code phase}: with no node, the
     * caller spins (recharging its spin budget when the unarrived
     * count changes) until the budget runs out or an interrupt is
     * seen, at which point a non-interruptible node is created; with
     * a node, the caller first checks {@code isReleasable()}, then
     * pushes the node onto the queue for this phase's parity, and
     * finally blocks via {@code ForkJoinPool.managedBlock}.
     *
     * <p>On exit, an interrupt recorded on a non-interruptible node is
     * re-asserted on the current thread. If the phase has still not
     * advanced (the wait was aborted), cleanup is delegated to
     * {@code abortWait}; otherwise waiters for {@code phase} are
     * released.
     *
     * @param phase current phase
     * @param node if non-null, the wait node to track interrupt and timeout;
     * if null, denotes noninterruptible wait
     * @return current phase
     */
    private int internalAwaitAdvance(int phase, QNode node) {
        // assert root == this;
        releaseWaiters(phase-1);          // ensure old queue clean
        boolean queued = false;           // true when node is enqueued
        int lastUnarrived = 0;            // to increase spins upon change
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        // Each iteration re-reads state; the loop ends as soon as the
        // phase it encodes is no longer the awaited one.
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            if (node == null) {           // spinning in noninterruptible mode
                int unarrived = (int)s & UNARRIVED_MASK;
                // Recharge the spin budget when the unarrived count has
                // changed and is now below the CPU count.
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                    spins += SPINS_PER_ARRIVAL;
                // Thread.interrupted() clears the flag; it is remembered
                // on the node and restored after the loop.
                boolean interrupted = Thread.interrupted();
                if (interrupted || --spins < 0) { // need node to record intr
                    node = new QNode(this, phase, false, false, 0L);
                    node.wasInterrupted = interrupted;
                }
            }
            else if (node.isReleasable()) // done or aborted
                break;
            else if (!queued) {           // push onto queue
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                QNode q = node.next = head.get();
                // Enqueue only behind nodes of the same phase, and only
                // if the phase has not moved since the loop test.
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException cantHappen) {
                    // NOTE(review): the name suggests this path is not
                    // expected (QNode.block declares no checked
                    // exception); the interrupt is recorded regardless.
                    node.wasInterrupted = true;
                }
            }
        }

        if (node != null) {
            if (node.thread != null)
                node.thread = null;       // avoid need for unpark()
            // A non-interruptible wait swallowed the interrupt above;
            // re-assert it for the caller.
            if (node.wasInterrupted && !node.interruptible)
                Thread.currentThread().interrupt();
            // Loop was left via break with the phase unchanged, and a
            // fresh read confirms it: the wait was aborted.
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
                return abortWait(phase); // possibly clean up on abort
        }
        releaseWaiters(phase);
        return p;
    }
1051
1052    /**
1053     * Wait nodes for Treiber stack representing wait queue.
1054     */
1055    static final class QNode implements ForkJoinPool.ManagedBlocker {
1056        final Phaser phaser;
1057        final int phase;
1058        final boolean interruptible;
1059        final boolean timed;
1060        boolean wasInterrupted;
1061        long nanos;
1062        final long deadline;
1063        volatile Thread thread; // nulled to cancel wait
1064        QNode next;
1065
1066        QNode(Phaser phaser, int phase, boolean interruptible,
1067              boolean timed, long nanos) {
1068            this.phaser = phaser;
1069            this.phase = phase;
1070            this.interruptible = interruptible;
1071            this.nanos = nanos;
1072            this.timed = timed;
1073            this.deadline = timed ? System.nanoTime() + nanos : 0L;
1074            thread = Thread.currentThread();
1075        }
1076
1077        public boolean isReleasable() {
1078            if (thread == null)
1079                return true;
1080            if (phaser.getPhase() != phase) {
1081                thread = null;
1082                return true;
1083            }
1084            if (Thread.interrupted())
1085                wasInterrupted = true;
1086            if (wasInterrupted && interruptible) {
1087                thread = null;
1088                return true;
1089            }
1090            if (timed &&
1091                (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1092                thread = null;
1093                return true;
1094            }
1095            return false;
1096        }
1097
1098        public boolean block() {
1099            while (!isReleasable()) {
1100                if (timed)
1101                    LockSupport.parkNanos(this, nanos);
1102                else
1103                    LockSupport.park(this);
1104            }
1105            return true;
1106        }
1107    }
1108
1109    // Unsafe mechanics
1110
1111    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1112    private static final long STATE;
1113    static {
1114        try {
1115            STATE = U.objectFieldOffset
1116                (Phaser.class.getDeclaredField("state"));
1117        } catch (ReflectiveOperationException e) {
1118            throw new Error(e);
1119        }
1120
1121        // Reduce the risk of rare disastrous classloading in first call to
1122        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1123        Class<?> ensureLoaded = LockSupport.class;
1124    }
1125}
1126