1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7package java.util.concurrent;
8
9import java.util.concurrent.TimeUnit;
10import java.util.concurrent.TimeoutException;
11import java.util.concurrent.atomic.AtomicReference;
12import java.util.concurrent.locks.LockSupport;
13
14/**
15 * A reusable synchronization barrier, similar in functionality to
16 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
17 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
18 * but supporting more flexible usage.
19 *
20 * <p> <b>Registration.</b> Unlike the case for other barriers, the
21 * number of parties <em>registered</em> to synchronize on a phaser
22 * may vary over time.  Tasks may be registered at any time (using
23 * methods {@link #register}, {@link #bulkRegister}, or forms of
24 * constructors establishing initial numbers of parties), and
25 * optionally deregistered upon any arrival (using {@link
26 * #arriveAndDeregister}).  As is the case with most basic
27 * synchronization constructs, registration and deregistration affect
28 * only internal counts; they do not establish any further internal
29 * bookkeeping, so tasks cannot query whether they are registered.
30 * (However, you can introduce such bookkeeping by subclassing this
31 * class.)
32 *
33 * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
34 * Phaser} may be repeatedly awaited.  Method {@link
35 * #arriveAndAwaitAdvance} has effect analogous to {@link
36 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
37 * generation of a phaser has an associated phase number. The phase
38 * number starts at zero, and advances when all parties arrive at the
39 * phaser, wrapping around to zero after reaching {@code
40 * Integer.MAX_VALUE}. The use of phase numbers enables independent
41 * control of actions upon arrival at a phaser and upon awaiting
42 * others, via two kinds of methods that may be invoked by any
43 * registered party:
44 *
45 * <ul>
46 *
47 *   <li> <b>Arrival.</b> Methods {@link #arrive} and
48 *       {@link #arriveAndDeregister} record arrival.  These methods
49 *       do not block, but return an associated <em>arrival phase
50 *       number</em>; that is, the phase number of the phaser to which
51 *       the arrival applied. When the final party for a given phase
52 *       arrives, an optional action is performed and the phase
53 *       advances.  These actions are performed by the party
54 *       triggering a phase advance, and are arranged by overriding
55 *       method {@link #onAdvance(int, int)}, which also controls
56 *       termination. Overriding this method is similar to, but more
57 *       flexible than, providing a barrier action to a {@code
58 *       CyclicBarrier}.
59 *
60 *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
61 *       argument indicating an arrival phase number, and returns when
62 *       the phaser advances to (or is already at) a different phase.
63 *       Unlike similar constructions using {@code CyclicBarrier},
64 *       method {@code awaitAdvance} continues to wait even if the
65 *       waiting thread is interrupted. Interruptible and timeout
66 *       versions are also available, but exceptions encountered while
67 *       tasks wait interruptibly or with timeout do not change the
68 *       state of the phaser. If necessary, you can perform any
69 *       associated recovery within handlers of those exceptions,
70 *       often after invoking {@code forceTermination}.  Phasers may
71 *       also be used by tasks executing in a {@link ForkJoinPool},
72 *       which will ensure sufficient parallelism to execute tasks
73 *       when others are blocked waiting for a phase to advance.
74 *
75 * </ul>
76 *
77 * <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
 * state, which may be checked using method {@link #isTerminated}. Upon
79 * termination, all synchronization methods immediately return without
80 * waiting for advance, as indicated by a negative return value.
81 * Similarly, attempts to register upon termination have no effect.
82 * Termination is triggered when an invocation of {@code onAdvance}
83 * returns {@code true}. The default implementation returns {@code
84 * true} if a deregistration has caused the number of registered
85 * parties to become zero.  As illustrated below, when phasers control
86 * actions with a fixed number of iterations, it is often convenient
87 * to override this method to cause termination when the current phase
88 * number reaches a threshold. Method {@link #forceTermination} is
89 * also available to abruptly release waiting threads and allow them
90 * to terminate.
91 *
92 * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
93 * constructed in tree structures) to reduce contention. Phasers with
94 * large numbers of parties that would otherwise experience heavy
95 * synchronization contention costs may instead be set up so that
96 * groups of sub-phasers share a common parent.  This may greatly
97 * increase throughput even though it incurs greater per-operation
98 * overhead.
99 *
100 * <p>In a tree of tiered phasers, registration and deregistration of
101 * child phasers with their parent are managed automatically.
102 * Whenever the number of registered parties of a child phaser becomes
103 * non-zero (as established in the {@link #Phaser(Phaser,int)}
104 * constructor, {@link #register}, or {@link #bulkRegister}), the
105 * child phaser is registered with its parent.  Whenever the number of
106 * registered parties becomes zero as the result of an invocation of
107 * {@link #arriveAndDeregister}, the child phaser is deregistered
108 * from its parent.
109 *
110 * <p><b>Monitoring.</b> While synchronization methods may be invoked
111 * only by registered parties, the current state of a phaser may be
112 * monitored by any caller.  At any given moment there are {@link
113 * #getRegisteredParties} parties in total, of which {@link
114 * #getArrivedParties} have arrived at the current phase ({@link
115 * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
116 * parties arrive, the phase advances.  The values returned by these
117 * methods may reflect transient states and so are not in general
118 * useful for synchronization control.  Method {@link #toString}
119 * returns snapshots of these state queries in a form convenient for
120 * informal monitoring.
121 *
122 * <p><b>Sample usages:</b>
123 *
124 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
125 * to control a one-shot action serving a variable number of parties.
126 * The typical idiom is for the method setting this up to first
127 * register, then start the actions, then deregister, as in:
128 *
129 *  <pre> {@code
130 * void runTasks(List<Runnable> tasks) {
131 *   final Phaser phaser = new Phaser(1); // "1" to register self
132 *   // create and start threads
133 *   for (final Runnable task : tasks) {
134 *     phaser.register();
135 *     new Thread() {
136 *       public void run() {
137 *         phaser.arriveAndAwaitAdvance(); // await all creation
138 *         task.run();
139 *       }
140 *     }.start();
141 *   }
142 *
143 *   // allow threads to start and deregister self
144 *   phaser.arriveAndDeregister();
145 * }}</pre>
146 *
147 * <p>One way to cause a set of threads to repeatedly perform actions
148 * for a given number of iterations is to override {@code onAdvance}:
149 *
150 *  <pre> {@code
151 * void startTasks(List<Runnable> tasks, final int iterations) {
152 *   final Phaser phaser = new Phaser() {
153 *     protected boolean onAdvance(int phase, int registeredParties) {
154 *       return phase >= iterations || registeredParties == 0;
155 *     }
156 *   };
157 *   phaser.register();
158 *   for (final Runnable task : tasks) {
159 *     phaser.register();
160 *     new Thread() {
161 *       public void run() {
162 *         do {
163 *           task.run();
164 *           phaser.arriveAndAwaitAdvance();
165 *         } while (!phaser.isTerminated());
166 *       }
167 *     }.start();
168 *   }
169 *   phaser.arriveAndDeregister(); // deregister self, don't wait
170 * }}</pre>
171 *
172 * If the main task must later await termination, it
173 * may re-register and then execute a similar loop:
174 *  <pre> {@code
175 *   // ...
176 *   phaser.register();
177 *   while (!phaser.isTerminated())
178 *     phaser.arriveAndAwaitAdvance();}</pre>
179 *
180 * <p>Related constructions may be used to await particular phase numbers
181 * in contexts where you are sure that the phase will never wrap around
182 * {@code Integer.MAX_VALUE}. For example:
183 *
184 *  <pre> {@code
185 * void awaitPhase(Phaser phaser, int phase) {
186 *   int p = phaser.register(); // assumes caller not already registered
187 *   while (p < phase) {
188 *     if (phaser.isTerminated())
189 *       // ... deal with unexpected termination
190 *     else
191 *       p = phaser.arriveAndAwaitAdvance();
192 *   }
193 *   phaser.arriveAndDeregister();
194 * }}</pre>
195 *
196 *
197 * <p>To create a set of {@code n} tasks using a tree of phasers, you
198 * could use code of the following form, assuming a Task class with a
199 * constructor accepting a {@code Phaser} that it registers with upon
200 * construction. After invocation of {@code build(new Task[n], 0, n,
201 * new Phaser())}, these tasks could then be started, for example by
202 * submitting to a pool:
203 *
204 *  <pre> {@code
205 * void build(Task[] tasks, int lo, int hi, Phaser ph) {
206 *   if (hi - lo > TASKS_PER_PHASER) {
207 *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
208 *       int j = Math.min(i + TASKS_PER_PHASER, hi);
209 *       build(tasks, i, j, new Phaser(ph));
210 *     }
211 *   } else {
212 *     for (int i = lo; i < hi; ++i)
213 *       tasks[i] = new Task(ph);
214 *       // assumes new Task(ph) performs ph.register()
215 *   }
216 * }}</pre>
217 *
218 * The best value of {@code TASKS_PER_PHASER} depends mainly on
219 * expected synchronization rates. A value as low as four may
220 * be appropriate for extremely small per-phase task bodies (thus
221 * high rates), or up to hundreds for extremely large ones.
222 *
223 * <p><b>Implementation notes</b>: This implementation restricts the
224 * maximum number of parties to 65535. Attempts to register additional
225 * parties result in {@code IllegalStateException}. However, you can and
226 * should create tiered phasers to accommodate arbitrarily large sets
227 * of participants.
228 *
229 * @since 1.7
230 * @hide
231 * @author Doug Lea
232 */
233public class Phaser {
234    /*
235     * This class implements an extension of X10 "clocks".  Thanks to
236     * Vijay Saraswat for the idea, and to Vivek Sarkar for
237     * enhancements to extend functionality.
238     */
239
    /**
     * Primary state representation, holding four bit-fields:
     *
     * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
     * parties    -- the number of parties to wait            (bits 16-31)
     * phase      -- the generation of the barrier            (bits 32-62)
     * terminated -- set if barrier is terminated             (bit  63 / sign)
     *
     * Except that a phaser with no registered parties is
     * distinguished by the otherwise illegal state of having zero
     * parties and one unarrived party (encoded as EMPTY below).
     *
     * To efficiently maintain atomicity, these values are packed into
     * a single (atomic) long. Good performance relies on keeping
     * state decoding and encoding simple, and keeping race windows
     * short.
     *
     * All state updates are performed via CAS except initial
     * registration of a sub-phaser (i.e., one with a non-null
     * parent).  In this (relatively rare) case, we use built-in
     * synchronization to lock while first registering with its
     * parent.
     *
     * The phase of a subphaser is allowed to lag that of its
     * ancestors until it is actually accessed -- see method
     * reconcileState.
     *
     * Because the termination flag is the sign bit, extracting the
     * phase as an int via (int)(state >>> PHASE_SHIFT) yields a
     * negative value once the phaser has terminated; the code tests
     * for termination with "phase < 0".
     */
    private volatile long state;

    // Field limits, positions and masks for decoding/encoding state.
    private static final int  MAX_PARTIES     = 0xffff;      // largest value of the 16-bit parties field
    private static final int  MAX_PHASE       = Integer.MAX_VALUE; // phase wraps to 0 after this value
    private static final int  PARTIES_SHIFT   = 16;          // parties occupy bits 16-31
    private static final int  PHASE_SHIFT     = 32;          // phase occupies bits 32-62
    private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
    private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
    private static final long COUNTS_MASK     = 0xffffffffL; // low 32 bits: parties + unarrived
    private static final long TERMINATION_BIT = 1L << 63;    // sign bit of state

    // some special values
    private static final int  ONE_ARRIVAL     = 1;           // subtracting it removes one unarrived party
    private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT; // one unit of the parties field
    private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY; // removes one party and one arrival
    private static final int  EMPTY           = 1;           // counts value meaning "no registered parties"
283
284    // The following unpacking methods are usually manually inlined
285
286    private static int unarrivedOf(long s) {
287        int counts = (int)s;
288        return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
289    }
290
291    private static int partiesOf(long s) {
292        return (int)s >>> PARTIES_SHIFT;
293    }
294
295    private static int phaseOf(long s) {
296        return (int)(s >>> PHASE_SHIFT);
297    }
298
299    private static int arrivedOf(long s) {
300        int counts = (int)s;
301        return (counts == EMPTY) ? 0 :
302            (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
303    }
304
    /**
     * The parent of this phaser, or null if none.
     */
    private final Phaser parent;

    /**
     * The root of phaser tree. Equals this if not in a tree.
     * Sub-phasers take their phase from the root (see reconcileState).
     */
    private final Phaser root;

    /**
     * Heads of Treiber stacks for waiting threads. To eliminate
     * contention when releasing some threads while adding others, we
     * use two of them, alternating across even and odd phases.
     * Subphasers share queues with root to speed up releases.
     */
    private final AtomicReference<QNode> evenQ;  // waiters for even phases (see queueFor)
    private final AtomicReference<QNode> oddQ;   // waiters for odd phases (see queueFor)
323
324    private AtomicReference<QNode> queueFor(int phase) {
325        return ((phase & 1) == 0) ? evenQ : oddQ;
326    }
327
328    /**
329     * Returns message string for bounds exceptions on arrival.
330     */
331    private String badArrive(long s) {
332        return "Attempted arrival of unregistered party for " +
333            stateToString(s);
334    }
335
336    /**
337     * Returns message string for bounds exceptions on registration.
338     */
339    private String badRegister(long s) {
340        return "Attempt to register more than " +
341            MAX_PARTIES + " parties for " + stateToString(s);
342    }
343
    /**
     * Main implementation for methods arrive and arriveAndDeregister.
     * Manually tuned to speed up and minimize race windows for the
     * common case of just decrementing unarrived field.
     *
     * <p>When the caller is the last unarrived party of the phase, a
     * root phaser advances itself: it calls onAdvance, then installs
     * either the termination bit or the next phase with unarrived reset
     * to the current parties count (EMPTY if that count is zero). A
     * sub-phaser instead forwards a single arrival to its parent, or a
     * deregistration if it has no parties left, and lets reconcileState
     * bring its own phase up to date later.
     *
     * @param adjust value to subtract from state;
     *               ONE_ARRIVAL for arrive,
     *               ONE_DEREGISTER for arriveAndDeregister
     * @return the arrival phase number (for a sub-phaser whose last party
     *         arrived, the value returned by the parent's doArrive), or a
     *         negative value if terminated
     * @throws IllegalStateException if there is no unarrived party to
     *         account for this arrival
     */
    private int doArrive(int adjust) {
        final Phaser root = this.root;
        for (;;) {
            // a sub-phaser first catches up with any advance of the root
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)                      // terminated (sign bit set)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            // s is set to the decremented value before the CAS executes;
            // if the CAS fails, the loop re-reads state and retries
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
                if (unarrived == 1) {           // this was the final arrival
                    long n = s & PARTIES_MASK;  // base of next state
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        int nextPhase = (phase + 1) & MAX_PHASE; // wraps to 0
                        n |= (long)nextPhase << PHASE_SHIFT;
                        // NOTE(review): the CAS result is ignored here; the
                        // equivalent CAS in arriveAndAwaitAdvance treats a
                        // failure as a concurrent termination
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                        releaseWaiters(phase);  // presumably wakes threads queued on phase
                    }
                    else if (nextUnarrived == 0) { // propagate deregistration
                        phase = parent.doArrive(ONE_DEREGISTER);
                        // parties and unarrived are both 0 here: reset the
                        // counts to the EMPTY encoding
                        UNSAFE.compareAndSwapLong(this, stateOffset,
                                                  s, s | EMPTY);
                    }
                    else
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }
392
    /**
     * Implementation of register, bulkRegister
     *
     * <p>Adds {@code registrations} to both the parties and unarrived
     * fields. If every current party has already arrived (unarrived is
     * zero while the phaser is not EMPTY), the pending advance is waited
     * out before retrying. The first registration of a sub-phaser also
     * registers the sub-phaser once with its parent, while holding this
     * phaser's monitor.
     *
     * @param registrations number to add to both parties and
     * unarrived fields. Must be greater than zero.
     * @return the phase to which the registration applied, or a
     * negative value if terminated
     * @throws IllegalStateException if the parties count would exceed
     * MAX_PARTIES
     */
    private int doRegister(int registrations) {
        // adjustment to state
        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        int phase;
        for (;;) {
            long s = (parent == null) ? state : reconcileState();
            int counts = (int)s;
            int parties = counts >>> PARTIES_SHIFT;
            int unarrived = counts & UNARRIVED_MASK;
            // compared by subtraction so that a huge registrations value
            // cannot overflow the check
            if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)                          // terminated
                break;
            if (counts != EMPTY) {                  // not 1st registration
                // a sub-phaser proceeds only if reconciling with the root
                // left its state unchanged; otherwise loop and re-read
                if (parent == null || reconcileState() == s) {
                    if (unarrived == 0)             // wait out advance
                        root.internalAwaitAdvance(phase, null);
                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                       s, s + adjust))
                        break;
                }
            }
            else if (parent == null) {              // 1st root registration
                // EMPTY counts are replaced outright, not added to
                long next = ((long)phase << PHASE_SHIFT) | adjust;
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                    break;
            }
            else {
                synchronized (this) {               // 1st sub registration
                    if (state == s) {               // recheck under lock
                        phase = parent.doRegister(1);
                        if (phase < 0)
                            break;
                        // finish registration whenever parent registration
                        // succeeded, even when racing with termination,
                        // since these are part of the same "transaction".
                        while (!UNSAFE.compareAndSwapLong
                               (this, stateOffset, s,
                                ((long)phase << PHASE_SHIFT) | adjust)) {
                            s = state;
                            phase = (int)(root.state >>> PHASE_SHIFT);
                            // assert (int)s == EMPTY;
                        }
                        break;
                    }
                }
            }
        }
        return phase;
    }
451
    /**
     * Resolves lagged phase propagation from root if necessary.
     * Reconciliation normally occurs when root has advanced but
     * subphasers have not yet done so, in which case they must finish
     * their own advance by setting unarrived to parties (or if
     * parties is zero, resetting to unregistered EMPTY state).
     * If the root's phase is negative (terminated), the phase bits are
     * copied and the existing counts are kept unchanged. For a root
     * phaser this simply returns the current state.
     *
     * @return reconciled state
     */
    private long reconcileState() {
        final Phaser root = this.root;
        long s = state;
        if (root != this) {
            int phase, p;
            // CAS to root phase with current parties, tripping unarrived.
            // Loops while this phaser's phase differs from the root's. The
            // candidate value is assigned to s inside the CAS argument: on
            // success the loop exits with s holding the installed state;
            // on failure s is re-read from state and the check repeats.
            while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
                   (int)(s >>> PHASE_SHIFT) &&
                   !UNSAFE.compareAndSwapLong
                   (this, stateOffset, s,
                    s = (((long)phase << PHASE_SHIFT) |
                         ((phase < 0) ? (s & COUNTS_MASK) :
                          (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                           ((s & PARTIES_MASK) | p))))))
                s = state;
        }
        return s;
    }
479
480    /**
481     * Creates a new phaser with no initially registered parties, no
482     * parent, and initial phase number 0. Any thread using this
483     * phaser will need to first register for it.
484     */
485    public Phaser() {
486        this(null, 0);
487    }
488
489    /**
490     * Creates a new phaser with the given number of registered
491     * unarrived parties, no parent, and initial phase number 0.
492     *
493     * @param parties the number of parties required to advance to the
494     * next phase
495     * @throws IllegalArgumentException if parties less than zero
496     * or greater than the maximum number of parties supported
497     */
498    public Phaser(int parties) {
499        this(null, parties);
500    }
501
502    /**
503     * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
504     *
505     * @param parent the parent phaser
506     */
507    public Phaser(Phaser parent) {
508        this(parent, 0);
509    }
510
511    /**
512     * Creates a new phaser with the given parent and number of
513     * registered unarrived parties.  When the given parent is non-null
514     * and the given number of parties is greater than zero, this
515     * child phaser is registered with its parent.
516     *
517     * @param parent the parent phaser
518     * @param parties the number of parties required to advance to the
519     * next phase
520     * @throws IllegalArgumentException if parties less than zero
521     * or greater than the maximum number of parties supported
522     */
523    public Phaser(Phaser parent, int parties) {
524        if (parties >>> PARTIES_SHIFT != 0)
525            throw new IllegalArgumentException("Illegal number of parties");
526        int phase = 0;
527        this.parent = parent;
528        if (parent != null) {
529            final Phaser root = parent.root;
530            this.root = root;
531            this.evenQ = root.evenQ;
532            this.oddQ = root.oddQ;
533            if (parties != 0)
534                phase = parent.doRegister(1);
535        }
536        else {
537            this.root = this;
538            this.evenQ = new AtomicReference<QNode>();
539            this.oddQ = new AtomicReference<QNode>();
540        }
541        this.state = (parties == 0) ? (long)EMPTY :
542            ((long)phase << PHASE_SHIFT) |
543            ((long)parties << PARTIES_SHIFT) |
544            ((long)parties);
545    }
546
547    /**
548     * Adds a new unarrived party to this phaser.  If an ongoing
549     * invocation of {@link #onAdvance} is in progress, this method
550     * may await its completion before returning.  If this phaser has
551     * a parent, and this phaser previously had no registered parties,
552     * this child phaser is also registered with its parent. If
553     * this phaser is terminated, the attempt to register has
554     * no effect, and a negative value is returned.
555     *
556     * @return the arrival phase number to which this registration
557     * applied.  If this value is negative, then this phaser has
558     * terminated, in which case registration has no effect.
559     * @throws IllegalStateException if attempting to register more
560     * than the maximum supported number of parties
561     */
562    public int register() {
563        return doRegister(1);
564    }
565
566    /**
567     * Adds the given number of new unarrived parties to this phaser.
568     * If an ongoing invocation of {@link #onAdvance} is in progress,
569     * this method may await its completion before returning.  If this
570     * phaser has a parent, and the given number of parties is greater
571     * than zero, and this phaser previously had no registered
572     * parties, this child phaser is also registered with its parent.
573     * If this phaser is terminated, the attempt to register has no
574     * effect, and a negative value is returned.
575     *
576     * @param parties the number of additional parties required to
577     * advance to the next phase
578     * @return the arrival phase number to which this registration
579     * applied.  If this value is negative, then this phaser has
580     * terminated, in which case registration has no effect.
581     * @throws IllegalStateException if attempting to register more
582     * than the maximum supported number of parties
583     * @throws IllegalArgumentException if {@code parties < 0}
584     */
585    public int bulkRegister(int parties) {
586        if (parties < 0)
587            throw new IllegalArgumentException();
588        if (parties == 0)
589            return getPhase();
590        return doRegister(parties);
591    }
592
593    /**
594     * Arrives at this phaser, without waiting for others to arrive.
595     *
596     * <p>It is a usage error for an unregistered party to invoke this
597     * method.  However, this error may result in an {@code
598     * IllegalStateException} only upon some subsequent operation on
599     * this phaser, if ever.
600     *
601     * @return the arrival phase number, or a negative value if terminated
602     * @throws IllegalStateException if not terminated and the number
603     * of unarrived parties would become negative
604     */
605    public int arrive() {
606        return doArrive(ONE_ARRIVAL);
607    }
608
609    /**
610     * Arrives at this phaser and deregisters from it without waiting
611     * for others to arrive. Deregistration reduces the number of
612     * parties required to advance in future phases.  If this phaser
613     * has a parent, and deregistration causes this phaser to have
614     * zero parties, this phaser is also deregistered from its parent.
615     *
616     * <p>It is a usage error for an unregistered party to invoke this
617     * method.  However, this error may result in an {@code
618     * IllegalStateException} only upon some subsequent operation on
619     * this phaser, if ever.
620     *
621     * @return the arrival phase number, or a negative value if terminated
622     * @throws IllegalStateException if not terminated and the number
623     * of registered or unarrived parties would become negative
624     */
625    public int arriveAndDeregister() {
626        return doArrive(ONE_DEREGISTER);
627    }
628
    /**
     * Arrives at this phaser and awaits others. Equivalent in effect
     * to {@code awaitAdvance(arrive())}.  If you need to await with
     * interruption or timeout, you can arrange this with an analogous
     * construction using one of the other forms of the {@code
     * awaitAdvance} method.  If instead you need to deregister upon
     * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
     *
     * <p>It is a usage error for an unregistered party to invoke this
     * method.  However, this error may result in an {@code
     * IllegalStateException} only upon some subsequent operation on
     * this phaser, if ever.
     *
     * <p>When this call makes the final arrival at a root phaser, the
     * value returned is the new phase ({@code nextPhase} below). This
     * is consistent with the stated equivalence to
     * {@code awaitAdvance(arrive())}.
     *
     * @return the arrival phase number, or the (negative)
     * {@linkplain #getPhase() current phase} if terminated
     * @throws IllegalStateException if not terminated and the number
     * of unarrived parties would become negative
     */
    public int arriveAndAwaitAdvance() {
        // Specialization of doArrive+awaitAdvance eliminating some reads/paths
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)                      // terminated (sign bit set)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            // s becomes the decremented value whether or not the CAS
            // succeeds; a failed CAS just retries from a fresh read
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                          s -= ONE_ARRIVAL)) {
                if (unarrived > 1)              // others still to arrive: wait
                    return root.internalAwaitAdvance(phase, null);
                if (root != this)               // last here: arrive at parent
                    return parent.arriveAndAwaitAdvance();
                // final arrival at the root: perform the advance ourselves
                long n = s & PARTIES_MASK;  // base of next state
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived;
                int nextPhase = (phase + 1) & MAX_PHASE; // wraps to 0
                n |= (long)nextPhase << PHASE_SHIFT;
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                releaseWaiters(phase);  // presumably wakes threads queued on phase
                return nextPhase;
            }
        }
    }
682
683    /**
684     * Awaits the phase of this phaser to advance from the given phase
685     * value, returning immediately if the current phase is not equal
686     * to the given phase value or this phaser is terminated.
687     *
688     * @param phase an arrival phase number, or negative value if
689     * terminated; this argument is normally the value returned by a
690     * previous call to {@code arrive} or {@code arriveAndDeregister}.
691     * @return the next arrival phase number, or the argument if it is
692     * negative, or the (negative) {@linkplain #getPhase() current phase}
693     * if terminated
694     */
695    public int awaitAdvance(int phase) {
696        final Phaser root = this.root;
697        long s = (root == this) ? state : reconcileState();
698        int p = (int)(s >>> PHASE_SHIFT);
699        if (phase < 0)
700            return phase;
701        if (p == phase)
702            return root.internalAwaitAdvance(phase, null);
703        return p;
704    }
705
706    /**
707     * Awaits the phase of this phaser to advance from the given phase
708     * value, throwing {@code InterruptedException} if interrupted
709     * while waiting, or returning immediately if the current phase is
710     * not equal to the given phase value or this phaser is
711     * terminated.
712     *
713     * @param phase an arrival phase number, or negative value if
714     * terminated; this argument is normally the value returned by a
715     * previous call to {@code arrive} or {@code arriveAndDeregister}.
716     * @return the next arrival phase number, or the argument if it is
717     * negative, or the (negative) {@linkplain #getPhase() current phase}
718     * if terminated
719     * @throws InterruptedException if thread interrupted while waiting
720     */
721    public int awaitAdvanceInterruptibly(int phase)
722        throws InterruptedException {
723        final Phaser root = this.root;
724        long s = (root == this) ? state : reconcileState();
725        int p = (int)(s >>> PHASE_SHIFT);
726        if (phase < 0)
727            return phase;
728        if (p == phase) {
729            QNode node = new QNode(this, phase, true, false, 0L);
730            p = root.internalAwaitAdvance(phase, node);
731            if (node.wasInterrupted)
732                throw new InterruptedException();
733        }
734        return p;
735    }
736
737    /**
738     * Awaits the phase of this phaser to advance from the given phase
739     * value or the given timeout to elapse, throwing {@code
740     * InterruptedException} if interrupted while waiting, or
741     * returning immediately if the current phase is not equal to the
742     * given phase value or this phaser is terminated.
743     *
744     * @param phase an arrival phase number, or negative value if
745     * terminated; this argument is normally the value returned by a
746     * previous call to {@code arrive} or {@code arriveAndDeregister}.
747     * @param timeout how long to wait before giving up, in units of
748     *        {@code unit}
749     * @param unit a {@code TimeUnit} determining how to interpret the
750     *        {@code timeout} parameter
751     * @return the next arrival phase number, or the argument if it is
752     * negative, or the (negative) {@linkplain #getPhase() current phase}
753     * if terminated
754     * @throws InterruptedException if thread interrupted while waiting
755     * @throws TimeoutException if timed out while waiting
756     */
757    public int awaitAdvanceInterruptibly(int phase,
758                                         long timeout, TimeUnit unit)
759        throws InterruptedException, TimeoutException {
760        long nanos = unit.toNanos(timeout);
761        final Phaser root = this.root;
762        long s = (root == this) ? state : reconcileState();
763        int p = (int)(s >>> PHASE_SHIFT);
764        if (phase < 0)
765            return phase;
766        if (p == phase) {
767            QNode node = new QNode(this, phase, true, true, nanos);
768            p = root.internalAwaitAdvance(phase, node);
769            if (node.wasInterrupted)
770                throw new InterruptedException();
771            else if (p == phase)
772                throw new TimeoutException();
773        }
774        return p;
775    }
776
777    /**
778     * Forces this phaser to enter termination state.  Counts of
779     * registered parties are unaffected.  If this phaser is a member
780     * of a tiered set of phasers, then all of the phasers in the set
781     * are terminated.  If this phaser is already terminated, this
782     * method has no effect.  This method may be useful for
783     * coordinating recovery after one or more tasks encounter
784     * unexpected exceptions.
785     */
786    public void forceTermination() {
787        // Only need to change root state
788        final Phaser root = this.root;
789        long s;
790        while ((s = root.state) >= 0) {
791            if (UNSAFE.compareAndSwapLong(root, stateOffset,
792                                          s, s | TERMINATION_BIT)) {
793                // signal all threads
794                releaseWaiters(0); // Waiters on evenQ
795                releaseWaiters(1); // Waiters on oddQ
796                return;
797            }
798        }
799    }
800
801    /**
802     * Returns the current phase number. The maximum phase number is
803     * {@code Integer.MAX_VALUE}, after which it restarts at
804     * zero. Upon termination, the phase number is negative,
805     * in which case the prevailing phase prior to termination
806     * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
807     *
808     * @return the phase number, or a negative value if terminated
809     */
810    public final int getPhase() {
811        return (int)(root.state >>> PHASE_SHIFT);
812    }
813
814    /**
815     * Returns the number of parties registered at this phaser.
816     *
817     * @return the number of parties
818     */
819    public int getRegisteredParties() {
820        return partiesOf(state);
821    }
822
823    /**
824     * Returns the number of registered parties that have arrived at
825     * the current phase of this phaser. If this phaser has terminated,
826     * the returned value is meaningless and arbitrary.
827     *
828     * @return the number of arrived parties
829     */
830    public int getArrivedParties() {
831        return arrivedOf(reconcileState());
832    }
833
834    /**
835     * Returns the number of registered parties that have not yet
836     * arrived at the current phase of this phaser. If this phaser has
837     * terminated, the returned value is meaningless and arbitrary.
838     *
839     * @return the number of unarrived parties
840     */
841    public int getUnarrivedParties() {
842        return unarrivedOf(reconcileState());
843    }
844
845    /**
846     * Returns the parent of this phaser, or {@code null} if none.
847     *
848     * @return the parent of this phaser, or {@code null} if none
849     */
850    public Phaser getParent() {
851        return parent;
852    }
853
854    /**
855     * Returns the root ancestor of this phaser, which is the same as
856     * this phaser if it has no parent.
857     *
858     * @return the root ancestor of this phaser
859     */
860    public Phaser getRoot() {
861        return root;
862    }
863
864    /**
865     * Returns {@code true} if this phaser has been terminated.
866     *
867     * @return {@code true} if this phaser has been terminated
868     */
869    public boolean isTerminated() {
870        return root.state < 0L;
871    }
872
873    /**
874     * Overridable method to perform an action upon impending phase
875     * advance, and to control termination. This method is invoked
876     * upon arrival of the party advancing this phaser (when all other
877     * waiting parties are dormant).  If this method returns {@code
878     * true}, this phaser will be set to a final termination state
879     * upon advance, and subsequent calls to {@link #isTerminated}
880     * will return true. Any (unchecked) Exception or Error thrown by
881     * an invocation of this method is propagated to the party
882     * attempting to advance this phaser, in which case no advance
883     * occurs.
884     *
885     * <p>The arguments to this method provide the state of the phaser
886     * prevailing for the current transition.  The effects of invoking
887     * arrival, registration, and waiting methods on this phaser from
888     * within {@code onAdvance} are unspecified and should not be
889     * relied on.
890     *
891     * <p>If this phaser is a member of a tiered set of phasers, then
892     * {@code onAdvance} is invoked only for its root phaser on each
893     * advance.
894     *
895     * <p>To support the most common use cases, the default
896     * implementation of this method returns {@code true} when the
897     * number of registered parties has become zero as the result of a
898     * party invoking {@code arriveAndDeregister}.  You can disable
899     * this behavior, thus enabling continuation upon future
900     * registrations, by overriding this method to always return
901     * {@code false}:
902     *
903     * <pre> {@code
904     * Phaser phaser = new Phaser() {
905     *   protected boolean onAdvance(int phase, int parties) { return false; }
906     * }}</pre>
907     *
908     * @param phase the current phase number on entry to this method,
909     * before this phaser is advanced
910     * @param registeredParties the current number of registered parties
911     * @return {@code true} if this phaser should terminate
912     */
913    protected boolean onAdvance(int phase, int registeredParties) {
914        return registeredParties == 0;
915    }
916
917    /**
918     * Returns a string identifying this phaser, as well as its
919     * state.  The state, in brackets, includes the String {@code
920     * "phase = "} followed by the phase number, {@code "parties = "}
921     * followed by the number of registered parties, and {@code
922     * "arrived = "} followed by the number of arrived parties.
923     *
924     * @return a string identifying this phaser, as well as its state
925     */
926    public String toString() {
927        return stateToString(reconcileState());
928    }
929
930    /**
931     * Implementation of toString and string-based error messages
932     */
933    private String stateToString(long s) {
934        return super.toString() +
935            "[phase = " + phaseOf(s) +
936            " parties = " + partiesOf(s) +
937            " arrived = " + arrivedOf(s) + "]";
938    }
939
940    // Waiting mechanics
941
942    /**
943     * Removes and signals threads from queue for phase.
944     */
945    private void releaseWaiters(int phase) {
946        QNode q;   // first element of queue
947        Thread t;  // its thread
948        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
949        while ((q = head.get()) != null &&
950               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
951            if (head.compareAndSet(q, q.next) &&
952                (t = q.thread) != null) {
953                q.thread = null;
954                LockSupport.unpark(t);
955            }
956        }
957    }
958
959    /**
960     * Variant of releaseWaiters that additionally tries to remove any
961     * nodes no longer waiting for advance due to timeout or
962     * interrupt. Currently, nodes are removed only if they are at
963     * head of queue, which suffices to reduce memory footprint in
964     * most usages.
965     *
966     * @return current phase on exit
967     */
968    private int abortWait(int phase) {
969        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
970        for (;;) {
971            Thread t;
972            QNode q = head.get();
973            int p = (int)(root.state >>> PHASE_SHIFT);
974            if (q == null || ((t = q.thread) != null && q.phase == p))
975                return p;
976            if (head.compareAndSet(q, q.next) && t != null) {
977                q.thread = null;
978                LockSupport.unpark(t);
979            }
980        }
981    }
982
983    /** The number of CPUs, for spin control */
984    private static final int NCPU = Runtime.getRuntime().availableProcessors();
985
986    /**
987     * The number of times to spin before blocking while waiting for
988     * advance, per arrival while waiting. On multiprocessors, fully
989     * blocking and waking up a large number of threads all at once is
990     * usually a very slow process, so we use rechargeable spins to
991     * avoid it when threads regularly arrive: When a thread in
992     * internalAwaitAdvance notices another arrival before blocking,
993     * and there appear to be enough CPUs available, it spins
994     * SPINS_PER_ARRIVAL more times before blocking. The value trades
995     * off good-citizenship vs big unnecessary slowdowns.
996     */
997    static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
998
999    /**
1000     * Possibly blocks and waits for phase to advance unless aborted.
1001     * Call only on root phaser.
1002     *
1003     * @param phase current phase
1004     * @param node if non-null, the wait node to track interrupt and timeout;
1005     * if null, denotes noninterruptible wait
1006     * @return current phase
1007     */
1008    private int internalAwaitAdvance(int phase, QNode node) {
1009        // assert root == this;
1010        releaseWaiters(phase-1);          // ensure old queue clean
1011        boolean queued = false;           // true when node is enqueued
1012        int lastUnarrived = 0;            // to increase spins upon change
1013        int spins = SPINS_PER_ARRIVAL;
1014        long s;
1015        int p;
1016        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
1017            if (node == null) {           // spinning in noninterruptible mode
1018                int unarrived = (int)s & UNARRIVED_MASK;
1019                if (unarrived != lastUnarrived &&
1020                    (lastUnarrived = unarrived) < NCPU)
1021                    spins += SPINS_PER_ARRIVAL;
1022                boolean interrupted = Thread.interrupted();
1023                if (interrupted || --spins < 0) { // need node to record intr
1024                    node = new QNode(this, phase, false, false, 0L);
1025                    node.wasInterrupted = interrupted;
1026                }
1027            }
1028            else if (node.isReleasable()) // done or aborted
1029                break;
1030            else if (!queued) {           // push onto queue
1031                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
1032                QNode q = node.next = head.get();
1033                if ((q == null || q.phase == phase) &&
1034                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
1035                    queued = head.compareAndSet(q, node);
1036            }
1037            else {
1038                try {
1039                    ForkJoinPool.managedBlock(node);
1040                } catch (InterruptedException ie) {
1041                    node.wasInterrupted = true;
1042                }
1043            }
1044        }
1045
1046        if (node != null) {
1047            if (node.thread != null)
1048                node.thread = null;       // avoid need for unpark()
1049            if (node.wasInterrupted && !node.interruptible)
1050                Thread.currentThread().interrupt();
1051            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
1052                return abortWait(phase); // possibly clean up on abort
1053        }
1054        releaseWaiters(phase);
1055        return p;
1056    }
1057
1058    /**
1059     * Wait nodes for Treiber stack representing wait queue
1060     */
1061    static final class QNode implements ForkJoinPool.ManagedBlocker {
1062        final Phaser phaser;
1063        final int phase;
1064        final boolean interruptible;
1065        final boolean timed;
1066        boolean wasInterrupted;
1067        long nanos;
1068        long lastTime;
1069        volatile Thread thread; // nulled to cancel wait
1070        QNode next;
1071
1072        QNode(Phaser phaser, int phase, boolean interruptible,
1073              boolean timed, long nanos) {
1074            this.phaser = phaser;
1075            this.phase = phase;
1076            this.interruptible = interruptible;
1077            this.nanos = nanos;
1078            this.timed = timed;
1079            this.lastTime = timed ? System.nanoTime() : 0L;
1080            thread = Thread.currentThread();
1081        }
1082
1083        public boolean isReleasable() {
1084            if (thread == null)
1085                return true;
1086            if (phaser.getPhase() != phase) {
1087                thread = null;
1088                return true;
1089            }
1090            if (Thread.interrupted())
1091                wasInterrupted = true;
1092            if (wasInterrupted && interruptible) {
1093                thread = null;
1094                return true;
1095            }
1096            if (timed) {
1097                if (nanos > 0L) {
1098                    long now = System.nanoTime();
1099                    nanos -= now - lastTime;
1100                    lastTime = now;
1101                }
1102                if (nanos <= 0L) {
1103                    thread = null;
1104                    return true;
1105                }
1106            }
1107            return false;
1108        }
1109
1110        public boolean block() {
1111            if (isReleasable())
1112                return true;
1113            else if (!timed)
1114                LockSupport.park(this);
1115            else if (nanos > 0)
1116                LockSupport.parkNanos(this, nanos);
1117            return isReleasable();
1118        }
1119    }
1120
1121    // Unsafe mechanics
1122
1123    private static final sun.misc.Unsafe UNSAFE;
1124    private static final long stateOffset;
1125    static {
1126        try {
1127            UNSAFE = sun.misc.Unsafe.getUnsafe();
1128            Class<?> k = Phaser.class;
1129            stateOffset = UNSAFE.objectFieldOffset
1130                (k.getDeclaredField("state"));
1131        } catch (Exception e) {
1132            throw new Error(e);
1133        }
1134    }
1135}
1136