// LinkedBlockingQueue.java revision 51b1b6997fd3f980076b8081f7f1165ccc2a4008
/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The element held by this node; null for the head sentinel and for removed nodes. */
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    private transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue. The old head sentinel is
     * self-linked and the former first node becomes the new sentinel
     * (its item is cleared).
     *
     * @return the item that was held by the removed first node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

//     /**
//      * Tells whether both locks are held by current thread.
//      */
//     boolean isFullyLocked() {
//         return (putLock.isHeldByCurrentThread() &&
//                 takeLock.isHeldByCurrentThread());
//     }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the capacity of this queue minus its current size
     */
    @Override
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            // Cascade: there is still room, so wake another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insertion; wake a waiting take.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Fast path: give up without locking when the queue looks full.
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock; the unlocked read above was only a hint.
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting while the
     * queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    @Override
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            // Cascade: elements remain, so wake another waiting take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal; wake a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time while the queue is empty.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the unit in which {@code timeout} is expressed
     * @return the head of this queue, or {@code null} if the wait time
     *         elapses while the queue is still empty
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or {@code null} if this queue is
     *         empty
     */
    @Override
    public E poll() {
        final AtomicInteger count = this.count;
        // Fast path: nothing to take, so avoid the lock entirely.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under the lock; another take may have won the race.
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or {@code null} if this queue is
     *         empty
     */
    @Override
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    @Override
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string listing the elements of this queue from head to
     * tail, enclosed in square brackets and separated by {@code ", "}.
     * An element that is this queue itself is rendered as
     * {@code "(this Collection)"}.
     *
     * @return a string representation of this queue
     */
    @Override
    public String toString() {
        fullyLock();
        try {
            Node<E> p = head.next;
            if (p == null)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;
                if (p == null)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    @Override
    public void clear() {
        fullyLock();
        try {
            // Self-link every discarded node and clear its successor's item.
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} elements from the head of this
     * queue and adds them to the given collection. If {@code c.add}
     * throws, the elements already transferred stay in {@code c} and the
     * remaining ones stay in this queue.
     *
     * @return the number of elements transferred; {@code 0} if
     *         {@code maxElements} is zero or negative
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        // Nothing can be transferred; without this guard a negative
        // maxElements would be returned as a negative transfer count.
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                if (s == p)
                    return head.next;
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        @Override
        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Locate the node by identity; if it is no longer linked,
                // the loop ends without unlinking anything.
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     * @throws java.io.IOException if writing to the stream fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitutes this queue instance from a stream (that is,
     * deserializes it).
     *
     * @param s the stream
     * @throws java.io.IOException if reading from the stream fails
     * @throws ClassNotFoundException if the class of a serialized object
     *         cannot be found
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The linked list is transient: rebuild it from an empty sentinel.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}