ArrayBlockingQueue.java revision 91770798d8b9280d48d30df2ed7f63b3ed9b036f
1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7package java.util.concurrent;
8import java.util.concurrent.locks.Condition;
9import java.util.concurrent.locks.ReentrantLock;
10import java.util.AbstractQueue;
11import java.util.Collection;
12import java.util.Iterator;
13import java.util.NoSuchElementException;
14import java.lang.ref.WeakReference;
15
16// BEGIN android-note
17// removed link to collections framework docs
18// END android-note
19
20/**
21 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
22 * array.  This queue orders elements FIFO (first-in-first-out).  The
23 * <em>head</em> of the queue is that element that has been on the
24 * queue the longest time.  The <em>tail</em> of the queue is that
25 * element that has been on the queue the shortest time. New elements
26 * are inserted at the tail of the queue, and the queue retrieval
27 * operations obtain elements at the head of the queue.
28 *
29 * <p>This is a classic &quot;bounded buffer&quot;, in which a
30 * fixed-sized array holds elements inserted by producers and
31 * extracted by consumers.  Once created, the capacity cannot be
32 * changed.  Attempts to {@code put} an element into a full queue
33 * will result in the operation blocking; attempts to {@code take} an
34 * element from an empty queue will similarly block.
35 *
36 * <p>This class supports an optional fairness policy for ordering
37 * waiting producer and consumer threads.  By default, this ordering
38 * is not guaranteed. However, a queue constructed with fairness set
39 * to {@code true} grants threads access in FIFO order. Fairness
40 * generally decreases throughput but reduces variability and avoids
41 * starvation.
42 *
43 * <p>This class and its iterator implement all of the
44 * <em>optional</em> methods of the {@link Collection} and {@link
45 * Iterator} interfaces.
46 *
47 * @since 1.5
48 * @author Doug Lea
49 * @param <E> the type of elements held in this collection
50 */
51public class ArrayBlockingQueue<E> extends AbstractQueue<E>
52        implements BlockingQueue<E>, java.io.Serializable {
53
54    /**
55     * Serialization ID. This class relies on default serialization
56     * even for the items array, which is default-serialized, even if
57     * it is empty. Otherwise it could not be declared final, which is
58     * necessary here.
59     */
60    private static final long serialVersionUID = -817911632652898426L;
61
62    /** The queued items */
63    final Object[] items;
64
65    /** items index for next take, poll, peek or remove */
66    int takeIndex;
67
68    /** items index for next put, offer, or add */
69    int putIndex;
70
71    /** Number of elements in the queue */
72    int count;
73
74    /*
75     * Concurrency control uses the classic two-condition algorithm
76     * found in any textbook.
77     */
78
79    /** Main lock guarding all access */
80    final ReentrantLock lock;
81
82    /** Condition for waiting takes */
83    private final Condition notEmpty;
84
85    /** Condition for waiting puts */
86    private final Condition notFull;
87
88    /**
89     * Shared state for currently active iterators, or null if there
90     * are known not to be any.  Allows queue operations to update
91     * iterator state.
92     */
93    transient Itrs itrs = null;
94
95    // Internal helper methods
96
97    /**
98     * Circularly increment i.
99     */
100    final int inc(int i) {
101        return (++i == items.length) ? 0 : i;
102    }
103
104    /**
105     * Circularly decrement i.
106     */
107    final int dec(int i) {
108        return ((i == 0) ? items.length : i) - 1;
109    }
110
111    /**
112     * Returns item at index i.
113     */
114    @SuppressWarnings("unchecked")
115    final E itemAt(int i) {
116        return (E) items[i];
117    }
118
119    /**
120     * Throws NullPointerException if argument is null.
121     *
122     * @param v the element
123     */
124    private static void checkNotNull(Object v) {
125        if (v == null)
126            throw new NullPointerException();
127    }
128
129    /**
130     * Inserts element at current put position, advances, and signals.
131     * Call only when holding lock.
132     */
133    private void enqueue(E x) {
134        // assert lock.getHoldCount() == 1;
135        // assert items[putIndex] == null;
136        items[putIndex] = x;
137        putIndex = inc(putIndex);
138        count++;
139        notEmpty.signal();
140    }
141
142    /**
143     * Extracts element at current take position, advances, and signals.
144     * Call only when holding lock.
145     */
146    private E dequeue() {
147        // assert lock.getHoldCount() == 1;
148        // assert items[takeIndex] != null;
149        final Object[] items = this.items;
150        @SuppressWarnings("unchecked")
151        E x = (E) items[takeIndex];
152        items[takeIndex] = null;
153        takeIndex = inc(takeIndex);
154        count--;
155        if (itrs != null)
156            itrs.elementDequeued();
157        notFull.signal();
158        return x;
159    }
160
161    /**
162     * Deletes item at array index removeIndex.
163     * Utility for remove(Object) and iterator.remove.
164     * Call only when holding lock.
165     */
166    void removeAt(final int removeIndex) {
167        // assert lock.getHoldCount() == 1;
168        // assert items[removeIndex] != null;
169        // assert removeIndex >= 0 && removeIndex < items.length;
170        final Object[] items = this.items;
171        if (removeIndex == takeIndex) {
172            // removing front item; just advance
173            items[takeIndex] = null;
174            takeIndex = inc(takeIndex);
175            count--;
176            if (itrs != null)
177                itrs.elementDequeued();
178        } else {
179            // an "interior" remove
180
181            // slide over all others up through putIndex.
182            final int putIndex = this.putIndex;
183            for (int i = removeIndex;;) {
184                int next = inc(i);
185                if (next != putIndex) {
186                    items[i] = items[next];
187                    i = next;
188                } else {
189                    items[i] = null;
190                    this.putIndex = i;
191                    break;
192                }
193            }
194            count--;
195            if (itrs != null)
196                itrs.removedAt(removeIndex);
197        }
198        notFull.signal();
199    }
200
201    /**
202     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
203     * capacity and default access policy.
204     *
205     * @param capacity the capacity of this queue
206     * @throws IllegalArgumentException if {@code capacity < 1}
207     */
208    public ArrayBlockingQueue(int capacity) {
209        this(capacity, false);
210    }
211
212    /**
213     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
214     * capacity and the specified access policy.
215     *
216     * @param capacity the capacity of this queue
217     * @param fair if {@code true} then queue accesses for threads blocked
218     *        on insertion or removal, are processed in FIFO order;
219     *        if {@code false} the access order is unspecified.
220     * @throws IllegalArgumentException if {@code capacity < 1}
221     */
222    public ArrayBlockingQueue(int capacity, boolean fair) {
223        if (capacity <= 0)
224            throw new IllegalArgumentException();
225        this.items = new Object[capacity];
226        lock = new ReentrantLock(fair);
227        notEmpty = lock.newCondition();
228        notFull =  lock.newCondition();
229    }
230
231    /**
232     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
233     * capacity, the specified access policy and initially containing the
234     * elements of the given collection,
235     * added in traversal order of the collection's iterator.
236     *
237     * @param capacity the capacity of this queue
238     * @param fair if {@code true} then queue accesses for threads blocked
239     *        on insertion or removal, are processed in FIFO order;
240     *        if {@code false} the access order is unspecified.
241     * @param c the collection of elements to initially contain
242     * @throws IllegalArgumentException if {@code capacity} is less than
243     *         {@code c.size()}, or less than 1.
244     * @throws NullPointerException if the specified collection or any
245     *         of its elements are null
246     */
247    public ArrayBlockingQueue(int capacity, boolean fair,
248                              Collection<? extends E> c) {
249        this(capacity, fair);
250
251        final ReentrantLock lock = this.lock;
252        lock.lock(); // Lock only for visibility, not mutual exclusion
253        try {
254            int i = 0;
255            try {
256                for (E e : c) {
257                    checkNotNull(e);
258                    items[i++] = e;
259                }
260            } catch (ArrayIndexOutOfBoundsException ex) {
261                throw new IllegalArgumentException();
262            }
263            count = i;
264            putIndex = (i == capacity) ? 0 : i;
265        } finally {
266            lock.unlock();
267        }
268    }
269
270    /**
271     * Inserts the specified element at the tail of this queue if it is
272     * possible to do so immediately without exceeding the queue's capacity,
273     * returning {@code true} upon success and throwing an
274     * {@code IllegalStateException} if this queue is full.
275     *
276     * @param e the element to add
277     * @return {@code true} (as specified by {@link Collection#add})
278     * @throws IllegalStateException if this queue is full
279     * @throws NullPointerException if the specified element is null
280     */
281    public boolean add(E e) {
282        return super.add(e);
283    }
284
285    /**
286     * Inserts the specified element at the tail of this queue if it is
287     * possible to do so immediately without exceeding the queue's capacity,
288     * returning {@code true} upon success and {@code false} if this queue
289     * is full.  This method is generally preferable to method {@link #add},
290     * which can fail to insert an element only by throwing an exception.
291     *
292     * @throws NullPointerException if the specified element is null
293     */
294    public boolean offer(E e) {
295        checkNotNull(e);
296        final ReentrantLock lock = this.lock;
297        lock.lock();
298        try {
299            if (count == items.length)
300                return false;
301            else {
302                enqueue(e);
303                return true;
304            }
305        } finally {
306            lock.unlock();
307        }
308    }
309
310    /**
311     * Inserts the specified element at the tail of this queue, waiting
312     * for space to become available if the queue is full.
313     *
314     * @throws InterruptedException {@inheritDoc}
315     * @throws NullPointerException {@inheritDoc}
316     */
317    public void put(E e) throws InterruptedException {
318        checkNotNull(e);
319        final ReentrantLock lock = this.lock;
320        lock.lockInterruptibly();
321        try {
322            while (count == items.length)
323                notFull.await();
324            enqueue(e);
325        } finally {
326            lock.unlock();
327        }
328    }
329
330    /**
331     * Inserts the specified element at the tail of this queue, waiting
332     * up to the specified wait time for space to become available if
333     * the queue is full.
334     *
335     * @throws InterruptedException {@inheritDoc}
336     * @throws NullPointerException {@inheritDoc}
337     */
338    public boolean offer(E e, long timeout, TimeUnit unit)
339        throws InterruptedException {
340
341        checkNotNull(e);
342        long nanos = unit.toNanos(timeout);
343        final ReentrantLock lock = this.lock;
344        lock.lockInterruptibly();
345        try {
346            while (count == items.length) {
347                if (nanos <= 0)
348                    return false;
349                nanos = notFull.awaitNanos(nanos);
350            }
351            enqueue(e);
352            return true;
353        } finally {
354            lock.unlock();
355        }
356    }
357
358    public E poll() {
359        final ReentrantLock lock = this.lock;
360        lock.lock();
361        try {
362            return (count == 0) ? null : dequeue();
363        } finally {
364            lock.unlock();
365        }
366    }
367
368    public E take() throws InterruptedException {
369        final ReentrantLock lock = this.lock;
370        lock.lockInterruptibly();
371        try {
372            while (count == 0)
373                notEmpty.await();
374            return dequeue();
375        } finally {
376            lock.unlock();
377        }
378    }
379
380    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
381        long nanos = unit.toNanos(timeout);
382        final ReentrantLock lock = this.lock;
383        lock.lockInterruptibly();
384        try {
385            while (count == 0) {
386                if (nanos <= 0)
387                    return null;
388                nanos = notEmpty.awaitNanos(nanos);
389            }
390            return dequeue();
391        } finally {
392            lock.unlock();
393        }
394    }
395
396    public E peek() {
397        final ReentrantLock lock = this.lock;
398        lock.lock();
399        try {
400            return itemAt(takeIndex); // null when queue is empty
401        } finally {
402            lock.unlock();
403        }
404    }
405
406    // this doc comment is overridden to remove the reference to collections
407    // greater in size than Integer.MAX_VALUE
408    /**
409     * Returns the number of elements in this queue.
410     *
411     * @return the number of elements in this queue
412     */
413    public int size() {
414        final ReentrantLock lock = this.lock;
415        lock.lock();
416        try {
417            return count;
418        } finally {
419            lock.unlock();
420        }
421    }
422
423    // this doc comment is a modified copy of the inherited doc comment,
424    // without the reference to unlimited queues.
425    /**
426     * Returns the number of additional elements that this queue can ideally
427     * (in the absence of memory or resource constraints) accept without
428     * blocking. This is always equal to the initial capacity of this queue
429     * less the current {@code size} of this queue.
430     *
431     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
432     * an element will succeed by inspecting {@code remainingCapacity}
433     * because it may be the case that another thread is about to
434     * insert or remove an element.
435     */
436    public int remainingCapacity() {
437        final ReentrantLock lock = this.lock;
438        lock.lock();
439        try {
440            return items.length - count;
441        } finally {
442            lock.unlock();
443        }
444    }
445
446    /**
447     * Removes a single instance of the specified element from this queue,
448     * if it is present.  More formally, removes an element {@code e} such
449     * that {@code o.equals(e)}, if this queue contains one or more such
450     * elements.
451     * Returns {@code true} if this queue contained the specified element
452     * (or equivalently, if this queue changed as a result of the call).
453     *
454     * <p>Removal of interior elements in circular array based queues
455     * is an intrinsically slow and disruptive operation, so should
456     * be undertaken only in exceptional circumstances, ideally
457     * only when the queue is known not to be accessible by other
458     * threads.
459     *
460     * @param o element to be removed from this queue, if present
461     * @return {@code true} if this queue changed as a result of the call
462     */
463    public boolean remove(Object o) {
464        if (o == null) return false;
465        final Object[] items = this.items;
466        final ReentrantLock lock = this.lock;
467        lock.lock();
468        try {
469            if (count > 0) {
470                final int putIndex = this.putIndex;
471                int i = takeIndex;
472                do {
473                    if (o.equals(items[i])) {
474                        removeAt(i);
475                        return true;
476                    }
477                } while ((i = inc(i)) != putIndex);
478            }
479            return false;
480        } finally {
481            lock.unlock();
482        }
483    }
484
485    /**
486     * Returns {@code true} if this queue contains the specified element.
487     * More formally, returns {@code true} if and only if this queue contains
488     * at least one element {@code e} such that {@code o.equals(e)}.
489     *
490     * @param o object to be checked for containment in this queue
491     * @return {@code true} if this queue contains the specified element
492     */
493    public boolean contains(Object o) {
494        if (o == null) return false;
495        final Object[] items = this.items;
496        final ReentrantLock lock = this.lock;
497        lock.lock();
498        try {
499            if (count > 0) {
500                final int putIndex = this.putIndex;
501                int i = takeIndex;
502                do {
503                    if (o.equals(items[i]))
504                        return true;
505                } while ((i = inc(i)) != putIndex);
506            }
507            return false;
508        } finally {
509            lock.unlock();
510        }
511    }
512
513    /**
514     * Returns an array containing all of the elements in this queue, in
515     * proper sequence.
516     *
517     * <p>The returned array will be "safe" in that no references to it are
518     * maintained by this queue.  (In other words, this method must allocate
519     * a new array).  The caller is thus free to modify the returned array.
520     *
521     * <p>This method acts as bridge between array-based and collection-based
522     * APIs.
523     *
524     * @return an array containing all of the elements in this queue
525     */
526    public Object[] toArray() {
527        final Object[] items = this.items;
528        final ReentrantLock lock = this.lock;
529        lock.lock();
530        try {
531            final int count = this.count;
532            Object[] a = new Object[count];
533            int n = items.length - takeIndex;
534            if (count <= n) {
535                System.arraycopy(items, takeIndex, a, 0, count);
536            } else {
537                System.arraycopy(items, takeIndex, a, 0, n);
538                System.arraycopy(items, 0, a, n, count - n);
539            }
540            return a;
541        } finally {
542            lock.unlock();
543        }
544    }
545
546    /**
547     * Returns an array containing all of the elements in this queue, in
548     * proper sequence; the runtime type of the returned array is that of
549     * the specified array.  If the queue fits in the specified array, it
550     * is returned therein.  Otherwise, a new array is allocated with the
551     * runtime type of the specified array and the size of this queue.
552     *
553     * <p>If this queue fits in the specified array with room to spare
554     * (i.e., the array has more elements than this queue), the element in
555     * the array immediately following the end of the queue is set to
556     * {@code null}.
557     *
558     * <p>Like the {@link #toArray()} method, this method acts as bridge between
559     * array-based and collection-based APIs.  Further, this method allows
560     * precise control over the runtime type of the output array, and may,
561     * under certain circumstances, be used to save allocation costs.
562     *
563     * <p>Suppose {@code x} is a queue known to contain only strings.
564     * The following code can be used to dump the queue into a newly
565     * allocated array of {@code String}:
566     *
567     *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
568     *
569     * Note that {@code toArray(new Object[0])} is identical in function to
570     * {@code toArray()}.
571     *
572     * @param a the array into which the elements of the queue are to
573     *          be stored, if it is big enough; otherwise, a new array of the
574     *          same runtime type is allocated for this purpose
575     * @return an array containing all of the elements in this queue
576     * @throws ArrayStoreException if the runtime type of the specified array
577     *         is not a supertype of the runtime type of every element in
578     *         this queue
579     * @throws NullPointerException if the specified array is null
580     */
581    @SuppressWarnings("unchecked")
582    public <T> T[] toArray(T[] a) {
583        final Object[] items = this.items;
584        final ReentrantLock lock = this.lock;
585        lock.lock();
586        try {
587            final int count = this.count;
588            final int len = a.length;
589            if (len < count)
590                a = (T[])java.lang.reflect.Array.newInstance(
591                    a.getClass().getComponentType(), count);
592            int n = items.length - takeIndex;
593            if (count <= n)
594                System.arraycopy(items, takeIndex, a, 0, count);
595            else {
596                System.arraycopy(items, takeIndex, a, 0, n);
597                System.arraycopy(items, 0, a, n, count - n);
598            }
599            if (len > count)
600                a[count] = null;
601            return a;
602        } finally {
603            lock.unlock();
604        }
605    }
606
607    public String toString() {
608        final ReentrantLock lock = this.lock;
609        lock.lock();
610        try {
611            int k = count;
612            if (k == 0)
613                return "[]";
614
615            StringBuilder sb = new StringBuilder();
616            sb.append('[');
617            for (int i = takeIndex; ; i = inc(i)) {
618                Object e = items[i];
619                sb.append(e == this ? "(this Collection)" : e);
620                if (--k == 0)
621                    return sb.append(']').toString();
622                sb.append(',').append(' ');
623            }
624        } finally {
625            lock.unlock();
626        }
627    }
628
629    /**
630     * Atomically removes all of the elements from this queue.
631     * The queue will be empty after this call returns.
632     */
633    public void clear() {
634        final Object[] items = this.items;
635        final ReentrantLock lock = this.lock;
636        lock.lock();
637        try {
638            int k = count;
639            if (k > 0) {
640                final int putIndex = this.putIndex;
641                int i = takeIndex;
642                do {
643                    items[i] = null;
644                } while ((i = inc(i)) != putIndex);
645                takeIndex = putIndex;
646                count = 0;
647                if (itrs != null)
648                    itrs.queueIsEmpty();
649                for (; k > 0 && lock.hasWaiters(notFull); k--)
650                    notFull.signal();
651            }
652        } finally {
653            lock.unlock();
654        }
655    }
656
657    /**
658     * @throws UnsupportedOperationException {@inheritDoc}
659     * @throws ClassCastException            {@inheritDoc}
660     * @throws NullPointerException          {@inheritDoc}
661     * @throws IllegalArgumentException      {@inheritDoc}
662     */
663    public int drainTo(Collection<? super E> c) {
664        return drainTo(c, Integer.MAX_VALUE);
665    }
666
667    /**
668     * @throws UnsupportedOperationException {@inheritDoc}
669     * @throws ClassCastException            {@inheritDoc}
670     * @throws NullPointerException          {@inheritDoc}
671     * @throws IllegalArgumentException      {@inheritDoc}
672     */
673    public int drainTo(Collection<? super E> c, int maxElements) {
674        checkNotNull(c);
675        if (c == this)
676            throw new IllegalArgumentException();
677        if (maxElements <= 0)
678            return 0;
679        final Object[] items = this.items;
680        final ReentrantLock lock = this.lock;
681        lock.lock();
682        try {
683            int n = Math.min(maxElements, count);
684            int take = takeIndex;
685            int i = 0;
686            try {
687                while (i < n) {
688                    @SuppressWarnings("unchecked")
689                    E x = (E) items[take];
690                    c.add(x);
691                    items[take] = null;
692                    take = inc(take);
693                    i++;
694                }
695                return n;
696            } finally {
697                // Restore invariants even if c.add() threw
698                if (i > 0) {
699                    count -= i;
700                    takeIndex = take;
701                    if (itrs != null) {
702                        if (count == 0)
703                            itrs.queueIsEmpty();
704                        else if (i > take)
705                            itrs.takeIndexWrapped();
706                    }
707                    for (; i > 0 && lock.hasWaiters(notFull); i--)
708                        notFull.signal();
709                }
710            }
711        } finally {
712            lock.unlock();
713        }
714    }
715
716    /**
717     * Returns an iterator over the elements in this queue in proper sequence.
718     * The elements will be returned in order from first (head) to last (tail).
719     *
720     * <p>The returned iterator is a "weakly consistent" iterator that
721     * will never throw {@link java.util.ConcurrentModificationException
722     * ConcurrentModificationException}, and guarantees to traverse
723     * elements as they existed upon construction of the iterator, and
724     * may (but is not guaranteed to) reflect any modifications
725     * subsequent to construction.
726     *
727     * @return an iterator over the elements in this queue in proper sequence
728     */
729    public Iterator<E> iterator() {
730        return new Itr();
731    }
732
733    /**
734     * Shared data between iterators and their queue, allowing queue
735     * modifications to update iterators when elements are removed.
736     *
737     * This adds a lot of complexity for the sake of correctly
738     * handling some uncommon operations, but the combination of
739     * circular-arrays and supporting interior removes (i.e., those
740     * not at head) would cause iterators to sometimes lose their
741     * places and/or (re)report elements they shouldn't.  To avoid
742     * this, when a queue has one or more iterators, it keeps iterator
743     * state consistent by:
744     *
745     * (1) keeping track of the number of "cycles", that is, the
746     *     number of times takeIndex has wrapped around to 0.
747     * (2) notifying all iterators via the callback removedAt whenever
748     *     an interior element is removed (and thus other elements may
749     *     be shifted).
750     *
751     * These suffice to eliminate iterator inconsistencies, but
752     * unfortunately add the secondary responsibility of maintaining
753     * the list of iterators.  We track all active iterators in a
754     * simple linked list (accessed only when the queue's lock is
755     * held) of weak references to Itr.  The list is cleaned up using
756     * 3 different mechanisms:
757     *
758     * (1) Whenever a new iterator is created, do some O(1) checking for
759     *     stale list elements.
760     *
761     * (2) Whenever takeIndex wraps around to 0, check for iterators
762     *     that have been unused for more than one wrap-around cycle.
763     *
764     * (3) Whenever the queue becomes empty, all iterators are notified
765     *     and this entire data structure is discarded.
766     *
767     * So in addition to the removedAt callback that is necessary for
768     * correctness, iterators have the shutdown and takeIndexWrapped
769     * callbacks that help remove stale iterators from the list.
770     *
771     * Whenever a list element is examined, it is expunged if either
772     * the GC has determined that the iterator is discarded, or if the
773     * iterator reports that it is "detached" (does not need any
774     * further state updates).  Overhead is maximal when takeIndex
775     * never advances, iterators are discarded before they are
776     * exhausted, and all removals are interior removes, in which case
777     * all stale iterators are discovered by the GC.  But even in this
778     * case we don't increase the amortized complexity.
779     *
780     * Care must be taken to keep list sweeping methods from
781     * reentrantly invoking another such method, causing subtle
782     * corruption bugs.
783     */
784    class Itrs {
785
786        /**
787         * Node in a linked list of weak iterator references.
788         */
789        private class Node extends WeakReference<Itr> {
790            Node next;
791
792            Node(Itr iterator, Node next) {
793                super(iterator);
794                this.next = next;
795            }
796        }
797
798        /** Incremented whenever takeIndex wraps around to 0 */
799        int cycles = 0;
800
801        /** Linked list of weak iterator references */
802        private Node head;
803
804        /** Used to expunge stale iterators */
805        private Node sweeper = null;
806
807        private static final int SHORT_SWEEP_PROBES = 4;
808        private static final int LONG_SWEEP_PROBES = 16;
809
810        Itrs(Itr initial) {
811            register(initial);
812        }
813
814        /**
815         * Sweeps itrs, looking for and expunging stale iterators.
816         * If at least one was found, tries harder to find more.
817         * Called only from iterating thread.
818         *
819         * @param tryHarder whether to start in try-harder mode, because
820         * there is known to be at least one iterator to collect
821         */
822        void doSomeSweeping(boolean tryHarder) {
823            // assert lock.getHoldCount() == 1;
824            // assert head != null;
825            int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
826            Node o, p;
827            final Node sweeper = this.sweeper;
828            boolean passedGo;   // to limit search to one full sweep
829
830            if (sweeper == null) {
831                o = null;
832                p = head;
833                passedGo = true;
834            } else {
835                o = sweeper;
836                p = o.next;
837                passedGo = false;
838            }
839
840            for (; probes > 0; probes--) {
841                if (p == null) {
842                    if (passedGo)
843                        break;
844                    o = null;
845                    p = head;
846                    passedGo = true;
847                }
848                final Itr it = p.get();
849                final Node next = p.next;
850                if (it == null || it.isDetached()) {
851                    // found a discarded/exhausted iterator
852                    probes = LONG_SWEEP_PROBES; // "try harder"
853                    // unlink p
854                    p.clear();
855                    p.next = null;
856                    if (o == null) {
857                        head = next;
858                        if (next == null) {
859                            // We've run out of iterators to track; retire
860                            itrs = null;
861                            return;
862                        }
863                    }
864                    else
865                        o.next = next;
866                } else {
867                    o = p;
868                }
869                p = next;
870            }
871
872            this.sweeper = (p == null) ? null : o;
873        }
874
875        /**
876         * Adds a new iterator to the linked list of tracked iterators.
877         */
878        void register(Itr itr) {
879            // assert lock.getHoldCount() == 1;
880            head = new Node(itr, head);
881        }
882
883        /**
884         * Called whenever takeIndex wraps around to 0.
885         *
886         * Notifies all iterators, and expunges any that are now stale.
887         */
888        void takeIndexWrapped() {
889            // assert lock.getHoldCount() == 1;
890            cycles++;
891            for (Node o = null, p = head; p != null;) {
892                final Itr it = p.get();
893                final Node next = p.next;
894                if (it == null || it.takeIndexWrapped()) {
895                    // unlink p
896                    // assert it == null || it.isDetached();
897                    p.clear();
898                    p.next = null;
899                    if (o == null)
900                        head = next;
901                    else
902                        o.next = next;
903                } else {
904                    o = p;
905                }
906                p = next;
907            }
908            if (head == null)   // no more iterators to track
909                itrs = null;
910        }
911
912        /**
913         * Called whenever an interior remove (not at takeIndex) occured.
914         *
915         * Notifies all iterators, and expunges any that are now stale.
916         */
917        void removedAt(int removedIndex) {
918            for (Node o = null, p = head; p != null;) {
919                final Itr it = p.get();
920                final Node next = p.next;
921                if (it == null || it.removedAt(removedIndex)) {
922                    // unlink p
923                    // assert it == null || it.isDetached();
924                    p.clear();
925                    p.next = null;
926                    if (o == null)
927                        head = next;
928                    else
929                        o.next = next;
930                } else {
931                    o = p;
932                }
933                p = next;
934            }
935            if (head == null)   // no more iterators to track
936                itrs = null;
937        }
938
939        /**
940         * Called whenever the queue becomes empty.
941         *
942         * Notifies all active iterators that the queue is empty,
943         * clears all weak refs, and unlinks the itrs datastructure.
944         */
945        void queueIsEmpty() {
946            // assert lock.getHoldCount() == 1;
947            for (Node p = head; p != null; p = p.next) {
948                Itr it = p.get();
949                if (it != null) {
950                    p.clear();
951                    it.shutdown();
952                }
953            }
954            head = null;
955            itrs = null;
956        }
957
958        /**
959         * Called whenever an element has been dequeued (at takeIndex).
960         */
961        void elementDequeued() {
962            // assert lock.getHoldCount() == 1;
963            if (count == 0)
964                queueIsEmpty();
965            else if (takeIndex == 0)
966                takeIndexWrapped();
967        }
968    }
969
970    /**
971     * Iterator for ArrayBlockingQueue.
972     *
973     * To maintain weak consistency with respect to puts and takes, we
974     * read ahead one slot, so as to not report hasNext true but then
975     * not have an element to return.
976     *
977     * We switch into "detached" mode (allowing prompt unlinking from
978     * itrs without help from the GC) when all indices are negative, or
979     * when hasNext returns false for the first time.  This allows the
980     * iterator to track concurrent updates completely accurately,
981     * except for the corner case of the user calling Iterator.remove()
982     * after hasNext() returned false.  Even in this case, we ensure
983     * that we don't remove the wrong element by keeping track of the
984     * expected element to remove, in lastItem.  Yes, we may fail to
985     * remove lastItem from the queue if it moved due to an interleaved
986     * interior remove while in detached mode.
987     */
988    private class Itr implements Iterator<E> {
989        /** Index to look for new nextItem; NONE at end */
990        private int cursor;
991
992        /** Element to be returned by next call to next(); null if none */
993        private E nextItem;
994
995        /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
996        private int nextIndex;
997
998        /** Last element returned; null if none or not detached. */
999        private E lastItem;
1000
1001        /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1002        private int lastRet;
1003
1004        /** Previous value of takeIndex, or DETACHED when detached */
1005        private int prevTakeIndex;
1006
1007        /** Previous value of iters.cycles */
1008        private int prevCycles;
1009
1010        /** Special index value indicating "not available" or "undefined" */
1011        private static final int NONE = -1;
1012
1013        /**
1014         * Special index value indicating "removed elsewhere", that is,
1015         * removed by some operation other than a call to this.remove().
1016         */
1017        private static final int REMOVED = -2;
1018
1019        /** Special value for prevTakeIndex indicating "detached mode" */
1020        private static final int DETACHED = -3;
1021
1022        Itr() {
1023            // assert lock.getHoldCount() == 0;
1024            lastRet = NONE;
1025            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1026            lock.lock();
1027            try {
1028                if (count == 0) {
1029                    // assert itrs == null;
1030                    cursor = NONE;
1031                    nextIndex = NONE;
1032                    prevTakeIndex = DETACHED;
1033                } else {
1034                    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1035                    prevTakeIndex = takeIndex;
1036                    nextItem = itemAt(nextIndex = takeIndex);
1037                    cursor = incCursor(takeIndex);
1038                    if (itrs == null) {
1039                        itrs = new Itrs(this);
1040                    } else {
1041                        itrs.register(this); // in this order
1042                        itrs.doSomeSweeping(false);
1043                    }
1044                    prevCycles = itrs.cycles;
1045                    // assert takeIndex >= 0;
1046                    // assert prevTakeIndex == takeIndex;
1047                    // assert nextIndex >= 0;
1048                    // assert nextItem != null;
1049                }
1050            } finally {
1051                lock.unlock();
1052            }
1053        }
1054
1055        boolean isDetached() {
1056            // assert lock.getHoldCount() == 1;
1057            return prevTakeIndex < 0;
1058        }
1059
1060        private int incCursor(int index) {
1061            // assert lock.getHoldCount() == 1;
1062            index = inc(index);
1063            if (index == putIndex)
1064                index = NONE;
1065            return index;
1066        }
1067
1068        /**
1069         * Returns true if index is invalidated by the given number of
1070         * dequeues, starting from prevTakeIndex.
1071         */
1072        private boolean invalidated(int index, int prevTakeIndex,
1073                                    long dequeues, int length) {
1074            if (index < 0)
1075                return false;
1076            int distance = index - prevTakeIndex;
1077            if (distance < 0)
1078                distance += length;
1079            return dequeues > distance;
1080        }
1081
1082        /**
1083         * Adjusts indices to incorporate all dequeues since the last
1084         * operation on this iterator.  Call only from iterating thread.
1085         */
1086        private void incorporateDequeues() {
1087            // assert lock.getHoldCount() == 1;
1088            // assert itrs != null;
1089            // assert !isDetached();
1090            // assert count > 0;
1091
1092            final int cycles = itrs.cycles;
1093            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1094            final int prevCycles = this.prevCycles;
1095            final int prevTakeIndex = this.prevTakeIndex;
1096
1097            if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1098                final int len = items.length;
1099                // how far takeIndex has advanced since the previous
1100                // operation of this iterator
1101                long dequeues = (cycles - prevCycles) * len
1102                    + (takeIndex - prevTakeIndex);
1103
1104                // Check indices for invalidation
1105                if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1106                    lastRet = REMOVED;
1107                if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1108                    nextIndex = REMOVED;
1109                if (invalidated(cursor, prevTakeIndex, dequeues, len))
1110                    cursor = takeIndex;
1111
1112                if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1113                    detach();
1114                else {
1115                    this.prevCycles = cycles;
1116                    this.prevTakeIndex = takeIndex;
1117                }
1118            }
1119        }
1120
1121        /**
1122         * Called when itrs should stop tracking this iterator, either
1123         * because there are no more indices to update (cursor < 0 &&
1124         * nextIndex < 0 && lastRet < 0) or as a special exception, when
1125         * lastRet >= 0, because hasNext() is about to return false for the
1126         * first time.  Call only from iterating thread.
1127         */
1128        private void detach() {
1129            // Switch to detached mode
1130            // assert lock.getHoldCount() == 1;
1131            // assert cursor == NONE;
1132            // assert nextIndex < 0;
1133            // assert lastRet < 0 || nextItem == null;
1134            // assert lastRet < 0 ^ lastItem != null;
1135            if (prevTakeIndex >= 0) {
1136                // assert itrs != null;
1137                prevTakeIndex = DETACHED;
1138                // try to unlink from itrs (but not too hard)
1139                itrs.doSomeSweeping(true);
1140            }
1141        }
1142
1143        /**
1144         * For performance reasons, we would like not to acquire a lock in
1145         * hasNext in the common case.  To allow for this, we only access
1146         * fields (i.e. nextItem) that are not modified by update operations
1147         * triggered by queue modifications.
1148         */
1149        public boolean hasNext() {
1150            // assert lock.getHoldCount() == 0;
1151            if (nextItem != null)
1152                return true;
1153            noNext();
1154            return false;
1155        }
1156
1157        private void noNext() {
1158            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1159            lock.lock();
1160            try {
1161                // assert cursor == NONE;
1162                // assert nextIndex == NONE;
1163                if (!isDetached()) {
1164                    // assert lastRet >= 0;
1165                    incorporateDequeues(); // might update lastRet
1166                    if (lastRet >= 0) {
1167                        lastItem = itemAt(lastRet);
1168                        // assert lastItem != null;
1169                        detach();
1170                    }
1171                }
1172                // assert isDetached();
1173                // assert lastRet < 0 ^ lastItem != null;
1174            } finally {
1175                lock.unlock();
1176            }
1177        }
1178
1179        public E next() {
1180            // assert lock.getHoldCount() == 0;
1181            final E x = nextItem;
1182            if (x == null)
1183                throw new NoSuchElementException();
1184            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1185            lock.lock();
1186            try {
1187                if (!isDetached())
1188                    incorporateDequeues();
1189                // assert nextIndex != NONE;
1190                // assert lastItem == null;
1191                lastRet = nextIndex;
1192                final int cursor = this.cursor;
1193                if (cursor >= 0) {
1194                    nextItem = itemAt(nextIndex = cursor);
1195                    // assert nextItem != null;
1196                    this.cursor = incCursor(cursor);
1197                } else {
1198                    nextIndex = NONE;
1199                    nextItem = null;
1200                }
1201            } finally {
1202                lock.unlock();
1203            }
1204            return x;
1205        }
1206
1207        public void remove() {
1208            // assert lock.getHoldCount() == 0;
1209            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1210            lock.lock();
1211            try {
1212                if (!isDetached())
1213                    incorporateDequeues(); // might update lastRet or detach
1214                final int lastRet = this.lastRet;
1215                this.lastRet = NONE;
1216                if (lastRet >= 0) {
1217                    if (!isDetached())
1218                        removeAt(lastRet);
1219                    else {
1220                        final E lastItem = this.lastItem;
1221                        // assert lastItem != null;
1222                        this.lastItem = null;
1223                        if (itemAt(lastRet) == lastItem)
1224                            removeAt(lastRet);
1225                    }
1226                } else if (lastRet == NONE)
1227                    throw new IllegalStateException();
1228                // else lastRet == REMOVED and the last returned element was
1229                // previously asynchronously removed via an operation other
1230                // than this.remove(), so nothing to do.
1231
1232                if (cursor < 0 && nextIndex < 0)
1233                    detach();
1234            } finally {
1235                lock.unlock();
1236                // assert lastRet == NONE;
1237                // assert lastItem == null;
1238            }
1239        }
1240
1241        /**
1242         * Called to notify the iterator that the queue is empty, or that it
1243         * has fallen hopelessly behind, so that it should abandon any
1244         * further iteration, except possibly to return one more element
1245         * from next(), as promised by returning true from hasNext().
1246         */
1247        void shutdown() {
1248            // assert lock.getHoldCount() == 1;
1249            cursor = NONE;
1250            if (nextIndex >= 0)
1251                nextIndex = REMOVED;
1252            if (lastRet >= 0) {
1253                lastRet = REMOVED;
1254                lastItem = null;
1255            }
1256            prevTakeIndex = DETACHED;
1257            // Don't set nextItem to null because we must continue to be
1258            // able to return it on next().
1259            //
1260            // Caller will unlink from itrs when convenient.
1261        }
1262
1263        private int distance(int index, int prevTakeIndex, int length) {
1264            int distance = index - prevTakeIndex;
1265            if (distance < 0)
1266                distance += length;
1267            return distance;
1268        }
1269
1270        /**
1271         * Called whenever an interior remove (not at takeIndex) occured.
1272         *
1273         * @return true if this iterator should be unlinked from itrs
1274         */
1275        boolean removedAt(int removedIndex) {
1276            // assert lock.getHoldCount() == 1;
1277            if (isDetached())
1278                return true;
1279
1280            final int cycles = itrs.cycles;
1281            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1282            final int prevCycles = this.prevCycles;
1283            final int prevTakeIndex = this.prevTakeIndex;
1284            final int len = items.length;
1285            int cycleDiff = cycles - prevCycles;
1286            if (removedIndex < takeIndex)
1287                cycleDiff++;
1288            final int removedDistance =
1289                (cycleDiff * len) + (removedIndex - prevTakeIndex);
1290            // assert removedDistance >= 0;
1291            int cursor = this.cursor;
1292            if (cursor >= 0) {
1293                int x = distance(cursor, prevTakeIndex, len);
1294                if (x == removedDistance) {
1295                    if (cursor == putIndex)
1296                        this.cursor = cursor = NONE;
1297                }
1298                else if (x > removedDistance) {
1299                    // assert cursor != prevTakeIndex;
1300                    this.cursor = cursor = dec(cursor);
1301                }
1302            }
1303            int lastRet = this.lastRet;
1304            if (lastRet >= 0) {
1305                int x = distance(lastRet, prevTakeIndex, len);
1306                if (x == removedDistance)
1307                    this.lastRet = lastRet = REMOVED;
1308                else if (x > removedDistance)
1309                    this.lastRet = lastRet = dec(lastRet);
1310            }
1311            int nextIndex = this.nextIndex;
1312            if (nextIndex >= 0) {
1313                int x = distance(nextIndex, prevTakeIndex, len);
1314                if (x == removedDistance)
1315                    this.nextIndex = nextIndex = REMOVED;
1316                else if (x > removedDistance)
1317                    this.nextIndex = nextIndex = dec(nextIndex);
1318            }
1319            else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1320                this.prevTakeIndex = DETACHED;
1321                return true;
1322            }
1323            return false;
1324        }
1325
1326        /**
1327         * Called whenever takeIndex wraps around to zero.
1328         *
1329         * @return true if this iterator should be unlinked from itrs
1330         */
1331        boolean takeIndexWrapped() {
1332            // assert lock.getHoldCount() == 1;
1333            if (isDetached())
1334                return true;
1335            if (itrs.cycles - prevCycles > 1) {
1336                // All the elements that existed at the time of the last
1337                // operation are gone, so abandon further iteration.
1338                shutdown();
1339                return true;
1340            }
1341            return false;
1342        }
1343
1344//         /** Uncomment for debugging. */
1345//         public String toString() {
1346//             return ("cursor=" + cursor + " " +
1347//                     "nextIndex=" + nextIndex + " " +
1348//                     "lastRet=" + lastRet + " " +
1349//                     "nextItem=" + nextItem + " " +
1350//                     "lastItem=" + lastItem + " " +
1351//                     "prevCycles=" + prevCycles + " " +
1352//                     "prevTakeIndex=" + prevTakeIndex + " " +
1353//                     "size()=" + size() + " " +
1354//                     "remainingCapacity()=" + remainingCapacity());
1355//         }
1356    }
1357}
1358