/*
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre> {@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<>();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }}</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {

    /*
     * Overview: The core algorithm is, for an exchange "slot",
     * and a participant (caller) with an item:
     *
     * for (;;) {
     *   if (slot is empty) {                       // offer
     *     place item in a Node;
     *     if (can CAS slot from empty to node) {
     *       wait for release;
     *       return matching item in node;
     *     }
     *   }
     *   else if (can CAS slot from node to empty) { // release
     *     get the item in node;
     *     set matching item in node;
     *     release waiting thread;
     *   }
     *   // else retry on CAS failure
     * }
     *
     * This is among the simplest forms of a "dual data structure" --
     * see Scott and Scherer's DISC 04 paper and
     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
     *
     * This works great in principle. But in practice, like many
     * algorithms centered on atomic updates to a single location, it
     * scales horribly when there are more than a few participants
     * using the same Exchanger. So the implementation instead uses a
     * form of elimination arena, that spreads out this contention by
     * arranging that some threads typically use different slots,
     * while still ensuring that eventually, any two parties will be
     * able to exchange items. That is, we cannot completely partition
     * across threads, but instead give threads arena indices that
     * will on average grow under contention and shrink under lack of
     * contention. We approach this by defining the Nodes that we need
     * anyway as ThreadLocals, and include in them per-thread index
     * and related bookkeeping state. (We can safely reuse per-thread
     * nodes rather than creating them fresh each time because slots
     * alternate between pointing to a node vs null, so cannot
     * encounter ABA problems. However, we do need some care in
     * resetting them between uses.)
     *
     * Implementing an effective arena requires allocating a bunch of
     * space, so we only do so upon detecting contention (except on
     * uniprocessors, where they wouldn't help, so aren't used).
     * Otherwise, exchanges use the single-slot slotExchange method.
     * On contention, not only must the slots be in different
     * locations, but the locations must not encounter memory
     * contention due to being on the same cache line (or more
     * generally, the same coherence unit).  Because, as of this
     * writing, there is no way to determine cacheline size, we define
     * a value that is enough for common platforms.  Additionally,
     * extra care elsewhere is taken to avoid other false/unintended
     * sharing and to enhance locality, including adding padding (via
     * @Contended) to Nodes, embedding "bound" as an Exchanger field,
     * and reworking some park/unpark mechanics compared to
     * LockSupport versions.
     *
     * The arena starts out with only one used slot. We expand the
     * effective arena size by tracking collisions; i.e., failed CASes
     * while trying to exchange. By nature of the above algorithm, the
     * only kinds of collision that reliably indicate contention are
     * when two attempted releases collide -- one of two attempted
     * offers can legitimately fail to CAS without indicating
     * contention by more than one other thread. (Note: it is possible
     * but not worthwhile to more precisely detect contention by
     * reading slot values after CAS failures.)  When a thread has
     * collided at each slot within the current arena bound, it tries
     * to expand the arena size by one. We track collisions within
     * bounds by using a version (sequence) number on the "bound"
     * field, and conservatively reset collision counts when a
     * participant notices that bound has been updated (in either
     * direction).
     *
     * The effective arena size is reduced (when there is more than
     * one slot) by giving up on waiting after a while and trying to
     * decrement the arena size on expiration. The value of "a while"
     * is an empirical matter.  We implement by piggybacking on the
     * use of spin->yield->block that is essential for reasonable
     * waiting performance anyway -- in a busy exchanger, offers are
     * usually almost immediately released, in which case context
     * switching on multiprocessors is extremely slow/wasteful.  Arena
     * waits just omit the blocking part, and instead cancel. The spin
     * count is empirically chosen to be a value that avoids blocking
     * 99% of the time under maximum sustained exchange rates on a
     * range of test machines. Spins and yields entail some limited
     * randomness (using a cheap xorshift) to avoid regular patterns
     * that can induce unproductive grow/shrink cycles. (Using a
     * pseudorandom also helps regularize spin cycle duration by
     * making branches unpredictable.)  Also, during an offer, a
     * waiter can "know" that it will be released when its slot has
     * changed, but cannot yet proceed until match is set.  In the
     * mean time it cannot cancel the offer, so instead spins/yields.
     * Note: It is possible to avoid this secondary check by changing
     * the linearization point to be a CAS of the match field (as done
     * in one case in the Scott & Scherer DISC paper), which also
     * increases asynchrony a bit, at the expense of poorer collision
     * detection and inability to always reuse per-thread nodes. So
     * the current scheme is typically a better tradeoff.
     *
     * On collisions, indices traverse the arena cyclically in reverse
     * order, restarting at the maximum index (which will tend to be
     * sparsest) when bounds change. (On expirations, indices instead
     * are halved until reaching 0.) It is possible (and has been
     * tried) to use randomized, prime-value-stepped, or double-hash
     * style traversal instead of simple cyclic traversal to reduce
     * bunching.  But empirically, whatever benefits these may have
     * don't overcome their added overhead: We are managing operations
     * that occur very quickly unless there is sustained contention,
     * so simpler/faster control policies work better than more
     * accurate but slower ones.
     *
     * Because we use expiration for arena size control, we cannot
     * throw TimeoutExceptions in the timed version of the public
     * exchange method until the arena size has shrunken to zero (or
     * the arena isn't enabled). This may delay response to timeout
     * but is still within spec.
     *
     * Essentially all of the implementation is in methods
     * slotExchange and arenaExchange. These have similar overall
     * structure, but differ in too many details to combine. The
     * slotExchange method uses the single Exchanger field "slot"
     * rather than arena array elements. However, it still needs
     * minimal collision detection to trigger arena construction.
     * (The messiest part is making sure interrupt status and
     * InterruptedExceptions come out right during transitions when
     * both methods may be called. This is done by using null return
     * as a sentinel to recheck interrupt status.)
     *
     * As is too common in this sort of code, methods are monolithic
     * because most of the logic relies on reads of fields that are
     * maintained as local variables so can't be nicely factored
     * (mainly, here, bulky spin->yield->block/cancel code), and are
     * heavily dependent on intrinsics (Unsafe) to use inlined
     * embedded CAS and related memory access operations (that tend
     * not to be as readily inlined by dynamic compilers when they are
     * hidden behind other methods that would more nicely name and
     * encapsulate the intended effects). This includes the use of
     * putOrderedX to clear fields of the per-thread Nodes between
     * uses. Note that field Node.item is not declared as volatile
     * even though it is read by releasing threads, because they only
     * do so after CAS operations that must precede access, and all
     * uses by the owning thread are otherwise acceptably ordered by
     * other operations. (Because the actual points of atomicity are
     * slot CASes, it would also be legal for the write to Node.match
     * in a release to be weaker than a full volatile write. However,
     * this is not done because it could allow further postponement of
     * the write, delaying progress.)
     */

    /**
     * The byte distance (as a shift value) between any two used slots
     * in the arena.  1 << ASHIFT should be at least cacheline size.
     */
    private static final int ASHIFT = 7;

    /**
     * The maximum supported arena index. The maximum allocatable
     * arena size is MMASK + 1. Must be a power of two minus one, less
     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
     * for the expected scaling limits of the main algorithms.
     */
    private static final int MMASK = 0xff;

    /**
     * Unit for sequence/version bits of bound field. Each successful
     * change to the bound also adds SEQ.
     */
    private static final int SEQ = MMASK + 1;

    /** The number of CPUs, for sizing and spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The maximum slot index of the arena: The number of slots that
     * can in principle hold all threads without contention, or at
     * most the maximum indexable value.
     */
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

    /**
     * The bound for spins while waiting for a match. The actual
     * number of iterations will on average be about twice this value
     * due to randomization. Note: Spinning is disabled when NCPU==1.
     */
    private static final int SPINS = 1 << 10;

    /**
     * Value representing null arguments/returns from public
     * methods. Needed because the API originally didn't disallow null
     * arguments, which it should have.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * Sentinel value returned by internal exchange methods upon
     * timeout, to avoid need for separate timed versions of these
     * methods.
     */
    private static final Object TIMED_OUT = new Object();

    /**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping. Padded via @Contended to reduce memory contention.
     */
    //@jdk.internal.vm.annotation.Contended // android-removed
    static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }

    /** The corresponding thread local class */
    static final class Participant extends ThreadLocal<Node> {
        /** Supplies a fresh, zero-initialized Node for each thread. */
        @Override
        public Node initialValue() { return new Node(); }
    }

    /**
     * Per-thread state.
     */
    private final Participant participant;

    /**
     * Elimination array; null until enabled (within slotExchange).
     * Element accesses use emulation of volatile gets and CAS.
     */
    private volatile Node[] arena;

    /**
     * Slot used until contention detected.
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR'ed with SEQ
     * number in high bits, incremented on each update.  The initial
     * update from 0 to SEQ is used to ensure that the arena array is
     * constructed only once.
     */
    private volatile int bound;

    /**
     * Exchange function when arenas enabled. See above for explanation.
     *
     * @param item the (non-null) item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if interrupted; or
     * TIMED_OUT if timed and timed out
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        Node p = participant.get();
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c; long j;                       // j is raw array offset
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                p.item = item;                         // offer
                if (U.compareAndSwapObject(a, j, null, p)) {
                    // Only slot 0 (m == 0) may block or time out; other
                    // slots expire after spinning so the arena can shrink.
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        if (v != null) {
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    p.item = null;                     // clear offer
            }
            else {
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }

    /**
     * Exchange function used until arenas enabled. See above for explanation.
     *
     * @param item the item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if either the arena
     * was enabled or the thread was interrupted before completion; or
     * TIMED_OUT if timed and timed out
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            if ((q = slot) != null) {
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;                 // release
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                // (the 0 -> SEQ CAS on bound lets exactly one thread
                // allocate; the array is sized so that every offset
                // (i << ASHIFT) + ABASE for i <= FULL falls inside it)
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                p.item = item;                         // offer
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;                         // lost race; retry
            }
        }

        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        while ((v = p.match) == null) {
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                if (h == 0)                            // initialize hash
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            else if (slot != p)
                spins = SPINS;           // taken by a releaser; match not set yet
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);         // emulate LockSupport
                p.parked = t;
                if (slot == p)                         // recheck before parking
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                // Offer withdrawn. TIMED_OUT only for a genuine timeout;
                // otherwise null makes the caller recheck interrupt
                // status or reroute to arenaExchange.
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);            // reset node for reuse
        p.item = null;
        p.hash = h;
        return v;
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        participant = new Participant();
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        // Use the single slot unless the arena already exists. A null
        // result from either internal method means "interrupted", except
        // that slotExchange also returns null once the arena is enabled;
        // hence the interrupt recheck before falling through to the arena.
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link
     * TimeoutException} is thrown.  If the time is less than or equal
     * to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        long ns = unit.toNanos(timeout);
        // Same routing as the untimed version; the internal methods
        // additionally report expiry via the TIMED_OUT sentinel.
        if ((arena != null ||
             (v = slotExchange(item, true, ns)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, true, ns)) == null)))
            throw new InterruptedException();
        if (v == TIMED_OUT)
            throw new TimeoutException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long BOUND;   // offset of Exchanger.bound
    private static final long SLOT;    // offset of Exchanger.slot
    private static final long MATCH;   // offset of Node.match
    private static final long BLOCKER; // offset of Thread.parkBlocker
    private static final int ABASE;    // byte offset of arena slot 0
    static {
        try {
            BOUND = U.objectFieldOffset
                (Exchanger.class.getDeclaredField("bound"));
            SLOT = U.objectFieldOffset
                (Exchanger.class.getDeclaredField("slot"));

            MATCH = U.objectFieldOffset
                (Node.class.getDeclaredField("match"));

            BLOCKER = U.objectFieldOffset
                (Thread.class.getDeclaredField("parkBlocker"));

            // Slot offsets are computed with shifts, so the element
            // scale must be a power of two no larger than the stride.
            int scale = U.arrayIndexScale(Node[].class);
            if ((scale & (scale - 1)) != 0 || scale > (1 << ASHIFT))
                throw new Error("Unsupported array scale");
            // ABASE absorbs padding in front of element 0
            ABASE = U.arrayBaseOffset(Node[].class) + (1 << ASHIFT);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

}