1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7package java.util.concurrent;
8
9import java.util.AbstractQueue;
10import java.util.Collection;
11import java.util.Iterator;
12import java.util.NoSuchElementException;
13import java.util.Spliterator;
14import java.util.Spliterators;
15import java.util.concurrent.atomic.AtomicInteger;
16import java.util.concurrent.locks.Condition;
17import java.util.concurrent.locks.ReentrantLock;
18import java.util.function.Consumer;
19
20// BEGIN android-note
21// removed link to collections framework docs
22// END android-note
23
24/**
25 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
26 * linked nodes.
27 * This queue orders elements FIFO (first-in-first-out).
28 * The <em>head</em> of the queue is that element that has been on the
29 * queue the longest time.
30 * The <em>tail</em> of the queue is that element that has been on the
31 * queue the shortest time. New elements
32 * are inserted at the tail of the queue, and the queue retrieval
33 * operations obtain elements at the head of the queue.
34 * Linked queues typically have higher throughput than array-based queues but
35 * less predictable performance in most concurrent applications.
36 *
37 * <p>The optional capacity bound constructor argument serves as a
38 * way to prevent excessive queue expansion. The capacity, if unspecified,
39 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
40 * dynamically created upon each insertion unless this would bring the
41 * queue above capacity.
42 *
43 * <p>This class and its iterator implement all of the
44 * <em>optional</em> methods of the {@link Collection} and {@link
45 * Iterator} interfaces.
46 *
47 * @since 1.5
48 * @author Doug Lea
49 * @param <E> the type of elements held in this queue
50 */
51public class LinkedBlockingQueue<E> extends AbstractQueue<E>
52        implements BlockingQueue<E>, java.io.Serializable {
53    private static final long serialVersionUID = -6903933977591709194L;
54
55    /*
56     * A variant of the "two lock queue" algorithm.  The putLock gates
57     * entry to put (and offer), and has an associated condition for
58     * waiting puts.  Similarly for the takeLock.  The "count" field
59     * that they both rely on is maintained as an atomic to avoid
60     * needing to get both locks in most cases. Also, to minimize need
61     * for puts to get takeLock and vice-versa, cascading notifies are
62     * used. When a put notices that it has enabled at least one take,
63     * it signals taker. That taker in turn signals others if more
64     * items have been entered since the signal. And symmetrically for
65     * takes signalling puts. Operations such as remove(Object) and
66     * iterators acquire both locks.
67     *
68     * Visibility between writers and readers is provided as follows:
69     *
70     * Whenever an element is enqueued, the putLock is acquired and
71     * count updated.  A subsequent reader guarantees visibility to the
72     * enqueued Node by either acquiring the putLock (via fullyLock)
73     * or by acquiring the takeLock, and then reading n = count.get();
74     * this gives visibility to the first n items.
75     *
76     * To implement weakly consistent iterators, it appears we need to
77     * keep all Nodes GC-reachable from a predecessor dequeued Node.
78     * That would cause two problems:
79     * - allow a rogue Iterator to cause unbounded memory retention
80     * - cause cross-generational linking of old Nodes to new Nodes if
81     *   a Node was tenured while live, which generational GCs have a
82     *   hard time dealing with, causing repeated major collections.
83     * However, only non-deleted Nodes need to be reachable from
84     * dequeued Nodes, and reachability does not necessarily have to
85     * be of the kind understood by the GC.  We use the trick of
86     * linking a Node that has just been dequeued to itself.  Such a
87     * self-link implicitly means to advance to head.next.
88     */
89
90    /**
91     * Linked list node class.
92     */
93    static class Node<E> {
94        E item;
95
96        /**
97         * One of:
98         * - the real successor Node
99         * - this Node, meaning the successor is head.next
100         * - null, meaning there is no successor (this is the last node)
101         */
102        Node<E> next;
103
104        Node(E x) { item = x; }
105    }
106
107    /** The capacity bound, or Integer.MAX_VALUE if none */
108    private final int capacity;
109
110    /** Current number of elements */
111    private final AtomicInteger count = new AtomicInteger();
112
113    /**
114     * Head of linked list.
115     * Invariant: head.item == null
116     */
117    transient Node<E> head;
118
119    /**
120     * Tail of linked list.
121     * Invariant: last.next == null
122     */
123    private transient Node<E> last;
124
125    /** Lock held by take, poll, etc */
126    private final ReentrantLock takeLock = new ReentrantLock();
127
128    /** Wait queue for waiting takes */
129    private final Condition notEmpty = takeLock.newCondition();
130
131    /** Lock held by put, offer, etc */
132    private final ReentrantLock putLock = new ReentrantLock();
133
134    /** Wait queue for waiting puts */
135    private final Condition notFull = putLock.newCondition();
136
137    /**
138     * Signals a waiting take. Called only from put/offer (which do not
139     * otherwise ordinarily lock takeLock.)
140     */
141    private void signalNotEmpty() {
142        final ReentrantLock takeLock = this.takeLock;
143        takeLock.lock();
144        try {
145            notEmpty.signal();
146        } finally {
147            takeLock.unlock();
148        }
149    }
150
151    /**
152     * Signals a waiting put. Called only from take/poll.
153     */
154    private void signalNotFull() {
155        final ReentrantLock putLock = this.putLock;
156        putLock.lock();
157        try {
158            notFull.signal();
159        } finally {
160            putLock.unlock();
161        }
162    }
163
164    /**
165     * Links node at end of queue.
166     *
167     * @param node the node
168     */
169    private void enqueue(Node<E> node) {
170        // assert putLock.isHeldByCurrentThread();
171        // assert last.next == null;
172        last = last.next = node;
173    }
174
175    /**
176     * Removes a node from head of queue.
177     *
178     * @return the node
179     */
180    private E dequeue() {
181        // assert takeLock.isHeldByCurrentThread();
182        // assert head.item == null;
183        Node<E> h = head;
184        Node<E> first = h.next;
185        h.next = h; // help GC
186        head = first;
187        E x = first.item;
188        first.item = null;
189        return x;
190    }
191
192    /**
193     * Locks to prevent both puts and takes.
194     */
195    void fullyLock() {
196        putLock.lock();
197        takeLock.lock();
198    }
199
200    /**
201     * Unlocks to allow both puts and takes.
202     */
203    void fullyUnlock() {
204        takeLock.unlock();
205        putLock.unlock();
206    }
207
208//     /**
209//      * Tells whether both locks are held by current thread.
210//      */
211//     boolean isFullyLocked() {
212//         return (putLock.isHeldByCurrentThread() &&
213//                 takeLock.isHeldByCurrentThread());
214//     }
215
216    /**
217     * Creates a {@code LinkedBlockingQueue} with a capacity of
218     * {@link Integer#MAX_VALUE}.
219     */
220    public LinkedBlockingQueue() {
221        this(Integer.MAX_VALUE);
222    }
223
224    /**
225     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
226     *
227     * @param capacity the capacity of this queue
228     * @throws IllegalArgumentException if {@code capacity} is not greater
229     *         than zero
230     */
231    public LinkedBlockingQueue(int capacity) {
232        if (capacity <= 0) throw new IllegalArgumentException();
233        this.capacity = capacity;
234        last = head = new Node<E>(null);
235    }
236
237    /**
238     * Creates a {@code LinkedBlockingQueue} with a capacity of
239     * {@link Integer#MAX_VALUE}, initially containing the elements of the
240     * given collection,
241     * added in traversal order of the collection's iterator.
242     *
243     * @param c the collection of elements to initially contain
244     * @throws NullPointerException if the specified collection or any
245     *         of its elements are null
246     */
247    public LinkedBlockingQueue(Collection<? extends E> c) {
248        this(Integer.MAX_VALUE);
249        final ReentrantLock putLock = this.putLock;
250        putLock.lock(); // Never contended, but necessary for visibility
251        try {
252            int n = 0;
253            for (E e : c) {
254                if (e == null)
255                    throw new NullPointerException();
256                if (n == capacity)
257                    throw new IllegalStateException("Queue full");
258                enqueue(new Node<E>(e));
259                ++n;
260            }
261            count.set(n);
262        } finally {
263            putLock.unlock();
264        }
265    }
266
267    // this doc comment is overridden to remove the reference to collections
268    // greater in size than Integer.MAX_VALUE
269    /**
270     * Returns the number of elements in this queue.
271     *
272     * @return the number of elements in this queue
273     */
274    public int size() {
275        return count.get();
276    }
277
278    // this doc comment is a modified copy of the inherited doc comment,
279    // without the reference to unlimited queues.
280    /**
281     * Returns the number of additional elements that this queue can ideally
282     * (in the absence of memory or resource constraints) accept without
283     * blocking. This is always equal to the initial capacity of this queue
284     * less the current {@code size} of this queue.
285     *
286     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
287     * an element will succeed by inspecting {@code remainingCapacity}
288     * because it may be the case that another thread is about to
289     * insert or remove an element.
290     */
291    public int remainingCapacity() {
292        return capacity - count.get();
293    }
294
295    /**
296     * Inserts the specified element at the tail of this queue, waiting if
297     * necessary for space to become available.
298     *
299     * @throws InterruptedException {@inheritDoc}
300     * @throws NullPointerException {@inheritDoc}
301     */
302    public void put(E e) throws InterruptedException {
303        if (e == null) throw new NullPointerException();
304        // Note: convention in all put/take/etc is to preset local var
305        // holding count negative to indicate failure unless set.
306        int c = -1;
307        Node<E> node = new Node<E>(e);
308        final ReentrantLock putLock = this.putLock;
309        final AtomicInteger count = this.count;
310        putLock.lockInterruptibly();
311        try {
312            /*
313             * Note that count is used in wait guard even though it is
314             * not protected by lock. This works because count can
315             * only decrease at this point (all other puts are shut
316             * out by lock), and we (or some other waiting put) are
317             * signalled if it ever changes from capacity. Similarly
318             * for all other uses of count in other wait guards.
319             */
320            while (count.get() == capacity) {
321                notFull.await();
322            }
323            enqueue(node);
324            c = count.getAndIncrement();
325            if (c + 1 < capacity)
326                notFull.signal();
327        } finally {
328            putLock.unlock();
329        }
330        if (c == 0)
331            signalNotEmpty();
332    }
333
334    /**
335     * Inserts the specified element at the tail of this queue, waiting if
336     * necessary up to the specified wait time for space to become available.
337     *
338     * @return {@code true} if successful, or {@code false} if
339     *         the specified waiting time elapses before space is available
340     * @throws InterruptedException {@inheritDoc}
341     * @throws NullPointerException {@inheritDoc}
342     */
343    public boolean offer(E e, long timeout, TimeUnit unit)
344        throws InterruptedException {
345
346        if (e == null) throw new NullPointerException();
347        long nanos = unit.toNanos(timeout);
348        int c = -1;
349        final ReentrantLock putLock = this.putLock;
350        final AtomicInteger count = this.count;
351        putLock.lockInterruptibly();
352        try {
353            while (count.get() == capacity) {
354                if (nanos <= 0L)
355                    return false;
356                nanos = notFull.awaitNanos(nanos);
357            }
358            enqueue(new Node<E>(e));
359            c = count.getAndIncrement();
360            if (c + 1 < capacity)
361                notFull.signal();
362        } finally {
363            putLock.unlock();
364        }
365        if (c == 0)
366            signalNotEmpty();
367        return true;
368    }
369
370    /**
371     * Inserts the specified element at the tail of this queue if it is
372     * possible to do so immediately without exceeding the queue's capacity,
373     * returning {@code true} upon success and {@code false} if this queue
374     * is full.
375     * When using a capacity-restricted queue, this method is generally
376     * preferable to method {@link BlockingQueue#add add}, which can fail to
377     * insert an element only by throwing an exception.
378     *
379     * @throws NullPointerException if the specified element is null
380     */
381    public boolean offer(E e) {
382        if (e == null) throw new NullPointerException();
383        final AtomicInteger count = this.count;
384        if (count.get() == capacity)
385            return false;
386        int c = -1;
387        Node<E> node = new Node<E>(e);
388        final ReentrantLock putLock = this.putLock;
389        putLock.lock();
390        try {
391            if (count.get() < capacity) {
392                enqueue(node);
393                c = count.getAndIncrement();
394                if (c + 1 < capacity)
395                    notFull.signal();
396            }
397        } finally {
398            putLock.unlock();
399        }
400        if (c == 0)
401            signalNotEmpty();
402        return c >= 0;
403    }
404
405    public E take() throws InterruptedException {
406        E x;
407        int c = -1;
408        final AtomicInteger count = this.count;
409        final ReentrantLock takeLock = this.takeLock;
410        takeLock.lockInterruptibly();
411        try {
412            while (count.get() == 0) {
413                notEmpty.await();
414            }
415            x = dequeue();
416            c = count.getAndDecrement();
417            if (c > 1)
418                notEmpty.signal();
419        } finally {
420            takeLock.unlock();
421        }
422        if (c == capacity)
423            signalNotFull();
424        return x;
425    }
426
427    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
428        E x = null;
429        int c = -1;
430        long nanos = unit.toNanos(timeout);
431        final AtomicInteger count = this.count;
432        final ReentrantLock takeLock = this.takeLock;
433        takeLock.lockInterruptibly();
434        try {
435            while (count.get() == 0) {
436                if (nanos <= 0L)
437                    return null;
438                nanos = notEmpty.awaitNanos(nanos);
439            }
440            x = dequeue();
441            c = count.getAndDecrement();
442            if (c > 1)
443                notEmpty.signal();
444        } finally {
445            takeLock.unlock();
446        }
447        if (c == capacity)
448            signalNotFull();
449        return x;
450    }
451
452    public E poll() {
453        final AtomicInteger count = this.count;
454        if (count.get() == 0)
455            return null;
456        E x = null;
457        int c = -1;
458        final ReentrantLock takeLock = this.takeLock;
459        takeLock.lock();
460        try {
461            if (count.get() > 0) {
462                x = dequeue();
463                c = count.getAndDecrement();
464                if (c > 1)
465                    notEmpty.signal();
466            }
467        } finally {
468            takeLock.unlock();
469        }
470        if (c == capacity)
471            signalNotFull();
472        return x;
473    }
474
475    public E peek() {
476        if (count.get() == 0)
477            return null;
478        final ReentrantLock takeLock = this.takeLock;
479        takeLock.lock();
480        try {
481            return (count.get() > 0) ? head.next.item : null;
482        } finally {
483            takeLock.unlock();
484        }
485    }
486
487    /**
488     * Unlinks interior Node p with predecessor trail.
489     */
490    void unlink(Node<E> p, Node<E> trail) {
491        // assert isFullyLocked();
492        // p.next is not changed, to allow iterators that are
493        // traversing p to maintain their weak-consistency guarantee.
494        p.item = null;
495        trail.next = p.next;
496        if (last == p)
497            last = trail;
498        if (count.getAndDecrement() == capacity)
499            notFull.signal();
500    }
501
502    /**
503     * Removes a single instance of the specified element from this queue,
504     * if it is present.  More formally, removes an element {@code e} such
505     * that {@code o.equals(e)}, if this queue contains one or more such
506     * elements.
507     * Returns {@code true} if this queue contained the specified element
508     * (or equivalently, if this queue changed as a result of the call).
509     *
510     * @param o element to be removed from this queue, if present
511     * @return {@code true} if this queue changed as a result of the call
512     */
513    public boolean remove(Object o) {
514        if (o == null) return false;
515        fullyLock();
516        try {
517            for (Node<E> trail = head, p = trail.next;
518                 p != null;
519                 trail = p, p = p.next) {
520                if (o.equals(p.item)) {
521                    unlink(p, trail);
522                    return true;
523                }
524            }
525            return false;
526        } finally {
527            fullyUnlock();
528        }
529    }
530
531    /**
532     * Returns {@code true} if this queue contains the specified element.
533     * More formally, returns {@code true} if and only if this queue contains
534     * at least one element {@code e} such that {@code o.equals(e)}.
535     *
536     * @param o object to be checked for containment in this queue
537     * @return {@code true} if this queue contains the specified element
538     */
539    public boolean contains(Object o) {
540        if (o == null) return false;
541        fullyLock();
542        try {
543            for (Node<E> p = head.next; p != null; p = p.next)
544                if (o.equals(p.item))
545                    return true;
546            return false;
547        } finally {
548            fullyUnlock();
549        }
550    }
551
552    /**
553     * Returns an array containing all of the elements in this queue, in
554     * proper sequence.
555     *
556     * <p>The returned array will be "safe" in that no references to it are
557     * maintained by this queue.  (In other words, this method must allocate
558     * a new array).  The caller is thus free to modify the returned array.
559     *
560     * <p>This method acts as bridge between array-based and collection-based
561     * APIs.
562     *
563     * @return an array containing all of the elements in this queue
564     */
565    public Object[] toArray() {
566        fullyLock();
567        try {
568            int size = count.get();
569            Object[] a = new Object[size];
570            int k = 0;
571            for (Node<E> p = head.next; p != null; p = p.next)
572                a[k++] = p.item;
573            return a;
574        } finally {
575            fullyUnlock();
576        }
577    }
578
579    /**
580     * Returns an array containing all of the elements in this queue, in
581     * proper sequence; the runtime type of the returned array is that of
582     * the specified array.  If the queue fits in the specified array, it
583     * is returned therein.  Otherwise, a new array is allocated with the
584     * runtime type of the specified array and the size of this queue.
585     *
586     * <p>If this queue fits in the specified array with room to spare
587     * (i.e., the array has more elements than this queue), the element in
588     * the array immediately following the end of the queue is set to
589     * {@code null}.
590     *
591     * <p>Like the {@link #toArray()} method, this method acts as bridge between
592     * array-based and collection-based APIs.  Further, this method allows
593     * precise control over the runtime type of the output array, and may,
594     * under certain circumstances, be used to save allocation costs.
595     *
596     * <p>Suppose {@code x} is a queue known to contain only strings.
597     * The following code can be used to dump the queue into a newly
598     * allocated array of {@code String}:
599     *
600     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
601     *
602     * Note that {@code toArray(new Object[0])} is identical in function to
603     * {@code toArray()}.
604     *
605     * @param a the array into which the elements of the queue are to
606     *          be stored, if it is big enough; otherwise, a new array of the
607     *          same runtime type is allocated for this purpose
608     * @return an array containing all of the elements in this queue
609     * @throws ArrayStoreException if the runtime type of the specified array
610     *         is not a supertype of the runtime type of every element in
611     *         this queue
612     * @throws NullPointerException if the specified array is null
613     */
614    @SuppressWarnings("unchecked")
615    public <T> T[] toArray(T[] a) {
616        fullyLock();
617        try {
618            int size = count.get();
619            if (a.length < size)
620                a = (T[])java.lang.reflect.Array.newInstance
621                    (a.getClass().getComponentType(), size);
622
623            int k = 0;
624            for (Node<E> p = head.next; p != null; p = p.next)
625                a[k++] = (T)p.item;
626            if (a.length > k)
627                a[k] = null;
628            return a;
629        } finally {
630            fullyUnlock();
631        }
632    }
633
634    public String toString() {
635        return Helpers.collectionToString(this);
636    }
637
638    /**
639     * Atomically removes all of the elements from this queue.
640     * The queue will be empty after this call returns.
641     */
642    public void clear() {
643        fullyLock();
644        try {
645            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
646                h.next = h;
647                p.item = null;
648            }
649            head = last;
650            // assert head.item == null && head.next == null;
651            if (count.getAndSet(0) == capacity)
652                notFull.signal();
653        } finally {
654            fullyUnlock();
655        }
656    }
657
658    /**
659     * @throws UnsupportedOperationException {@inheritDoc}
660     * @throws ClassCastException            {@inheritDoc}
661     * @throws NullPointerException          {@inheritDoc}
662     * @throws IllegalArgumentException      {@inheritDoc}
663     */
664    public int drainTo(Collection<? super E> c) {
665        return drainTo(c, Integer.MAX_VALUE);
666    }
667
668    /**
669     * @throws UnsupportedOperationException {@inheritDoc}
670     * @throws ClassCastException            {@inheritDoc}
671     * @throws NullPointerException          {@inheritDoc}
672     * @throws IllegalArgumentException      {@inheritDoc}
673     */
674    public int drainTo(Collection<? super E> c, int maxElements) {
675        if (c == null)
676            throw new NullPointerException();
677        if (c == this)
678            throw new IllegalArgumentException();
679        if (maxElements <= 0)
680            return 0;
681        boolean signalNotFull = false;
682        final ReentrantLock takeLock = this.takeLock;
683        takeLock.lock();
684        try {
685            int n = Math.min(maxElements, count.get());
686            // count.get provides visibility to first n Nodes
687            Node<E> h = head;
688            int i = 0;
689            try {
690                while (i < n) {
691                    Node<E> p = h.next;
692                    c.add(p.item);
693                    p.item = null;
694                    h.next = h;
695                    h = p;
696                    ++i;
697                }
698                return n;
699            } finally {
700                // Restore invariants even if c.add() threw
701                if (i > 0) {
702                    // assert h.item == null;
703                    head = h;
704                    signalNotFull = (count.getAndAdd(-i) == capacity);
705                }
706            }
707        } finally {
708            takeLock.unlock();
709            if (signalNotFull)
710                signalNotFull();
711        }
712    }
713
714    /**
715     * Returns an iterator over the elements in this queue in proper sequence.
716     * The elements will be returned in order from first (head) to last (tail).
717     *
718     * <p>The returned iterator is
719     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
720     *
721     * @return an iterator over the elements in this queue in proper sequence
722     */
723    public Iterator<E> iterator() {
724        return new Itr();
725    }
726
727    private class Itr implements Iterator<E> {
728        /*
729         * Basic weakly-consistent iterator.  At all times hold the next
730         * item to hand out so that if hasNext() reports true, we will
731         * still have it to return even if lost race with a take etc.
732         */
733
734        private Node<E> current;
735        private Node<E> lastRet;
736        private E currentElement;
737
738        Itr() {
739            fullyLock();
740            try {
741                current = head.next;
742                if (current != null)
743                    currentElement = current.item;
744            } finally {
745                fullyUnlock();
746            }
747        }
748
749        public boolean hasNext() {
750            return current != null;
751        }
752
753        public E next() {
754            fullyLock();
755            try {
756                if (current == null)
757                    throw new NoSuchElementException();
758                lastRet = current;
759                E item = null;
760                // Unlike other traversal methods, iterators must handle both:
761                // - dequeued nodes (p.next == p)
762                // - (possibly multiple) interior removed nodes (p.item == null)
763                for (Node<E> p = current, q;; p = q) {
764                    if ((q = p.next) == p)
765                        q = head.next;
766                    if (q == null || (item = q.item) != null) {
767                        current = q;
768                        E x = currentElement;
769                        currentElement = item;
770                        return x;
771                    }
772                }
773            } finally {
774                fullyUnlock();
775            }
776        }
777
778        public void remove() {
779            if (lastRet == null)
780                throw new IllegalStateException();
781            fullyLock();
782            try {
783                Node<E> node = lastRet;
784                lastRet = null;
785                for (Node<E> trail = head, p = trail.next;
786                     p != null;
787                     trail = p, p = p.next) {
788                    if (p == node) {
789                        unlink(p, trail);
790                        break;
791                    }
792                }
793            } finally {
794                fullyUnlock();
795            }
796        }
797    }
798
799    /** A customized variant of Spliterators.IteratorSpliterator */
800    static final class LBQSpliterator<E> implements Spliterator<E> {
801        static final int MAX_BATCH = 1 << 25;  // max batch array size;
802        final LinkedBlockingQueue<E> queue;
803        Node<E> current;    // current node; null until initialized
804        int batch;          // batch size for splits
805        boolean exhausted;  // true when no more nodes
806        long est;           // size estimate
807        LBQSpliterator(LinkedBlockingQueue<E> queue) {
808            this.queue = queue;
809            this.est = queue.size();
810        }
811
812        public long estimateSize() { return est; }
813
814        public Spliterator<E> trySplit() {
815            Node<E> h;
816            final LinkedBlockingQueue<E> q = this.queue;
817            int b = batch;
818            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
819            if (!exhausted &&
820                ((h = current) != null || (h = q.head.next) != null) &&
821                h.next != null) {
822                Object[] a = new Object[n];
823                int i = 0;
824                Node<E> p = current;
825                q.fullyLock();
826                try {
827                    if (p != null || (p = q.head.next) != null) {
828                        do {
829                            if ((a[i] = p.item) != null)
830                                ++i;
831                        } while ((p = p.next) != null && i < n);
832                    }
833                } finally {
834                    q.fullyUnlock();
835                }
836                if ((current = p) == null) {
837                    est = 0L;
838                    exhausted = true;
839                }
840                else if ((est -= i) < 0L)
841                    est = 0L;
842                if (i > 0) {
843                    batch = i;
844                    return Spliterators.spliterator
845                        (a, 0, i, (Spliterator.ORDERED |
846                                   Spliterator.NONNULL |
847                                   Spliterator.CONCURRENT));
848                }
849            }
850            return null;
851        }
852
853        public void forEachRemaining(Consumer<? super E> action) {
854            if (action == null) throw new NullPointerException();
855            final LinkedBlockingQueue<E> q = this.queue;
856            if (!exhausted) {
857                exhausted = true;
858                Node<E> p = current;
859                do {
860                    E e = null;
861                    q.fullyLock();
862                    try {
863                        if (p == null)
864                            p = q.head.next;
865                        while (p != null) {
866                            e = p.item;
867                            p = p.next;
868                            if (e != null)
869                                break;
870                        }
871                    } finally {
872                        q.fullyUnlock();
873                    }
874                    if (e != null)
875                        action.accept(e);
876                } while (p != null);
877            }
878        }
879
880        public boolean tryAdvance(Consumer<? super E> action) {
881            if (action == null) throw new NullPointerException();
882            final LinkedBlockingQueue<E> q = this.queue;
883            if (!exhausted) {
884                E e = null;
885                q.fullyLock();
886                try {
887                    if (current == null)
888                        current = q.head.next;
889                    while (current != null) {
890                        e = current.item;
891                        current = current.next;
892                        if (e != null)
893                            break;
894                    }
895                } finally {
896                    q.fullyUnlock();
897                }
898                if (current == null)
899                    exhausted = true;
900                if (e != null) {
901                    action.accept(e);
902                    return true;
903                }
904            }
905            return false;
906        }
907
908        public int characteristics() {
909            return Spliterator.ORDERED | Spliterator.NONNULL |
910                Spliterator.CONCURRENT;
911        }
912    }
913
914    /**
915     * Returns a {@link Spliterator} over the elements in this queue.
916     *
917     * <p>The returned spliterator is
918     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
919     *
920     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
921     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
922     *
923     * @implNote
924     * The {@code Spliterator} implements {@code trySplit} to permit limited
925     * parallelism.
926     *
927     * @return a {@code Spliterator} over the elements in this queue
928     * @since 1.8
929     */
930    public Spliterator<E> spliterator() {
931        return new LBQSpliterator<E>(this);
932    }
933
934    /**
935     * Saves this queue to a stream (that is, serializes it).
936     *
937     * @param s the stream
938     * @throws java.io.IOException if an I/O error occurs
939     * @serialData The capacity is emitted (int), followed by all of
940     * its elements (each an {@code Object}) in the proper order,
941     * followed by a null
942     */
943    private void writeObject(java.io.ObjectOutputStream s)
944        throws java.io.IOException {
945
946        fullyLock();
947        try {
948            // Write out any hidden stuff, plus capacity
949            s.defaultWriteObject();
950
951            // Write out all elements in the proper order.
952            for (Node<E> p = head.next; p != null; p = p.next)
953                s.writeObject(p.item);
954
955            // Use trailing null as sentinel
956            s.writeObject(null);
957        } finally {
958            fullyUnlock();
959        }
960    }
961
962    /**
963     * Reconstitutes this queue from a stream (that is, deserializes it).
964     * @param s the stream
965     * @throws ClassNotFoundException if the class of a serialized object
966     *         could not be found
967     * @throws java.io.IOException if an I/O error occurs
968     */
969    private void readObject(java.io.ObjectInputStream s)
970        throws java.io.IOException, ClassNotFoundException {
971        // Read in capacity, and any hidden stuff
972        s.defaultReadObject();
973
974        count.set(0);
975        last = head = new Node<E>(null);
976
977        // Read in all elements and place in queue
978        for (;;) {
979            @SuppressWarnings("unchecked")
980            E item = (E)s.readObject();
981            if (item == null)
982                break;
983            add(item);
984        }
985    }
986}
987