SynchronousQueue.java revision 89c1feb0a69a7707b271086e749975b3f7acacf7
1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7package java.util.concurrent;
8import java.util.concurrent.locks.*;
9import java.util.*;
10
11// BEGIN android-note
12// removed link to collections framework docs
13// END android-note
14
/**
 * A {@linkplain BlockingQueue blocking queue} in which each
 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa.  A
 * synchronous queue does not have any internal capacity, not even a
 * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
 * because an element is only present when you try to take it; you
 * cannot add an element (using any method) unless another thread is
 * trying to remove it; you cannot iterate as there is nothing to
 * iterate.  The <em>head</em> of the queue is the element that the
 * first queued thread is trying to add to the queue; if there are no
 * queued threads then no element is being added and the head is
 * <tt>null</tt>.  For purposes of other <tt>Collection</tt> methods
 * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
 * as an empty collection.  This queue does not permit <tt>null</tt>
 * elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order. Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving producer that does not already have a waiting consumer
      creates a node holding item, and then waits for a consumer to take it.
      * An arriving producer that does already have a waiting consumer fills
      the slot node created by the consumer, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving consumer that does not already have a waiting producer
      creates an empty slot node, and then waits for a producer to fill it.
      * An arriving consumer that does already have a waiting producer takes
      item from the node created by the producer, and notifies it to continue.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence.  The aborting thread then unlinks
      its node from the wait queue: without that step a cancelled node
      (and, for producers, the item it holds) would stay reachable until
      some counterpart happened to dequeue it, so repeated timed polls or
      offers with no counterpart would accumulate garbage without bound.

      This requires keeping two simple queues, waitingProducers and
      waitingConsumers. Each of these can be FIFO (preserves fairness)
      or LIFO (improves throughput).
    */

    /** Lock protecting both wait queues */
    private final ReentrantLock qlock;
    /** Queue holding waiting puts */
    private final WaitQueue waitingProducers;
    /** Queue holding waiting takes */
    private final WaitQueue waitingConsumers;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
     * @param fair if true, threads contend in FIFO order for access;
     * otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
    }

    /**
     * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
     * These queues have all transient fields, but are serializable
     * in order to recover fairness settings when deserialized.
     *
     * <p>This class declares no explicit serialVersionUID, so its set of
     * non-private members is deliberately left unchanged; the
     * <tt>unlink</tt> operation is declared on the two concrete
     * subclasses (which do pin their serialVersionUID) instead.
     */
    static abstract class WaitQueue implements java.io.Serializable {
        /** Create, add, and return node for x */
        abstract Node enq(Object x);
        /** Remove and return node, or null if empty */
        abstract Node deq();
    }

    /**
     * FIFO queue to hold waiting puts/takes.
     * All methods must be called with the enclosing queue's lock held.
     */
    static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3623113410248163686L;
        private transient Node head;
        private transient Node last;

        /** Appends a new node for x at the tail and returns it. */
        @Override
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        /** Removes and returns the oldest node, or null if empty. */
        @Override
        Node deq() {
            Node p = head;
            if (p != null) {
                if ((head = p.next) == null)
                    last = null;
                p.next = null;
            }
            return p;
        }

        /**
         * Removes the given node if it is still linked; otherwise does
         * nothing.
         * @param node the (cancelled) node to remove
         */
        void unlink(Node node) {
            // A linked node is either the tail or has a successor; deq()
            // and unlink() both null out next, so anything else is
            // already gone and needs no traversal.
            if (node != last && node.next == null)
                return;
            Node trail = null;
            for (Node p = head; p != null; trail = p, p = p.next) {
                if (p == node) {
                    Node succ = p.next;
                    if (trail == null)
                        head = succ;
                    else
                        trail.next = succ;
                    if (last == node)
                        last = trail;
                    p.next = null;
                    return;
                }
            }
        }
    }

    /**
     * LIFO queue to hold waiting puts/takes.
     * All methods must be called with the enclosing queue's lock held.
     */
    static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3633113410248163686L;
        private transient Node head;

        /** Pushes a new node for x on top of the stack and returns it. */
        @Override
        Node enq(Object x) {
            return head = new Node(x, head);
        }

        /** Pops and returns the most recently added node, or null if empty. */
        @Override
        Node deq() {
            Node p = head;
            if (p != null) {
                head = p.next;
                p.next = null;
            }
            return p;
        }

        /**
         * Removes the given node if it is still linked; otherwise does
         * nothing.
         * @param node the (cancelled) node to remove
         */
        void unlink(Node node) {
            // The bottom node legitimately has next == null, so unlike the
            // FIFO case there is no constant-time "already gone" test.
            Node trail = null;
            for (Node p = head; p != null; trail = p, p = p.next) {
                if (p == node) {
                    Node succ = p.next;
                    if (trail == null)
                        head = succ;
                    else
                        trail.next = succ;
                    p.next = null;
                    return;
                }
            }
        }
    }

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. The class extends
     * AbstractQueuedSynchronizer to manage blocking, using AQS state
     *  0 for waiting, 1 for ack, -1 for cancelled.
     *
     * <p>The state leaves 0 exactly once, by compare-and-set, so for
     * every node either the counterpart's ACK or the waiter's CANCEL
     * wins, never both.
     */
    static final class Node extends AbstractQueuedSynchronizer {
        /** Synchronization state value representing that node acked */
        private static final int ACK    =  1;
        /** Synchronization state value representing that node cancelled */
        private static final int CANCEL = -1;

        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        /** Creates a node with initial item */
        Node(Object x) { item = x; }

        /** Creates a node with initial item and next */
        Node(Object x, Node n) { item = x; next = n; }

        /**
         * Implements AQS base acquire to succeed if not in WAITING state
         */
        @Override
        protected boolean tryAcquire(int ignore) {
            return getState() != 0;
        }

        /**
         * Implements AQS base release to signal if state changed
         */
        @Override
        protected boolean tryRelease(int newState) {
            return compareAndSetState(0, newState);
        }

        /**
         * Takes item and nulls out field (for sake of GC)
         */
        private Object extract() {
            Object x = item;
            item = null;
            return x;
        }

        /**
         * Tries to cancel on interrupt; if so rethrowing,
         * else setting interrupt state (the counterpart already
         * acked, so the transfer completes and the interrupt is
         * left pending for the caller).
         */
        private void checkCancellationOnInterrupt(InterruptedException ie)
            throws InterruptedException {
            if (release(CANCEL))
                throw ie;
            Thread.currentThread().interrupt();
        }

        /**
         * Fills in the slot created by the consumer and signal consumer to
         * continue.
         * @return true if the consumer was still waiting, false if it
         * had already cancelled
         */
        boolean setItem(Object x) {
            item = x; // can place in slot even if cancelled
            return release(ACK);
        }

        /**
         * Removes item from slot created by producer and signal producer
         * to continue.
         * @return the item, or null if the producer had already cancelled
         */
        Object getItem() {
            return (release(ACK))? extract() : null;
        }

        /**
         * Waits for a consumer to take item placed by producer.
         * @throws InterruptedException if interrupted before a consumer
         * acked; the node is then in CANCEL state
         */
        void waitForTake() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
        }

        /**
         * Waits for a producer to put item placed by consumer.
         * @return the item supplied by the producer
         * @throws InterruptedException if interrupted before a producer
         * acked; the node is then in CANCEL state
         */
        Object waitForPut() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }

        /**
         * Waits for a consumer to take item placed by producer or time out.
         * @return true if a consumer took the item, false if the wait
         * timed out and the node was cancelled
         */
        boolean waitForTake(long nanos) throws InterruptedException {
            try {
                // A timeout only counts if the cancel wins the race
                // against a concurrent ack.
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return false;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return true;
        }

        /**
         * Waits for a producer to put item placed by consumer, or time out.
         * @return the item, or null if the wait timed out and the node
         * was cancelled
         */
        Object waitForPut(long nanos) throws InterruptedException {
            try {
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return null;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }
    }

    /**
     * Removes a cancelled waiter's node from the given wait queue so
     * that it cannot be retained in the absence of counterparts.
     * Does nothing if a counterpart has already dequeued the node.
     *
     * @param queue the wait queue the node was enqueued on
     * @param node the node whose wait was cancelled
     */
    private void unlinkCancelled(WaitQueue queue, Node node) {
        final ReentrantLock qlock = this.qlock;
        qlock.lock();
        try {
            // Dispatch by concrete type: see the note on WaitQueue for
            // why unlink is not an abstract method there.
            if (queue instanceof FifoWaitQueue)
                ((FifoWaitQueue) queue).unlink(node);
            else if (queue instanceof LifoWaitQueue)
                ((LifoWaitQueue) queue).unlink(node);
        } finally {
            qlock.unlock();
        }
    }

    /**
     * Views a transferred item as an element. Safe because the only
     * values ever stored in a node come from <tt>put</tt>/<tt>offer</tt>,
     * whose parameter type is <tt>E</tt>, or are <tt>null</tt>.
     */
    @SuppressWarnings("unchecked")
    private E asElement(Object x) {
        return (E) x;
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(o);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    node.waitForTake();
                } catch (InterruptedException ie) {
                    // thrown only when our cancel succeeded
                    unlinkCancelled(waitingProducers, node);
                    throw ie;
                }
                return;
            }

            else if (node.setItem(o))
                return;

            // else consumer cancelled, so retry
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before a consumer appears.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(o);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                boolean taken;
                try {
                    taken = node.waitForTake(nanos);
                } catch (InterruptedException ie) {
                    unlinkCancelled(waitingProducers, node);
                    throw ie;
                }
                if (!taken) // timed out: drop our cancelled node
                    unlinkCancelled(waitingProducers, node);
                return taken;
            }

            else if (node.setItem(o))
                return true;

            // else consumer cancelled, so retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     * @throws InterruptedException if interrupted while waiting.
     * @return the head of this queue
     */
    public E take() throws InterruptedException {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                Object x;
                try {
                    x = node.waitForPut();
                } catch (InterruptedException ie) {
                    // thrown only when our cancel succeeded
                    unlinkCancelled(waitingConsumers, node);
                    throw ie;
                }
                return asElement(x);
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return asElement(x);
                // else cancelled, so retry
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                Object x;
                try {
                    x = node.waitForPut(nanos);
                } catch (InterruptedException ie) {
                    unlinkCancelled(waitingConsumers, node);
                    throw ie;
                }
                // Elements are never null, so null here means the wait
                // timed out and our node was cancelled.
                if (x == null)
                    unlinkCancelled(waitingConsumers, node);
                return asElement(x);
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return asElement(x);
                // else cancelled, so retry
            }
        }
    }

    // Untimed nonblocking versions

   /**
    * Inserts the specified element into this queue, if another thread is
    * waiting to receive it.
    *
    * @param o the element to add.
    * @return <tt>true</tt> if it was possible to add the element to
    *         this queue, else <tt>false</tt>
    * @throws NullPointerException if the specified element is <tt>null</tt>
    */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingConsumers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return false;

            else if (node.setItem(o))
                return true;
            // else retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingProducers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return null;

            else {
                Object x = node.getItem();
                if (x != null)
                    return asElement(x);
                // else retry
            }
        }
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {}

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }


    /**
     * Iterator over nothing: <tt>hasNext</tt> is always false,
     * <tt>next</tt> always throws <tt>NoSuchElementException</tt>, and
     * <tt>remove</tt> always throws <tt>IllegalStateException</tt>
     * (there is never a previously returned element to remove).
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }


    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroeth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     * @param a the array
     * @return the specified array
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }


    /**
     * Transfers to the given collection every element that waiting
     * producers are currently offering, by polling until no producer
     * is waiting.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Transfers to the given collection at most <tt>maxElements</tt>
     * elements that waiting producers are currently offering.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }
}
681
682
683
684
685
686