Exchanger.java revision 6975f84c2ed72e1e26d20190b6f318718c849008
1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea, Bill Scherer, and Michael Scott with
32 * assistance from members of JCP JSR-166 Expert Group and released to
33 * the public domain, as explained at
34 * http://creativecommons.org/publicdomain/zero/1.0/
35 */
36
37package java.util.concurrent;
38
39/**
40 * A synchronization point at which threads can pair and swap elements
41 * within pairs.  Each thread presents some object on entry to the
42 * {@link #exchange exchange} method, matches with a partner thread,
43 * and receives its partner's object on return.  An Exchanger may be
44 * viewed as a bidirectional form of a {@link SynchronousQueue}.
45 * Exchangers may be useful in applications such as genetic algorithms
46 * and pipeline designs.
47 *
48 * <p><b>Sample Usage:</b>
49 * Here are the highlights of a class that uses an {@code Exchanger}
50 * to swap buffers between threads so that the thread filling the
51 * buffer gets a freshly emptied one when it needs it, handing off the
52 * filled one to the thread emptying the buffer.
53 * <pre> {@code
54 * class FillAndEmpty {
55 *   Exchanger<DataBuffer> exchanger = new Exchanger<>();
56 *   DataBuffer initialEmptyBuffer = ... a made-up type
57 *   DataBuffer initialFullBuffer = ...
58 *
59 *   class FillingLoop implements Runnable {
60 *     public void run() {
61 *       DataBuffer currentBuffer = initialEmptyBuffer;
62 *       try {
63 *         while (currentBuffer != null) {
64 *           addToBuffer(currentBuffer);
65 *           if (currentBuffer.isFull())
66 *             currentBuffer = exchanger.exchange(currentBuffer);
67 *         }
68 *       } catch (InterruptedException ex) { ... handle ... }
69 *     }
70 *   }
71 *
72 *   class EmptyingLoop implements Runnable {
73 *     public void run() {
74 *       DataBuffer currentBuffer = initialFullBuffer;
75 *       try {
76 *         while (currentBuffer != null) {
77 *           takeFromBuffer(currentBuffer);
78 *           if (currentBuffer.isEmpty())
79 *             currentBuffer = exchanger.exchange(currentBuffer);
80 *         }
81 *       } catch (InterruptedException ex) { ... handle ...}
82 *     }
83 *   }
84 *
85 *   void start() {
86 *     new Thread(new FillingLoop()).start();
87 *     new Thread(new EmptyingLoop()).start();
88 *   }
89 * }}</pre>
90 *
91 * <p>Memory consistency effects: For each pair of threads that
92 * successfully exchange objects via an {@code Exchanger}, actions
93 * prior to the {@code exchange()} in each thread
94 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
95 * those subsequent to a return from the corresponding {@code exchange()}
96 * in the other thread.
97 *
98 * @since 1.5
99 * @author Doug Lea and Bill Scherer and Michael Scott
100 * @param <V> The type of objects that may be exchanged
101 */
102public class Exchanger<V> {
103
104    /*
105     * Overview: The core algorithm is, for an exchange "slot",
106     * and a participant (caller) with an item:
107     *
108     * for (;;) {
109     *   if (slot is empty) {                       // offer
110     *     place item in a Node;
111     *     if (can CAS slot from empty to node) {
112     *       wait for release;
113     *       return matching item in node;
114     *     }
115     *   }
116     *   else if (can CAS slot from node to empty) { // release
117     *     get the item in node;
118     *     set matching item in node;
119     *     release waiting thread;
120     *   }
121     *   // else retry on CAS failure
122     * }
123     *
124     * This is among the simplest forms of a "dual data structure" --
125     * see Scott and Scherer's DISC 04 paper and
126     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
127     *
128     * This works great in principle. But in practice, like many
129     * algorithms centered on atomic updates to a single location, it
130     * scales horribly when there are more than a few participants
131     * using the same Exchanger. So the implementation instead uses a
132     * form of elimination arena, that spreads out this contention by
133     * arranging that some threads typically use different slots,
134     * while still ensuring that eventually, any two parties will be
135     * able to exchange items. That is, we cannot completely partition
136     * across threads, but instead give threads arena indices that
137     * will on average grow under contention and shrink under lack of
138     * contention. We approach this by defining the Nodes that we need
139     * anyway as ThreadLocals, and include in them per-thread index
140     * and related bookkeeping state. (We can safely reuse per-thread
141     * nodes rather than creating them fresh each time because slots
142     * alternate between pointing to a node vs null, so cannot
143     * encounter ABA problems. However, we do need some care in
144     * resetting them between uses.)
145     *
146     * Implementing an effective arena requires allocating a bunch of
147     * space, so we only do so upon detecting contention (except on
148     * uniprocessors, where they wouldn't help, so aren't used).
149     * Otherwise, exchanges use the single-slot slotExchange method.
150     * On contention, not only must the slots be in different
151     * locations, but the locations must not encounter memory
152     * contention due to being on the same cache line (or more
153     * generally, the same coherence unit).  Because, as of this
154     * writing, there is no way to determine cacheline size, we define
155     * a value that is enough for common platforms.  Additionally,
156     * extra care elsewhere is taken to avoid other false/unintended
157     * sharing and to enhance locality, including adding padding (via
158     * @Contended) to Nodes, embedding "bound" as an Exchanger field,
159     * and reworking some park/unpark mechanics compared to
160     * LockSupport versions.
161     *
162     * The arena starts out with only one used slot. We expand the
163     * effective arena size by tracking collisions; i.e., failed CASes
164     * while trying to exchange. By nature of the above algorithm, the
165     * only kinds of collision that reliably indicate contention are
166     * when two attempted releases collide -- one of two attempted
167     * offers can legitimately fail to CAS without indicating
168     * contention by more than one other thread. (Note: it is possible
169     * but not worthwhile to more precisely detect contention by
170     * reading slot values after CAS failures.)  When a thread has
171     * collided at each slot within the current arena bound, it tries
172     * to expand the arena size by one. We track collisions within
173     * bounds by using a version (sequence) number on the "bound"
174     * field, and conservatively reset collision counts when a
175     * participant notices that bound has been updated (in either
176     * direction).
177     *
178     * The effective arena size is reduced (when there is more than
179     * one slot) by giving up on waiting after a while and trying to
180     * decrement the arena size on expiration. The value of "a while"
181     * is an empirical matter.  We implement by piggybacking on the
182     * use of spin->yield->block that is essential for reasonable
183     * waiting performance anyway -- in a busy exchanger, offers are
184     * usually almost immediately released, in which case context
185     * switching on multiprocessors is extremely slow/wasteful.  Arena
186     * waits just omit the blocking part, and instead cancel. The spin
187     * count is empirically chosen to be a value that avoids blocking
188     * 99% of the time under maximum sustained exchange rates on a
189     * range of test machines. Spins and yields entail some limited
190     * randomness (using a cheap xorshift) to avoid regular patterns
191     * that can induce unproductive grow/shrink cycles. (Using a
192     * pseudorandom also helps regularize spin cycle duration by
193     * making branches unpredictable.)  Also, during an offer, a
194     * waiter can "know" that it will be released when its slot has
195     * changed, but cannot yet proceed until match is set.  In the
196     * mean time it cannot cancel the offer, so instead spins/yields.
197     * Note: It is possible to avoid this secondary check by changing
198     * the linearization point to be a CAS of the match field (as done
199     * in one case in the Scott & Scherer DISC paper), which also
200     * increases asynchrony a bit, at the expense of poorer collision
201     * detection and inability to always reuse per-thread nodes. So
202     * the current scheme is typically a better tradeoff.
203     *
204     * On collisions, indices traverse the arena cyclically in reverse
205     * order, restarting at the maximum index (which will tend to be
206     * sparsest) when bounds change. (On expirations, indices instead
207     * are halved until reaching 0.) It is possible (and has been
208     * tried) to use randomized, prime-value-stepped, or double-hash
209     * style traversal instead of simple cyclic traversal to reduce
210     * bunching.  But empirically, whatever benefits these may have
211     * don't overcome their added overhead: We are managing operations
212     * that occur very quickly unless there is sustained contention,
213     * so simpler/faster control policies work better than more
214     * accurate but slower ones.
215     *
216     * Because we use expiration for arena size control, we cannot
217     * throw TimeoutExceptions in the timed version of the public
218     * exchange method until the arena size has shrunken to zero (or
219     * the arena isn't enabled). This may delay response to timeout
220     * but is still within spec.
221     *
222     * Essentially all of the implementation is in methods
223     * slotExchange and arenaExchange. These have similar overall
224     * structure, but differ in too many details to combine. The
225     * slotExchange method uses the single Exchanger field "slot"
226     * rather than arena array elements. However, it still needs
227     * minimal collision detection to trigger arena construction.
228     * (The messiest part is making sure interrupt status and
229     * InterruptedExceptions come out right during transitions when
230     * both methods may be called. This is done by using null return
231     * as a sentinel to recheck interrupt status.)
232     *
233     * As is too common in this sort of code, methods are monolithic
234     * because most of the logic relies on reads of fields that are
235     * maintained as local variables so can't be nicely factored --
236     * mainly, here, bulky spin->yield->block/cancel code), and
237     * heavily dependent on intrinsics (Unsafe) to use inlined
238     * embedded CAS and related memory access operations (that tend
239     * not to be as readily inlined by dynamic compilers when they are
240     * hidden behind other methods that would more nicely name and
241     * encapsulate the intended effects). This includes the use of
242     * putOrderedX to clear fields of the per-thread Nodes between
243     * uses. Note that field Node.item is not declared as volatile
244     * even though it is read by releasing threads, because they only
245     * do so after CAS operations that must precede access, and all
246     * uses by the owning thread are otherwise acceptably ordered by
247     * other operations. (Because the actual points of atomicity are
248     * slot CASes, it would also be legal for the write to Node.match
249     * in a release to be weaker than a full volatile write. However,
250     * this is not done because it could allow further postponement of
251     * the write, delaying progress.)
252     */
253
254    /**
255     * The byte distance (as a shift value) between any two used slots
256     * in the arena.  1 << ASHIFT should be at least cacheline size.
257     */
258    private static final int ASHIFT = 7;
259
260    /**
261     * The maximum supported arena index. The maximum allocatable
262     * arena size is MMASK + 1. Must be a power of two minus one, less
263     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
264     * for the expected scaling limits of the main algorithms.
265     */
266    private static final int MMASK = 0xff;
267
268    /**
269     * Unit for sequence/version bits of bound field. Each successful
270     * change to the bound also adds SEQ.
271     */
272    private static final int SEQ = MMASK + 1;
273
274    /** The number of CPUs, for sizing and spin control */
275    private static final int NCPU = Runtime.getRuntime().availableProcessors();
276
277    /**
278     * The maximum slot index of the arena: The number of slots that
279     * can in principle hold all threads without contention, or at
280     * most the maximum indexable value.
281     */
282    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
283
284    /**
285     * The bound for spins while waiting for a match. The actual
286     * number of iterations will on average be about twice this value
287     * due to randomization. Note: Spinning is disabled when NCPU==1.
288     */
289    private static final int SPINS = 1 << 10;
290
291    /**
292     * Value representing null arguments/returns from public
293     * methods. Needed because the API originally didn't disallow null
294     * arguments, which it should have.
295     */
296    private static final Object NULL_ITEM = new Object();
297
298    /**
299     * Sentinel value returned by internal exchange methods upon
300     * timeout, to avoid need for separate timed versions of these
301     * methods.
302     */
303    private static final Object TIMED_OUT = new Object();
304
305    /**
306     * Nodes hold partially exchanged data, plus other per-thread
307     * bookkeeping. Padded via @Contended to reduce memory contention.
308     */
309    //@jdk.internal.vm.annotation.Contended // Android-removed
310    static final class Node {
311        int index;              // Arena index
312        int bound;              // Last recorded value of Exchanger.bound
313        int collides;           // Number of CAS failures at current bound
314        int hash;               // Pseudo-random for spins
315        Object item;            // This thread's current item
316        volatile Object match;  // Item provided by releasing thread
317        volatile Thread parked; // Set to this thread when parked, else null
318    }
319
320    /** The corresponding thread local class */
321    static final class Participant extends ThreadLocal<Node> {
322        public Node initialValue() { return new Node(); }
323    }
324
325    /**
326     * Per-thread state.
327     */
328    private final Participant participant;
329
330    /**
331     * Elimination array; null until enabled (within slotExchange).
332     * Element accesses use emulation of volatile gets and CAS.
333     */
334    private volatile Node[] arena;
335
336    /**
337     * Slot used until contention detected.
338     */
339    private volatile Node slot;
340
341    /**
342     * The index of the largest valid arena position, OR'ed with SEQ
343     * number in high bits, incremented on each update.  The initial
344     * update from 0 to SEQ is used to ensure that the arena array is
345     * constructed only once.
346     */
347    private volatile int bound;
348
349    /**
350     * Exchange function when arenas enabled. See above for explanation.
351     *
352     * @param item the (non-null) item to exchange
353     * @param timed true if the wait is timed
354     * @param ns if timed, the maximum wait time, else 0L
355     * @return the other thread's item; or null if interrupted; or
356     * TIMED_OUT if timed and timed out
357     */
358    private final Object arenaExchange(Object item, boolean timed, long ns) {
359        Node[] a = arena;
360        Node p = participant.get();
361        for (int i = p.index;;) {                      // access slot at i
362            int b, m, c; long j;                       // j is raw array offset
363            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
364            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
365                Object v = q.item;                     // release
366                q.match = item;
367                Thread w = q.parked;
368                if (w != null)
369                    U.unpark(w);
370                return v;
371            }
372            else if (i <= (m = (b = bound) & MMASK) && q == null) {
373                p.item = item;                         // offer
374                if (U.compareAndSwapObject(a, j, null, p)) {
375                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
376                    Thread t = Thread.currentThread(); // wait
377                    for (int h = p.hash, spins = SPINS;;) {
378                        Object v = p.match;
379                        if (v != null) {
380                            U.putOrderedObject(p, MATCH, null);
381                            p.item = null;             // clear for next use
382                            p.hash = h;
383                            return v;
384                        }
385                        else if (spins > 0) {
386                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
387                            if (h == 0)                // initialize hash
388                                h = SPINS | (int)t.getId();
389                            else if (h < 0 &&          // approx 50% true
390                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
391                                Thread.yield();        // two yields per wait
392                        }
393                        else if (U.getObjectVolatile(a, j) != p)
394                            spins = SPINS;       // releaser hasn't set match yet
395                        else if (!t.isInterrupted() && m == 0 &&
396                                 (!timed ||
397                                  (ns = end - System.nanoTime()) > 0L)) {
398                            U.putObject(t, BLOCKER, this); // emulate LockSupport
399                            p.parked = t;              // minimize window
400                            if (U.getObjectVolatile(a, j) == p)
401                                U.park(false, ns);
402                            p.parked = null;
403                            U.putObject(t, BLOCKER, null);
404                        }
405                        else if (U.getObjectVolatile(a, j) == p &&
406                                 U.compareAndSwapObject(a, j, p, null)) {
407                            if (m != 0)                // try to shrink
408                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
409                            p.item = null;
410                            p.hash = h;
411                            i = p.index >>>= 1;        // descend
412                            if (Thread.interrupted())
413                                return null;
414                            if (timed && m == 0 && ns <= 0L)
415                                return TIMED_OUT;
416                            break;                     // expired; restart
417                        }
418                    }
419                }
420                else
421                    p.item = null;                     // clear offer
422            }
423            else {
424                if (p.bound != b) {                    // stale; reset
425                    p.bound = b;
426                    p.collides = 0;
427                    i = (i != m || m == 0) ? m : m - 1;
428                }
429                else if ((c = p.collides) < m || m == FULL ||
430                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
431                    p.collides = c + 1;
432                    i = (i == 0) ? m : i - 1;          // cyclically traverse
433                }
434                else
435                    i = m + 1;                         // grow
436                p.index = i;
437            }
438        }
439    }
440
441    /**
442     * Exchange function used until arenas enabled. See above for explanation.
443     *
444     * @param item the item to exchange
445     * @param timed true if the wait is timed
446     * @param ns if timed, the maximum wait time, else 0L
447     * @return the other thread's item; or null if either the arena
448     * was enabled or the thread was interrupted before completion; or
449     * TIMED_OUT if timed and timed out
450     */
451    private final Object slotExchange(Object item, boolean timed, long ns) {
452        Node p = participant.get();
453        Thread t = Thread.currentThread();
454        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
455            return null;
456
457        for (Node q;;) {
458            if ((q = slot) != null) {
459                if (U.compareAndSwapObject(this, SLOT, q, null)) {
460                    Object v = q.item;
461                    q.match = item;
462                    Thread w = q.parked;
463                    if (w != null)
464                        U.unpark(w);
465                    return v;
466                }
467                // create arena on contention, but continue until slot null
468                if (NCPU > 1 && bound == 0 &&
469                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
470                    arena = new Node[(FULL + 2) << ASHIFT];
471            }
472            else if (arena != null)
473                return null; // caller must reroute to arenaExchange
474            else {
475                p.item = item;
476                if (U.compareAndSwapObject(this, SLOT, null, p))
477                    break;
478                p.item = null;
479            }
480        }
481
482        // await release
483        int h = p.hash;
484        long end = timed ? System.nanoTime() + ns : 0L;
485        int spins = (NCPU > 1) ? SPINS : 1;
486        Object v;
487        while ((v = p.match) == null) {
488            if (spins > 0) {
489                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
490                if (h == 0)
491                    h = SPINS | (int)t.getId();
492                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
493                    Thread.yield();
494            }
495            else if (slot != p)
496                spins = SPINS;
497            else if (!t.isInterrupted() && arena == null &&
498                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
499                U.putObject(t, BLOCKER, this);
500                p.parked = t;
501                if (slot == p)
502                    U.park(false, ns);
503                p.parked = null;
504                U.putObject(t, BLOCKER, null);
505            }
506            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
507                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
508                break;
509            }
510        }
511        U.putOrderedObject(p, MATCH, null);
512        p.item = null;
513        p.hash = h;
514        return v;
515    }
516
517    /**
518     * Creates a new Exchanger.
519     */
520    public Exchanger() {
521        participant = new Participant();
522    }
523
524    /**
525     * Waits for another thread to arrive at this exchange point (unless
526     * the current thread is {@linkplain Thread#interrupt interrupted}),
527     * and then transfers the given object to it, receiving its object
528     * in return.
529     *
530     * <p>If another thread is already waiting at the exchange point then
531     * it is resumed for thread scheduling purposes and receives the object
532     * passed in by the current thread.  The current thread returns immediately,
533     * receiving the object passed to the exchange by that other thread.
534     *
535     * <p>If no other thread is already waiting at the exchange then the
536     * current thread is disabled for thread scheduling purposes and lies
537     * dormant until one of two things happens:
538     * <ul>
539     * <li>Some other thread enters the exchange; or
540     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
541     * the current thread.
542     * </ul>
543     * <p>If the current thread:
544     * <ul>
545     * <li>has its interrupted status set on entry to this method; or
546     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
547     * for the exchange,
548     * </ul>
549     * then {@link InterruptedException} is thrown and the current thread's
550     * interrupted status is cleared.
551     *
552     * @param x the object to exchange
553     * @return the object provided by the other thread
554     * @throws InterruptedException if the current thread was
555     *         interrupted while waiting
556     */
557    @SuppressWarnings("unchecked")
558    public V exchange(V x) throws InterruptedException {
559        Object v;
560        Object item = (x == null) ? NULL_ITEM : x; // translate null args
561        if ((arena != null ||
562             (v = slotExchange(item, false, 0L)) == null) &&
563            ((Thread.interrupted() || // disambiguates null return
564              (v = arenaExchange(item, false, 0L)) == null)))
565            throw new InterruptedException();
566        return (v == NULL_ITEM) ? null : (V)v;
567    }
568
569    /**
570     * Waits for another thread to arrive at this exchange point (unless
571     * the current thread is {@linkplain Thread#interrupt interrupted} or
572     * the specified waiting time elapses), and then transfers the given
573     * object to it, receiving its object in return.
574     *
575     * <p>If another thread is already waiting at the exchange point then
576     * it is resumed for thread scheduling purposes and receives the object
577     * passed in by the current thread.  The current thread returns immediately,
578     * receiving the object passed to the exchange by that other thread.
579     *
580     * <p>If no other thread is already waiting at the exchange then the
581     * current thread is disabled for thread scheduling purposes and lies
582     * dormant until one of three things happens:
583     * <ul>
584     * <li>Some other thread enters the exchange; or
585     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
586     * the current thread; or
587     * <li>The specified waiting time elapses.
588     * </ul>
589     * <p>If the current thread:
590     * <ul>
591     * <li>has its interrupted status set on entry to this method; or
592     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
593     * for the exchange,
594     * </ul>
595     * then {@link InterruptedException} is thrown and the current thread's
596     * interrupted status is cleared.
597     *
598     * <p>If the specified waiting time elapses then {@link
599     * TimeoutException} is thrown.  If the time is less than or equal
600     * to zero, the method will not wait at all.
601     *
602     * @param x the object to exchange
603     * @param timeout the maximum time to wait
604     * @param unit the time unit of the {@code timeout} argument
605     * @return the object provided by the other thread
606     * @throws InterruptedException if the current thread was
607     *         interrupted while waiting
608     * @throws TimeoutException if the specified waiting time elapses
609     *         before another thread enters the exchange
610     */
611    @SuppressWarnings("unchecked")
612    public V exchange(V x, long timeout, TimeUnit unit)
613        throws InterruptedException, TimeoutException {
614        Object v;
615        Object item = (x == null) ? NULL_ITEM : x;
616        long ns = unit.toNanos(timeout);
617        if ((arena != null ||
618             (v = slotExchange(item, true, ns)) == null) &&
619            ((Thread.interrupted() ||
620              (v = arenaExchange(item, true, ns)) == null)))
621            throw new InterruptedException();
622        if (v == TIMED_OUT)
623            throw new TimeoutException();
624        return (v == NULL_ITEM) ? null : (V)v;
625    }
626
627    // Unsafe mechanics
628    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
629    private static final long BOUND;
630    private static final long SLOT;
631    private static final long MATCH;
632    private static final long BLOCKER;
633    private static final int ABASE;
634    static {
635        try {
636            BOUND = U.objectFieldOffset
637                (Exchanger.class.getDeclaredField("bound"));
638            SLOT = U.objectFieldOffset
639                (Exchanger.class.getDeclaredField("slot"));
640
641            MATCH = U.objectFieldOffset
642                (Node.class.getDeclaredField("match"));
643
644            BLOCKER = U.objectFieldOffset
645                (Thread.class.getDeclaredField("parkBlocker"));
646
647            int scale = U.arrayIndexScale(Node[].class);
648            if ((scale & (scale - 1)) != 0 || scale > (1 << ASHIFT))
649                throw new Error("Unsupported array scale");
650            // ABASE absorbs padding in front of element 0
651            ABASE = U.arrayBaseOffset(Node[].class) + (1 << ASHIFT);
652        } catch (ReflectiveOperationException e) {
653            throw new Error(e);
654        }
655    }
656
657}
658