ArrayBlockingQueue.java revision 29957558cf0db700bfaae360a80c42dc3871d0e5
1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37
38import java.lang.ref.WeakReference;
39import java.util.AbstractQueue;
40import java.util.Arrays;
41import java.util.Collection;
42import java.util.Iterator;
43import java.util.NoSuchElementException;
44import java.util.Objects;
45import java.util.Spliterator;
46import java.util.Spliterators;
47import java.util.concurrent.locks.Condition;
48import java.util.concurrent.locks.ReentrantLock;
49
50// BEGIN android-note
51// removed link to collections framework docs
52// END android-note
53
54/**
55 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
56 * array.  This queue orders elements FIFO (first-in-first-out).  The
57 * <em>head</em> of the queue is that element that has been on the
58 * queue the longest time.  The <em>tail</em> of the queue is that
59 * element that has been on the queue the shortest time. New elements
60 * are inserted at the tail of the queue, and the queue retrieval
61 * operations obtain elements at the head of the queue.
62 *
63 * <p>This is a classic &quot;bounded buffer&quot;, in which a
64 * fixed-sized array holds elements inserted by producers and
65 * extracted by consumers.  Once created, the capacity cannot be
66 * changed.  Attempts to {@code put} an element into a full queue
67 * will result in the operation blocking; attempts to {@code take} an
68 * element from an empty queue will similarly block.
69 *
70 * <p>This class supports an optional fairness policy for ordering
71 * waiting producer and consumer threads.  By default, this ordering
72 * is not guaranteed. However, a queue constructed with fairness set
73 * to {@code true} grants threads access in FIFO order. Fairness
74 * generally decreases throughput but reduces variability and avoids
75 * starvation.
76 *
77 * <p>This class and its iterator implement all of the
78 * <em>optional</em> methods of the {@link Collection} and {@link
79 * Iterator} interfaces.
80 *
81 * @since 1.5
82 * @author Doug Lea
83 * @param <E> the type of elements held in this queue
84 */
85public class ArrayBlockingQueue<E> extends AbstractQueue<E>
86        implements BlockingQueue<E>, java.io.Serializable {
87
    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /**
     * The queued items, used as a circular buffer: indices advance
     * toward {@code items.length - 1} and then wrap back to 0.
     * Slots that currently hold no element are null.
     */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes; signalled whenever an element is enqueued */
    private final Condition notEmpty;

    /** Condition for waiting puts; signalled whenever an element is removed */
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
     */
    transient Itrs itrs;
128
129    // Internal helper methods
130
131    /**
132     * Circularly decrements array index i.
133     */
134    final int dec(int i) {
135        return ((i == 0) ? items.length : i) - 1;
136    }
137
138    /**
139     * Returns item at index i.
140     */
141    @SuppressWarnings("unchecked")
142    final E itemAt(int i) {
143        return (E) items[i];
144    }
145
146    /**
147     * Inserts element at current put position, advances, and signals.
148     * Call only when holding lock.
149     */
150    private void enqueue(E x) {
151        // assert lock.getHoldCount() == 1;
152        // assert items[putIndex] == null;
153        final Object[] items = this.items;
154        items[putIndex] = x;
155        if (++putIndex == items.length) putIndex = 0;
156        count++;
157        notEmpty.signal();
158    }
159
160    /**
161     * Extracts element at current take position, advances, and signals.
162     * Call only when holding lock.
163     */
164    private E dequeue() {
165        // assert lock.getHoldCount() == 1;
166        // assert items[takeIndex] != null;
167        final Object[] items = this.items;
168        @SuppressWarnings("unchecked")
169        E x = (E) items[takeIndex];
170        items[takeIndex] = null;
171        if (++takeIndex == items.length) takeIndex = 0;
172        count--;
173        if (itrs != null)
174            itrs.elementDequeued();
175        notFull.signal();
176        return x;
177    }
178
179    /**
180     * Deletes item at array index removeIndex.
181     * Utility for remove(Object) and iterator.remove.
182     * Call only when holding lock.
183     */
184    void removeAt(final int removeIndex) {
185        // assert lock.getHoldCount() == 1;
186        // assert items[removeIndex] != null;
187        // assert removeIndex >= 0 && removeIndex < items.length;
188        final Object[] items = this.items;
189        if (removeIndex == takeIndex) {
190            // removing front item; just advance
191            items[takeIndex] = null;
192            if (++takeIndex == items.length) takeIndex = 0;
193            count--;
194            if (itrs != null)
195                itrs.elementDequeued();
196        } else {
197            // an "interior" remove
198
199            // slide over all others up through putIndex.
200            for (int i = removeIndex, putIndex = this.putIndex;;) {
201                int pred = i;
202                if (++i == items.length) i = 0;
203                if (i == putIndex) {
204                    items[pred] = null;
205                    this.putIndex = pred;
206                    break;
207                }
208                items[pred] = items[i];
209            }
210            count--;
211            if (itrs != null)
212                itrs.removedAt(removeIndex);
213        }
214        notFull.signal();
215    }
216
217    /**
218     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
219     * capacity and default access policy.
220     *
221     * @param capacity the capacity of this queue
222     * @throws IllegalArgumentException if {@code capacity < 1}
223     */
224    public ArrayBlockingQueue(int capacity) {
225        this(capacity, false);
226    }
227
228    /**
229     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
230     * capacity and the specified access policy.
231     *
232     * @param capacity the capacity of this queue
233     * @param fair if {@code true} then queue accesses for threads blocked
234     *        on insertion or removal, are processed in FIFO order;
235     *        if {@code false} the access order is unspecified.
236     * @throws IllegalArgumentException if {@code capacity < 1}
237     */
238    public ArrayBlockingQueue(int capacity, boolean fair) {
239        if (capacity <= 0)
240            throw new IllegalArgumentException();
241        this.items = new Object[capacity];
242        lock = new ReentrantLock(fair);
243        notEmpty = lock.newCondition();
244        notFull =  lock.newCondition();
245    }
246
247    /**
248     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
249     * capacity, the specified access policy and initially containing the
250     * elements of the given collection,
251     * added in traversal order of the collection's iterator.
252     *
253     * @param capacity the capacity of this queue
254     * @param fair if {@code true} then queue accesses for threads blocked
255     *        on insertion or removal, are processed in FIFO order;
256     *        if {@code false} the access order is unspecified.
257     * @param c the collection of elements to initially contain
258     * @throws IllegalArgumentException if {@code capacity} is less than
259     *         {@code c.size()}, or less than 1.
260     * @throws NullPointerException if the specified collection or any
261     *         of its elements are null
262     */
263    public ArrayBlockingQueue(int capacity, boolean fair,
264                              Collection<? extends E> c) {
265        this(capacity, fair);
266
267        final ReentrantLock lock = this.lock;
268        lock.lock(); // Lock only for visibility, not mutual exclusion
269        try {
270            int i = 0;
271            try {
272                for (E e : c)
273                    items[i++] = Objects.requireNonNull(e);
274            } catch (ArrayIndexOutOfBoundsException ex) {
275                throw new IllegalArgumentException();
276            }
277            count = i;
278            putIndex = (i == capacity) ? 0 : i;
279        } finally {
280            lock.unlock();
281        }
282    }
283
284    /**
285     * Inserts the specified element at the tail of this queue if it is
286     * possible to do so immediately without exceeding the queue's capacity,
287     * returning {@code true} upon success and throwing an
288     * {@code IllegalStateException} if this queue is full.
289     *
290     * @param e the element to add
291     * @return {@code true} (as specified by {@link Collection#add})
292     * @throws IllegalStateException if this queue is full
293     * @throws NullPointerException if the specified element is null
294     */
295    public boolean add(E e) {
296        return super.add(e);
297    }
298
299    /**
300     * Inserts the specified element at the tail of this queue if it is
301     * possible to do so immediately without exceeding the queue's capacity,
302     * returning {@code true} upon success and {@code false} if this queue
303     * is full.  This method is generally preferable to method {@link #add},
304     * which can fail to insert an element only by throwing an exception.
305     *
306     * @throws NullPointerException if the specified element is null
307     */
308    public boolean offer(E e) {
309        Objects.requireNonNull(e);
310        final ReentrantLock lock = this.lock;
311        lock.lock();
312        try {
313            if (count == items.length)
314                return false;
315            else {
316                enqueue(e);
317                return true;
318            }
319        } finally {
320            lock.unlock();
321        }
322    }
323
324    /**
325     * Inserts the specified element at the tail of this queue, waiting
326     * for space to become available if the queue is full.
327     *
328     * @throws InterruptedException {@inheritDoc}
329     * @throws NullPointerException {@inheritDoc}
330     */
331    public void put(E e) throws InterruptedException {
332        Objects.requireNonNull(e);
333        final ReentrantLock lock = this.lock;
334        lock.lockInterruptibly();
335        try {
336            while (count == items.length)
337                notFull.await();
338            enqueue(e);
339        } finally {
340            lock.unlock();
341        }
342    }
343
344    /**
345     * Inserts the specified element at the tail of this queue, waiting
346     * up to the specified wait time for space to become available if
347     * the queue is full.
348     *
349     * @throws InterruptedException {@inheritDoc}
350     * @throws NullPointerException {@inheritDoc}
351     */
352    public boolean offer(E e, long timeout, TimeUnit unit)
353        throws InterruptedException {
354
355        Objects.requireNonNull(e);
356        long nanos = unit.toNanos(timeout);
357        final ReentrantLock lock = this.lock;
358        lock.lockInterruptibly();
359        try {
360            while (count == items.length) {
361                if (nanos <= 0L)
362                    return false;
363                nanos = notFull.awaitNanos(nanos);
364            }
365            enqueue(e);
366            return true;
367        } finally {
368            lock.unlock();
369        }
370    }
371
372    public E poll() {
373        final ReentrantLock lock = this.lock;
374        lock.lock();
375        try {
376            return (count == 0) ? null : dequeue();
377        } finally {
378            lock.unlock();
379        }
380    }
381
382    public E take() throws InterruptedException {
383        final ReentrantLock lock = this.lock;
384        lock.lockInterruptibly();
385        try {
386            while (count == 0)
387                notEmpty.await();
388            return dequeue();
389        } finally {
390            lock.unlock();
391        }
392    }
393
394    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
395        long nanos = unit.toNanos(timeout);
396        final ReentrantLock lock = this.lock;
397        lock.lockInterruptibly();
398        try {
399            while (count == 0) {
400                if (nanos <= 0L)
401                    return null;
402                nanos = notEmpty.awaitNanos(nanos);
403            }
404            return dequeue();
405        } finally {
406            lock.unlock();
407        }
408    }
409
410    public E peek() {
411        final ReentrantLock lock = this.lock;
412        lock.lock();
413        try {
414            return itemAt(takeIndex); // null when queue is empty
415        } finally {
416            lock.unlock();
417        }
418    }
419
420    // this doc comment is overridden to remove the reference to collections
421    // greater in size than Integer.MAX_VALUE
422    /**
423     * Returns the number of elements in this queue.
424     *
425     * @return the number of elements in this queue
426     */
427    public int size() {
428        final ReentrantLock lock = this.lock;
429        lock.lock();
430        try {
431            return count;
432        } finally {
433            lock.unlock();
434        }
435    }
436
437    // this doc comment is a modified copy of the inherited doc comment,
438    // without the reference to unlimited queues.
439    /**
440     * Returns the number of additional elements that this queue can ideally
441     * (in the absence of memory or resource constraints) accept without
442     * blocking. This is always equal to the initial capacity of this queue
443     * less the current {@code size} of this queue.
444     *
445     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
446     * an element will succeed by inspecting {@code remainingCapacity}
447     * because it may be the case that another thread is about to
448     * insert or remove an element.
449     */
450    public int remainingCapacity() {
451        final ReentrantLock lock = this.lock;
452        lock.lock();
453        try {
454            return items.length - count;
455        } finally {
456            lock.unlock();
457        }
458    }
459
460    /**
461     * Removes a single instance of the specified element from this queue,
462     * if it is present.  More formally, removes an element {@code e} such
463     * that {@code o.equals(e)}, if this queue contains one or more such
464     * elements.
465     * Returns {@code true} if this queue contained the specified element
466     * (or equivalently, if this queue changed as a result of the call).
467     *
468     * <p>Removal of interior elements in circular array based queues
469     * is an intrinsically slow and disruptive operation, so should
470     * be undertaken only in exceptional circumstances, ideally
471     * only when the queue is known not to be accessible by other
472     * threads.
473     *
474     * @param o element to be removed from this queue, if present
475     * @return {@code true} if this queue changed as a result of the call
476     */
477    public boolean remove(Object o) {
478        if (o == null) return false;
479        final ReentrantLock lock = this.lock;
480        lock.lock();
481        try {
482            if (count > 0) {
483                final Object[] items = this.items;
484                final int putIndex = this.putIndex;
485                int i = takeIndex;
486                do {
487                    if (o.equals(items[i])) {
488                        removeAt(i);
489                        return true;
490                    }
491                    if (++i == items.length) i = 0;
492                } while (i != putIndex);
493            }
494            return false;
495        } finally {
496            lock.unlock();
497        }
498    }
499
500    /**
501     * Returns {@code true} if this queue contains the specified element.
502     * More formally, returns {@code true} if and only if this queue contains
503     * at least one element {@code e} such that {@code o.equals(e)}.
504     *
505     * @param o object to be checked for containment in this queue
506     * @return {@code true} if this queue contains the specified element
507     */
508    public boolean contains(Object o) {
509        if (o == null) return false;
510        final ReentrantLock lock = this.lock;
511        lock.lock();
512        try {
513            if (count > 0) {
514                final Object[] items = this.items;
515                final int putIndex = this.putIndex;
516                int i = takeIndex;
517                do {
518                    if (o.equals(items[i]))
519                        return true;
520                    if (++i == items.length) i = 0;
521                } while (i != putIndex);
522            }
523            return false;
524        } finally {
525            lock.unlock();
526        }
527    }
528
529    /**
530     * Returns an array containing all of the elements in this queue, in
531     * proper sequence.
532     *
533     * <p>The returned array will be "safe" in that no references to it are
534     * maintained by this queue.  (In other words, this method must allocate
535     * a new array).  The caller is thus free to modify the returned array.
536     *
537     * <p>This method acts as bridge between array-based and collection-based
538     * APIs.
539     *
540     * @return an array containing all of the elements in this queue
541     */
542    public Object[] toArray() {
543        final ReentrantLock lock = this.lock;
544        lock.lock();
545        try {
546            final Object[] items = this.items;
547            final int end = takeIndex + count;
548            final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
549            if (end != putIndex)
550                System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
551            return a;
552        } finally {
553            lock.unlock();
554        }
555    }
556
557    /**
558     * Returns an array containing all of the elements in this queue, in
559     * proper sequence; the runtime type of the returned array is that of
560     * the specified array.  If the queue fits in the specified array, it
561     * is returned therein.  Otherwise, a new array is allocated with the
562     * runtime type of the specified array and the size of this queue.
563     *
564     * <p>If this queue fits in the specified array with room to spare
565     * (i.e., the array has more elements than this queue), the element in
566     * the array immediately following the end of the queue is set to
567     * {@code null}.
568     *
569     * <p>Like the {@link #toArray()} method, this method acts as bridge between
570     * array-based and collection-based APIs.  Further, this method allows
571     * precise control over the runtime type of the output array, and may,
572     * under certain circumstances, be used to save allocation costs.
573     *
574     * <p>Suppose {@code x} is a queue known to contain only strings.
575     * The following code can be used to dump the queue into a newly
576     * allocated array of {@code String}:
577     *
578     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
579     *
580     * Note that {@code toArray(new Object[0])} is identical in function to
581     * {@code toArray()}.
582     *
583     * @param a the array into which the elements of the queue are to
584     *          be stored, if it is big enough; otherwise, a new array of the
585     *          same runtime type is allocated for this purpose
586     * @return an array containing all of the elements in this queue
587     * @throws ArrayStoreException if the runtime type of the specified array
588     *         is not a supertype of the runtime type of every element in
589     *         this queue
590     * @throws NullPointerException if the specified array is null
591     */
592    @SuppressWarnings("unchecked")
593    public <T> T[] toArray(T[] a) {
594        final ReentrantLock lock = this.lock;
595        lock.lock();
596        try {
597            final Object[] items = this.items;
598            final int count = this.count;
599            final int firstLeg = Math.min(items.length - takeIndex, count);
600            if (a.length < count) {
601                a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
602                                             a.getClass());
603            } else {
604                System.arraycopy(items, takeIndex, a, 0, firstLeg);
605                if (a.length > count)
606                    a[count] = null;
607            }
608            if (firstLeg < count)
609                System.arraycopy(items, 0, a, firstLeg, putIndex);
610            return a;
611        } finally {
612            lock.unlock();
613        }
614    }
615
616    public String toString() {
617        return Helpers.collectionToString(this);
618    }
619
620    /**
621     * Atomically removes all of the elements from this queue.
622     * The queue will be empty after this call returns.
623     */
624    public void clear() {
625        final ReentrantLock lock = this.lock;
626        lock.lock();
627        try {
628            int k = count;
629            if (k > 0) {
630                final Object[] items = this.items;
631                final int putIndex = this.putIndex;
632                int i = takeIndex;
633                do {
634                    items[i] = null;
635                    if (++i == items.length) i = 0;
636                } while (i != putIndex);
637                takeIndex = putIndex;
638                count = 0;
639                if (itrs != null)
640                    itrs.queueIsEmpty();
641                for (; k > 0 && lock.hasWaiters(notFull); k--)
642                    notFull.signal();
643            }
644        } finally {
645            lock.unlock();
646        }
647    }
648
649    /**
650     * @throws UnsupportedOperationException {@inheritDoc}
651     * @throws ClassCastException            {@inheritDoc}
652     * @throws NullPointerException          {@inheritDoc}
653     * @throws IllegalArgumentException      {@inheritDoc}
654     */
655    public int drainTo(Collection<? super E> c) {
656        return drainTo(c, Integer.MAX_VALUE);
657    }
658
659    /**
660     * @throws UnsupportedOperationException {@inheritDoc}
661     * @throws ClassCastException            {@inheritDoc}
662     * @throws NullPointerException          {@inheritDoc}
663     * @throws IllegalArgumentException      {@inheritDoc}
664     */
665    public int drainTo(Collection<? super E> c, int maxElements) {
666        Objects.requireNonNull(c);
667        if (c == this)
668            throw new IllegalArgumentException();
669        if (maxElements <= 0)
670            return 0;
671        final Object[] items = this.items;
672        final ReentrantLock lock = this.lock;
673        lock.lock();
674        try {
675            int n = Math.min(maxElements, count);
676            int take = takeIndex;
677            int i = 0;
678            try {
679                while (i < n) {
680                    @SuppressWarnings("unchecked")
681                    E x = (E) items[take];
682                    c.add(x);
683                    items[take] = null;
684                    if (++take == items.length) take = 0;
685                    i++;
686                }
687                return n;
688            } finally {
689                // Restore invariants even if c.add() threw
690                if (i > 0) {
691                    count -= i;
692                    takeIndex = take;
693                    if (itrs != null) {
694                        if (count == 0)
695                            itrs.queueIsEmpty();
696                        else if (i > take)
697                            itrs.takeIndexWrapped();
698                    }
699                    for (; i > 0 && lock.hasWaiters(notFull); i--)
700                        notFull.signal();
701                }
702            }
703        } finally {
704            lock.unlock();
705        }
706    }
707
708    /**
709     * Returns an iterator over the elements in this queue in proper sequence.
710     * The elements will be returned in order from first (head) to last (tail).
711     *
712     * <p>The returned iterator is
713     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
714     *
715     * @return an iterator over the elements in this queue in proper sequence
716     */
717    public Iterator<E> iterator() {
718        return new Itr();
719    }
720
721    /**
722     * Shared data between iterators and their queue, allowing queue
723     * modifications to update iterators when elements are removed.
724     *
725     * This adds a lot of complexity for the sake of correctly
726     * handling some uncommon operations, but the combination of
727     * circular-arrays and supporting interior removes (i.e., those
728     * not at head) would cause iterators to sometimes lose their
729     * places and/or (re)report elements they shouldn't.  To avoid
730     * this, when a queue has one or more iterators, it keeps iterator
731     * state consistent by:
732     *
733     * (1) keeping track of the number of "cycles", that is, the
734     *     number of times takeIndex has wrapped around to 0.
735     * (2) notifying all iterators via the callback removedAt whenever
736     *     an interior element is removed (and thus other elements may
737     *     be shifted).
738     *
739     * These suffice to eliminate iterator inconsistencies, but
740     * unfortunately add the secondary responsibility of maintaining
741     * the list of iterators.  We track all active iterators in a
742     * simple linked list (accessed only when the queue's lock is
743     * held) of weak references to Itr.  The list is cleaned up using
744     * 3 different mechanisms:
745     *
746     * (1) Whenever a new iterator is created, do some O(1) checking for
747     *     stale list elements.
748     *
749     * (2) Whenever takeIndex wraps around to 0, check for iterators
750     *     that have been unused for more than one wrap-around cycle.
751     *
752     * (3) Whenever the queue becomes empty, all iterators are notified
753     *     and this entire data structure is discarded.
754     *
755     * So in addition to the removedAt callback that is necessary for
756     * correctness, iterators have the shutdown and takeIndexWrapped
757     * callbacks that help remove stale iterators from the list.
758     *
759     * Whenever a list element is examined, it is expunged if either
760     * the GC has determined that the iterator is discarded, or if the
761     * iterator reports that it is "detached" (does not need any
762     * further state updates).  Overhead is maximal when takeIndex
763     * never advances, iterators are discarded before they are
764     * exhausted, and all removals are interior removes, in which case
765     * all stale iterators are discovered by the GC.  But even in this
766     * case we don't increase the amortized complexity.
767     *
768     * Care must be taken to keep list sweeping methods from
769     * reentrantly invoking another such method, causing subtle
770     * corruption bugs.
771     */
772    class Itrs {
773
774        /**
775         * Node in a linked list of weak iterator references.
776         */
777        private class Node extends WeakReference<Itr> {
778            Node next;
779
780            Node(Itr iterator, Node next) {
781                super(iterator);
782                this.next = next;
783            }
784        }
785
        /** Incremented whenever takeIndex wraps around to 0 */
        int cycles;

        /** Linked list of weak iterator references */
        private Node head;

        /**
         * Used to expunge stale iterators: the node after which the next
         * call to doSomeSweeping resumes, or null to start from head.
         */
        private Node sweeper;

        /** Nodes probed per doSomeSweeping call when not in try-harder mode */
        private static final int SHORT_SWEEP_PROBES = 4;
        /** Probe budget in try-harder mode, and after a stale node is found */
        private static final int LONG_SWEEP_PROBES = 16;
797
798        Itrs(Itr initial) {
799            register(initial);
800        }
801
802        /**
803         * Sweeps itrs, looking for and expunging stale iterators.
804         * If at least one was found, tries harder to find more.
805         * Called only from iterating thread.
806         *
807         * @param tryHarder whether to start in try-harder mode, because
808         * there is known to be at least one iterator to collect
809         */
810        void doSomeSweeping(boolean tryHarder) {
811            // assert lock.getHoldCount() == 1;
812            // assert head != null;
813            int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
814            Node o, p;
815            final Node sweeper = this.sweeper;
816            boolean passedGo;   // to limit search to one full sweep
817
818            if (sweeper == null) {
819                o = null;
820                p = head;
821                passedGo = true;
822            } else {
823                o = sweeper;
824                p = o.next;
825                passedGo = false;
826            }
827
828            for (; probes > 0; probes--) {
829                if (p == null) {
830                    if (passedGo)
831                        break;
832                    o = null;
833                    p = head;
834                    passedGo = true;
835                }
836                final Itr it = p.get();
837                final Node next = p.next;
838                if (it == null || it.isDetached()) {
839                    // found a discarded/exhausted iterator
840                    probes = LONG_SWEEP_PROBES; // "try harder"
841                    // unlink p
842                    p.clear();
843                    p.next = null;
844                    if (o == null) {
845                        head = next;
846                        if (next == null) {
847                            // We've run out of iterators to track; retire
848                            itrs = null;
849                            return;
850                        }
851                    }
852                    else
853                        o.next = next;
854                } else {
855                    o = p;
856                }
857                p = next;
858            }
859
860            this.sweeper = (p == null) ? null : o;
861        }
862
863        /**
864         * Adds a new iterator to the linked list of tracked iterators.
865         */
866        void register(Itr itr) {
867            // assert lock.getHoldCount() == 1;
868            head = new Node(itr, head);
869        }
870
871        /**
872         * Called whenever takeIndex wraps around to 0.
873         *
874         * Notifies all iterators, and expunges any that are now stale.
875         */
876        void takeIndexWrapped() {
877            // assert lock.getHoldCount() == 1;
878            cycles++;
879            for (Node o = null, p = head; p != null;) {
880                final Itr it = p.get();
881                final Node next = p.next;
882                if (it == null || it.takeIndexWrapped()) {
883                    // unlink p
884                    // assert it == null || it.isDetached();
885                    p.clear();
886                    p.next = null;
887                    if (o == null)
888                        head = next;
889                    else
890                        o.next = next;
891                } else {
892                    o = p;
893                }
894                p = next;
895            }
896            if (head == null)   // no more iterators to track
897                itrs = null;
898        }
899
900        /**
901         * Called whenever an interior remove (not at takeIndex) occurred.
902         *
903         * Notifies all iterators, and expunges any that are now stale.
904         */
905        void removedAt(int removedIndex) {
906            for (Node o = null, p = head; p != null;) {
907                final Itr it = p.get();
908                final Node next = p.next;
909                if (it == null || it.removedAt(removedIndex)) {
910                    // unlink p
911                    // assert it == null || it.isDetached();
912                    p.clear();
913                    p.next = null;
914                    if (o == null)
915                        head = next;
916                    else
917                        o.next = next;
918                } else {
919                    o = p;
920                }
921                p = next;
922            }
923            if (head == null)   // no more iterators to track
924                itrs = null;
925        }
926
927        /**
928         * Called whenever the queue becomes empty.
929         *
930         * Notifies all active iterators that the queue is empty,
931         * clears all weak refs, and unlinks the itrs datastructure.
932         */
933        void queueIsEmpty() {
934            // assert lock.getHoldCount() == 1;
935            for (Node p = head; p != null; p = p.next) {
936                Itr it = p.get();
937                if (it != null) {
938                    p.clear();
939                    it.shutdown();
940                }
941            }
942            head = null;
943            itrs = null;
944        }
945
946        /**
947         * Called whenever an element has been dequeued (at takeIndex).
948         */
949        void elementDequeued() {
950            // assert lock.getHoldCount() == 1;
951            if (count == 0)
952                queueIsEmpty();
953            else if (takeIndex == 0)
954                takeIndexWrapped();
955        }
956    }
957
958    /**
959     * Iterator for ArrayBlockingQueue.
960     *
961     * To maintain weak consistency with respect to puts and takes, we
962     * read ahead one slot, so as to not report hasNext true but then
963     * not have an element to return.
964     *
965     * We switch into "detached" mode (allowing prompt unlinking from
966     * itrs without help from the GC) when all indices are negative, or
967     * when hasNext returns false for the first time.  This allows the
968     * iterator to track concurrent updates completely accurately,
969     * except for the corner case of the user calling Iterator.remove()
970     * after hasNext() returned false.  Even in this case, we ensure
971     * that we don't remove the wrong element by keeping track of the
972     * expected element to remove, in lastItem.  Yes, we may fail to
973     * remove lastItem from the queue if it moved due to an interleaved
974     * interior remove while in detached mode.
975     */
976    private class Itr implements Iterator<E> {
977        /** Index to look for new nextItem; NONE at end */
978        private int cursor;
979
980        /** Element to be returned by next call to next(); null if none */
981        private E nextItem;
982
983        /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
984        private int nextIndex;
985
986        /** Last element returned; null if none or not detached. */
987        private E lastItem;
988
989        /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
990        private int lastRet;
991
992        /** Previous value of takeIndex, or DETACHED when detached */
993        private int prevTakeIndex;
994
995        /** Previous value of iters.cycles */
996        private int prevCycles;
997
998        /** Special index value indicating "not available" or "undefined" */
999        private static final int NONE = -1;
1000
1001        /**
1002         * Special index value indicating "removed elsewhere", that is,
1003         * removed by some operation other than a call to this.remove().
1004         */
1005        private static final int REMOVED = -2;
1006
1007        /** Special value for prevTakeIndex indicating "detached mode" */
1008        private static final int DETACHED = -3;
1009
1010        Itr() {
1011            // assert lock.getHoldCount() == 0;
1012            lastRet = NONE;
1013            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1014            lock.lock();
1015            try {
1016                if (count == 0) {
1017                    // assert itrs == null;
1018                    cursor = NONE;
1019                    nextIndex = NONE;
1020                    prevTakeIndex = DETACHED;
1021                } else {
1022                    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1023                    prevTakeIndex = takeIndex;
1024                    nextItem = itemAt(nextIndex = takeIndex);
1025                    cursor = incCursor(takeIndex);
1026                    if (itrs == null) {
1027                        itrs = new Itrs(this);
1028                    } else {
1029                        itrs.register(this); // in this order
1030                        itrs.doSomeSweeping(false);
1031                    }
1032                    prevCycles = itrs.cycles;
1033                    // assert takeIndex >= 0;
1034                    // assert prevTakeIndex == takeIndex;
1035                    // assert nextIndex >= 0;
1036                    // assert nextItem != null;
1037                }
1038            } finally {
1039                lock.unlock();
1040            }
1041        }
1042
1043        boolean isDetached() {
1044            // assert lock.getHoldCount() == 1;
1045            return prevTakeIndex < 0;
1046        }
1047
1048        private int incCursor(int index) {
1049            // assert lock.getHoldCount() == 1;
1050            if (++index == items.length) index = 0;
1051            if (index == putIndex) index = NONE;
1052            return index;
1053        }
1054
1055        /**
1056         * Returns true if index is invalidated by the given number of
1057         * dequeues, starting from prevTakeIndex.
1058         */
1059        private boolean invalidated(int index, int prevTakeIndex,
1060                                    long dequeues, int length) {
1061            if (index < 0)
1062                return false;
1063            int distance = index - prevTakeIndex;
1064            if (distance < 0)
1065                distance += length;
1066            return dequeues > distance;
1067        }
1068
1069        /**
1070         * Adjusts indices to incorporate all dequeues since the last
1071         * operation on this iterator.  Call only from iterating thread.
1072         */
1073        private void incorporateDequeues() {
1074            // assert lock.getHoldCount() == 1;
1075            // assert itrs != null;
1076            // assert !isDetached();
1077            // assert count > 0;
1078
1079            final int cycles = itrs.cycles;
1080            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1081            final int prevCycles = this.prevCycles;
1082            final int prevTakeIndex = this.prevTakeIndex;
1083
1084            if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1085                final int len = items.length;
1086                // how far takeIndex has advanced since the previous
1087                // operation of this iterator
1088                long dequeues = (cycles - prevCycles) * len
1089                    + (takeIndex - prevTakeIndex);
1090
1091                // Check indices for invalidation
1092                if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1093                    lastRet = REMOVED;
1094                if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1095                    nextIndex = REMOVED;
1096                if (invalidated(cursor, prevTakeIndex, dequeues, len))
1097                    cursor = takeIndex;
1098
1099                if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1100                    detach();
1101                else {
1102                    this.prevCycles = cycles;
1103                    this.prevTakeIndex = takeIndex;
1104                }
1105            }
1106        }
1107
1108        /**
1109         * Called when itrs should stop tracking this iterator, either
1110         * because there are no more indices to update (cursor < 0 &&
1111         * nextIndex < 0 && lastRet < 0) or as a special exception, when
1112         * lastRet >= 0, because hasNext() is about to return false for the
1113         * first time.  Call only from iterating thread.
1114         */
1115        private void detach() {
1116            // Switch to detached mode
1117            // assert lock.getHoldCount() == 1;
1118            // assert cursor == NONE;
1119            // assert nextIndex < 0;
1120            // assert lastRet < 0 || nextItem == null;
1121            // assert lastRet < 0 ^ lastItem != null;
1122            if (prevTakeIndex >= 0) {
1123                // assert itrs != null;
1124                prevTakeIndex = DETACHED;
1125                // try to unlink from itrs (but not too hard)
1126                itrs.doSomeSweeping(true);
1127            }
1128        }
1129
1130        /**
1131         * For performance reasons, we would like not to acquire a lock in
1132         * hasNext in the common case.  To allow for this, we only access
1133         * fields (i.e. nextItem) that are not modified by update operations
1134         * triggered by queue modifications.
1135         */
1136        public boolean hasNext() {
1137            // assert lock.getHoldCount() == 0;
1138            if (nextItem != null)
1139                return true;
1140            noNext();
1141            return false;
1142        }
1143
1144        private void noNext() {
1145            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1146            lock.lock();
1147            try {
1148                // assert cursor == NONE;
1149                // assert nextIndex == NONE;
1150                if (!isDetached()) {
1151                    // assert lastRet >= 0;
1152                    incorporateDequeues(); // might update lastRet
1153                    if (lastRet >= 0) {
1154                        lastItem = itemAt(lastRet);
1155                        // assert lastItem != null;
1156                        detach();
1157                    }
1158                }
1159                // assert isDetached();
1160                // assert lastRet < 0 ^ lastItem != null;
1161            } finally {
1162                lock.unlock();
1163            }
1164        }
1165
1166        public E next() {
1167            // assert lock.getHoldCount() == 0;
1168            final E x = nextItem;
1169            if (x == null)
1170                throw new NoSuchElementException();
1171            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1172            lock.lock();
1173            try {
1174                if (!isDetached())
1175                    incorporateDequeues();
1176                // assert nextIndex != NONE;
1177                // assert lastItem == null;
1178                lastRet = nextIndex;
1179                final int cursor = this.cursor;
1180                if (cursor >= 0) {
1181                    nextItem = itemAt(nextIndex = cursor);
1182                    // assert nextItem != null;
1183                    this.cursor = incCursor(cursor);
1184                } else {
1185                    nextIndex = NONE;
1186                    nextItem = null;
1187                }
1188            } finally {
1189                lock.unlock();
1190            }
1191            return x;
1192        }
1193
1194        public void remove() {
1195            // assert lock.getHoldCount() == 0;
1196            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1197            lock.lock();
1198            try {
1199                if (!isDetached())
1200                    incorporateDequeues(); // might update lastRet or detach
1201                final int lastRet = this.lastRet;
1202                this.lastRet = NONE;
1203                if (lastRet >= 0) {
1204                    if (!isDetached())
1205                        removeAt(lastRet);
1206                    else {
1207                        final E lastItem = this.lastItem;
1208                        // assert lastItem != null;
1209                        this.lastItem = null;
1210                        if (itemAt(lastRet) == lastItem)
1211                            removeAt(lastRet);
1212                    }
1213                } else if (lastRet == NONE)
1214                    throw new IllegalStateException();
1215                // else lastRet == REMOVED and the last returned element was
1216                // previously asynchronously removed via an operation other
1217                // than this.remove(), so nothing to do.
1218
1219                if (cursor < 0 && nextIndex < 0)
1220                    detach();
1221            } finally {
1222                lock.unlock();
1223                // assert lastRet == NONE;
1224                // assert lastItem == null;
1225            }
1226        }
1227
1228        /**
1229         * Called to notify the iterator that the queue is empty, or that it
1230         * has fallen hopelessly behind, so that it should abandon any
1231         * further iteration, except possibly to return one more element
1232         * from next(), as promised by returning true from hasNext().
1233         */
1234        void shutdown() {
1235            // assert lock.getHoldCount() == 1;
1236            cursor = NONE;
1237            if (nextIndex >= 0)
1238                nextIndex = REMOVED;
1239            if (lastRet >= 0) {
1240                lastRet = REMOVED;
1241                lastItem = null;
1242            }
1243            prevTakeIndex = DETACHED;
1244            // Don't set nextItem to null because we must continue to be
1245            // able to return it on next().
1246            //
1247            // Caller will unlink from itrs when convenient.
1248        }
1249
1250        private int distance(int index, int prevTakeIndex, int length) {
1251            int distance = index - prevTakeIndex;
1252            if (distance < 0)
1253                distance += length;
1254            return distance;
1255        }
1256
1257        /**
1258         * Called whenever an interior remove (not at takeIndex) occurred.
1259         *
1260         * @return true if this iterator should be unlinked from itrs
1261         */
1262        boolean removedAt(int removedIndex) {
1263            // assert lock.getHoldCount() == 1;
1264            if (isDetached())
1265                return true;
1266
1267            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1268            final int prevTakeIndex = this.prevTakeIndex;
1269            final int len = items.length;
1270            // distance from prevTakeIndex to removedIndex
1271            final int removedDistance =
1272                len * (itrs.cycles - this.prevCycles
1273                       + ((removedIndex < takeIndex) ? 1 : 0))
1274                + (removedIndex - prevTakeIndex);
1275            // assert itrs.cycles - this.prevCycles >= 0;
1276            // assert itrs.cycles - this.prevCycles <= 1;
1277            // assert removedDistance > 0;
1278            // assert removedIndex != takeIndex;
1279            int cursor = this.cursor;
1280            if (cursor >= 0) {
1281                int x = distance(cursor, prevTakeIndex, len);
1282                if (x == removedDistance) {
1283                    if (cursor == putIndex)
1284                        this.cursor = cursor = NONE;
1285                }
1286                else if (x > removedDistance) {
1287                    // assert cursor != prevTakeIndex;
1288                    this.cursor = cursor = dec(cursor);
1289                }
1290            }
1291            int lastRet = this.lastRet;
1292            if (lastRet >= 0) {
1293                int x = distance(lastRet, prevTakeIndex, len);
1294                if (x == removedDistance)
1295                    this.lastRet = lastRet = REMOVED;
1296                else if (x > removedDistance)
1297                    this.lastRet = lastRet = dec(lastRet);
1298            }
1299            int nextIndex = this.nextIndex;
1300            if (nextIndex >= 0) {
1301                int x = distance(nextIndex, prevTakeIndex, len);
1302                if (x == removedDistance)
1303                    this.nextIndex = nextIndex = REMOVED;
1304                else if (x > removedDistance)
1305                    this.nextIndex = nextIndex = dec(nextIndex);
1306            }
1307            if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1308                this.prevTakeIndex = DETACHED;
1309                return true;
1310            }
1311            return false;
1312        }
1313
1314        /**
1315         * Called whenever takeIndex wraps around to zero.
1316         *
1317         * @return true if this iterator should be unlinked from itrs
1318         */
1319        boolean takeIndexWrapped() {
1320            // assert lock.getHoldCount() == 1;
1321            if (isDetached())
1322                return true;
1323            if (itrs.cycles - prevCycles > 1) {
1324                // All the elements that existed at the time of the last
1325                // operation are gone, so abandon further iteration.
1326                shutdown();
1327                return true;
1328            }
1329            return false;
1330        }
1331
1332//         /** Uncomment for debugging. */
1333//         public String toString() {
1334//             return ("cursor=" + cursor + " " +
1335//                     "nextIndex=" + nextIndex + " " +
1336//                     "lastRet=" + lastRet + " " +
1337//                     "nextItem=" + nextItem + " " +
1338//                     "lastItem=" + lastItem + " " +
1339//                     "prevCycles=" + prevCycles + " " +
1340//                     "prevTakeIndex=" + prevTakeIndex + " " +
1341//                     "size()=" + size() + " " +
1342//                     "remainingCapacity()=" + remainingCapacity());
1343//         }
1344    }
1345
1346    /**
1347     * Returns a {@link Spliterator} over the elements in this queue.
1348     *
1349     * <p>The returned spliterator is
1350     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1351     *
1352     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1353     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1354     *
1355     * @implNote
1356     * The {@code Spliterator} implements {@code trySplit} to permit limited
1357     * parallelism.
1358     *
1359     * @return a {@code Spliterator} over the elements in this queue
1360     * @since 1.8
1361     */
1362    public Spliterator<E> spliterator() {
1363        return Spliterators.spliterator
1364            (this, (Spliterator.ORDERED |
1365                    Spliterator.NONNULL |
1366                    Spliterator.CONCURRENT));
1367    }
1368
1369}
1370