1/*
2 * Copyright (C) 2010 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import com.google.common.collect.ObjectArrays;
20
21import java.util.AbstractQueue;
22import java.util.Collection;
23import java.util.ConcurrentModificationException;
24import java.util.Iterator;
25import java.util.NoSuchElementException;
26import java.util.concurrent.BlockingQueue;
27import java.util.concurrent.TimeUnit;
28
29import javax.annotation.Nullable;
30
31/**
32 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
33 * array.  This queue orders elements FIFO (first-in-first-out).  The
34 * <em>head</em> of the queue is that element that has been on the
35 * queue the longest time.  The <em>tail</em> of the queue is that
36 * element that has been on the queue the shortest time. New elements
37 * are inserted at the tail of the queue, and the queue retrieval
38 * operations obtain elements at the head of the queue.
39 *
40 * <p>This is a classic &quot;bounded buffer&quot;, in which a
41 * fixed-sized array holds elements inserted by producers and
42 * extracted by consumers.  Once created, the capacity cannot be
43 * increased.  Attempts to <tt>put</tt> an element into a full queue
44 * will result in the operation blocking; attempts to <tt>take</tt> an
45 * element from an empty queue will similarly block.
46 *
47 * <p> This class supports an optional fairness policy for ordering
48 * waiting producer and consumer threads.  By default, this ordering
49 * is not guaranteed. However, a queue constructed with fairness set
50 * to <tt>true</tt> grants threads access in FIFO order. Fairness
51 * generally decreases throughput but reduces variability and avoids
52 * starvation.
53 *
54 * <p>This class and its iterator implement all of the
55 * <em>optional</em> methods of the {@link Collection} and {@link
56 * Iterator} interfaces.
57 *
58 * @author Doug Lea
59 * @author Justin T. Sampson
60 * @param <E> the type of elements held in this collection
61 */
62public class MonitorBasedArrayBlockingQueue<E> extends AbstractQueue<E>
63        implements BlockingQueue<E> {
64
65    // Based on revision 1.58 of ArrayBlockingQueue by Doug Lea, from
66    // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
67
68    /** The queued items  */
69    final E[] items;
70    /** items index for next take, poll or remove */
71    int takeIndex;
72    /** items index for next put, offer, or add. */
73    int putIndex;
74    /** Number of items in the queue */
75    private int count;
76
77    /*
78     * Concurrency control uses the classic two-condition algorithm
79     * found in any textbook.
80     */
81
82    /** Monitor guarding all access */
83    final Monitor monitor;
84
85    /** Guard for waiting takes */
86    private final Monitor.Guard notEmpty;
87
88    /** Guard for waiting puts */
89    private final Monitor.Guard notFull;
90
91    // Internal helper methods
92
93    /**
94     * Circularly increment i.
95     */
96    final int inc(int i) {
97        return (++i == items.length) ? 0 : i;
98    }
99
100    /**
101     * Inserts element at current put position, advances, and signals.
102     * Call only when occupying monitor.
103     */
104    private void insert(E x) {
105        items[putIndex] = x;
106        putIndex = inc(putIndex);
107        ++count;
108    }
109
110    /**
111     * Extracts element at current take position, advances, and signals.
112     * Call only when occupying monitor.
113     */
114    private E extract() {
115        final E[] items = this.items;
116        E x = items[takeIndex];
117        items[takeIndex] = null;
118        takeIndex = inc(takeIndex);
119        --count;
120        return x;
121    }
122
123    /**
124     * Utility for remove and iterator.remove: Delete item at position i.
125     * Call only when occupying monitor.
126     */
127    void removeAt(int i) {
128        final E[] items = this.items;
129        // if removing front item, just advance
130        if (i == takeIndex) {
131            items[takeIndex] = null;
132            takeIndex = inc(takeIndex);
133        } else {
134            // slide over all others up through putIndex.
135            for (;;) {
136                int nexti = inc(i);
137                if (nexti != putIndex) {
138                    items[i] = items[nexti];
139                    i = nexti;
140                } else {
141                    items[i] = null;
142                    putIndex = i;
143                    break;
144                }
145            }
146        }
147        --count;
148    }
149
150    /**
151     * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
152     * capacity and default access policy.
153     *
154     * @param capacity the capacity of this queue
155     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
156     */
157    public MonitorBasedArrayBlockingQueue(int capacity) {
158        this(capacity, false);
159    }
160
161    /**
162     * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
163     * capacity and the specified access policy.
164     *
165     * @param capacity the capacity of this queue
166     * @param fair if <tt>true</tt> then queue accesses for threads blocked
167     *        on insertion or removal, are processed in FIFO order;
168     *        if <tt>false</tt> the access order is unspecified.
169     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
170     */
171    public MonitorBasedArrayBlockingQueue(int capacity, boolean fair) {
172        if (capacity <= 0)
173            throw new IllegalArgumentException();
174        this.items = newEArray(capacity);
175        monitor = new Monitor(fair);
176        notEmpty = new Monitor.Guard(monitor) {
177            @Override public boolean isSatisfied() {
178                return count > 0;
179            }
180        };
181        notFull = new Monitor.Guard(monitor) {
182            @Override public boolean isSatisfied() {
183                return count < items.length;
184            }
185        };
186    }
187
188    @SuppressWarnings("unchecked") // please don't try this home, kids
189    private static <E> E[] newEArray(int capacity) {
190        return (E[]) new Object[capacity];
191    }
192
193    /**
194     * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
195     * capacity, the specified access policy and initially containing the
196     * elements of the given collection,
197     * added in traversal order of the collection's iterator.
198     *
199     * @param capacity the capacity of this queue
200     * @param fair if <tt>true</tt> then queue accesses for threads blocked
201     *        on insertion or removal, are processed in FIFO order;
202     *        if <tt>false</tt> the access order is unspecified.
203     * @param c the collection of elements to initially contain
204     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
205     *         <tt>c.size()</tt>, or less than 1.
206     * @throws NullPointerException if the specified collection or any
207     *         of its elements are null
208     */
209    public MonitorBasedArrayBlockingQueue(int capacity, boolean fair,
210                              Collection<? extends E> c) {
211        this(capacity, fair);
212        if (capacity < c.size())
213            throw new IllegalArgumentException();
214
215        for (E e : c)
216            add(e);
217    }
218
219    /**
220     * Inserts the specified element at the tail of this queue if it is
221     * possible to do so immediately without exceeding the queue's capacity,
222     * returning <tt>true</tt> upon success and throwing an
223     * <tt>IllegalStateException</tt> if this queue is full.
224     *
225     * @param e the element to add
226     * @return <tt>true</tt> (as specified by {@link Collection#add})
227     * @throws IllegalStateException if this queue is full
228     * @throws NullPointerException if the specified element is null
229     */
230    @Override public boolean add(E e) {
231        return super.add(e);
232    }
233
234    /**
235     * Inserts the specified element at the tail of this queue if it is
236     * possible to do so immediately without exceeding the queue's capacity,
237     * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
238     * is full.  This method is generally preferable to method {@link #add},
239     * which can fail to insert an element only by throwing an exception.
240     *
241     * @throws NullPointerException if the specified element is null
242     */
243    @Override
244    public boolean offer(E e) {
245        if (e == null) throw new NullPointerException();
246        final Monitor monitor = this.monitor;
247        if (monitor.enterIf(notFull)) {
248            try {
249                insert(e);
250                return true;
251            } finally {
252                monitor.leave();
253            }
254        } else {
255          return false;
256        }
257    }
258
259    /**
260     * Inserts the specified element at the tail of this queue, waiting
261     * for space to become available if the queue is full.
262     *
263     * @throws InterruptedException {@inheritDoc}
264     * @throws NullPointerException {@inheritDoc}
265     */
266    @Override
267    public void put(E e) throws InterruptedException {
268        if (e == null) throw new NullPointerException();
269        final Monitor monitor = this.monitor;
270        monitor.enterWhen(notFull);
271        try {
272            insert(e);
273        } finally {
274            monitor.leave();
275        }
276    }
277
278    /**
279     * Inserts the specified element at the tail of this queue, waiting
280     * up to the specified wait time for space to become available if
281     * the queue is full.
282     *
283     * @throws InterruptedException {@inheritDoc}
284     * @throws NullPointerException {@inheritDoc}
285     */
286    @Override
287    public boolean offer(E e, long timeout, TimeUnit unit)
288        throws InterruptedException {
289
290        if (e == null) throw new NullPointerException();
291        final Monitor monitor = this.monitor;
292        if (monitor.enterWhen(notFull, timeout, unit)) {
293            try {
294                insert(e);
295                return true;
296            } finally {
297                monitor.leave();
298            }
299        } else {
300          return false;
301        }
302    }
303
304    @Override
305    public E poll() {
306        final Monitor monitor = this.monitor;
307        if (monitor.enterIf(notEmpty)) {
308            try {
309                return extract();
310            } finally {
311                monitor.leave();
312            }
313        } else {
314          return null;
315        }
316    }
317
318    @Override
319    public E take() throws InterruptedException {
320        final Monitor monitor = this.monitor;
321        monitor.enterWhen(notEmpty);
322        try {
323            return extract();
324        } finally {
325            monitor.leave();
326        }
327    }
328
329    @Override
330    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
331        final Monitor monitor = this.monitor;
332        if (monitor.enterWhen(notEmpty, timeout, unit)) {
333            try {
334                return extract();
335            } finally {
336                monitor.leave();
337            }
338        } else {
339          return null;
340        }
341    }
342
343    @Override
344    public E peek() {
345        final Monitor monitor = this.monitor;
346        if (monitor.enterIf(notEmpty)) {
347            try {
348                return items[takeIndex];
349            } finally {
350                monitor.leave();
351            }
352        } else {
353            return null;
354        }
355    }
356
357    // this doc comment is overridden to remove the reference to collections
358    // greater in size than Integer.MAX_VALUE
359    /**
360     * Returns the number of elements in this queue.
361     *
362     * @return the number of elements in this queue
363     */
364    @Override public int size() {
365        final Monitor monitor = this.monitor;
366        monitor.enter();
367        try {
368            return count;
369        } finally {
370            monitor.leave();
371        }
372    }
373
374    // this doc comment is a modified copy of the inherited doc comment,
375    // without the reference to unlimited queues.
376    /**
377     * Returns the number of additional elements that this queue can ideally
378     * (in the absence of memory or resource constraints) accept without
379     * blocking. This is always equal to the initial capacity of this queue
380     * less the current <tt>size</tt> of this queue.
381     *
382     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
383     * an element will succeed by inspecting <tt>remainingCapacity</tt>
384     * because it may be the case that another thread is about to
385     * insert or remove an element.
386     */
387    @Override
388    public int remainingCapacity() {
389        final Monitor monitor = this.monitor;
390        monitor.enter();
391        try {
392            return items.length - count;
393        } finally {
394            monitor.leave();
395        }
396    }
397
398    /**
399     * Removes a single instance of the specified element from this queue,
400     * if it is present.  More formally, removes an element <tt>e</tt> such
401     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
402     * elements.
403     * Returns <tt>true</tt> if this queue contained the specified element
404     * (or equivalently, if this queue changed as a result of the call).
405     *
406     * @param o element to be removed from this queue, if present
407     * @return <tt>true</tt> if this queue changed as a result of the call
408     */
409    @Override public boolean remove(@Nullable Object o) {
410        if (o == null) return false;
411        final E[] items = this.items;
412        final Monitor monitor = this.monitor;
413        monitor.enter();
414        try {
415            int i = takeIndex;
416            int k = 0;
417            for (;;) {
418                if (k++ >= count)
419                    return false;
420                if (o.equals(items[i])) {
421                    removeAt(i);
422                    return true;
423                }
424                i = inc(i);
425            }
426        } finally {
427            monitor.leave();
428        }
429    }
430
431    /**
432     * Returns <tt>true</tt> if this queue contains the specified element.
433     * More formally, returns <tt>true</tt> if and only if this queue contains
434     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
435     *
436     * @param o object to be checked for containment in this queue
437     * @return <tt>true</tt> if this queue contains the specified element
438     */
439    @Override public boolean contains(@Nullable Object o) {
440        if (o == null) return false;
441        final E[] items = this.items;
442        final Monitor monitor = this.monitor;
443        monitor.enter();
444        try {
445            int i = takeIndex;
446            int k = 0;
447            while (k++ < count) {
448                if (o.equals(items[i]))
449                    return true;
450                i = inc(i);
451            }
452            return false;
453        } finally {
454            monitor.leave();
455        }
456    }
457
458    /**
459     * Returns an array containing all of the elements in this queue, in
460     * proper sequence.
461     *
462     * <p>The returned array will be "safe" in that no references to it are
463     * maintained by this queue.  (In other words, this method must allocate
464     * a new array).  The caller is thus free to modify the returned array.
465     *
466     * <p>This method acts as bridge between array-based and collection-based
467     * APIs.
468     *
469     * @return an array containing all of the elements in this queue
470     */
471    @Override public Object[] toArray() {
472        final E[] items = this.items;
473        final Monitor monitor = this.monitor;
474        monitor.enter();
475        try {
476            Object[] a = new Object[count];
477            int k = 0;
478            int i = takeIndex;
479            while (k < count) {
480                a[k++] = items[i];
481                i = inc(i);
482            }
483            return a;
484        } finally {
485            monitor.leave();
486        }
487    }
488
489    /**
490     * Returns an array containing all of the elements in this queue, in
491     * proper sequence; the runtime type of the returned array is that of
492     * the specified array.  If the queue fits in the specified array, it
493     * is returned therein.  Otherwise, a new array is allocated with the
494     * runtime type of the specified array and the size of this queue.
495     *
496     * <p>If this queue fits in the specified array with room to spare
497     * (i.e., the array has more elements than this queue), the element in
498     * the array immediately following the end of the queue is set to
499     * <tt>null</tt>.
500     *
501     * <p>Like the {@link #toArray()} method, this method acts as bridge between
502     * array-based and collection-based APIs.  Further, this method allows
503     * precise control over the runtime type of the output array, and may,
504     * under certain circumstances, be used to save allocation costs.
505     *
506     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
507     * The following code can be used to dump the queue into a newly
508     * allocated array of <tt>String</tt>:
509     *
510     * <pre>
511     *     String[] y = x.toArray(new String[0]);</pre>
512     *
513     * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to
514     * <tt>toArray()</tt>.
515     *
516     * @param a the array into which the elements of the queue are to
517     *          be stored, if it is big enough; otherwise, a new array of the
518     *          same runtime type is allocated for this purpose
519     * @return an array containing all of the elements in this queue
520     * @throws ArrayStoreException if the runtime type of the specified array
521     *         is not a supertype of the runtime type of every element in
522     *         this queue
523     * @throws NullPointerException if the specified array is null
524     */
525    @Override public <T> T[] toArray(T[] a) {
526        final E[] items = this.items;
527        final Monitor monitor = this.monitor;
528        monitor.enter();
529        try {
530            if (a.length < count)
531                a = ObjectArrays.newArray(a, count);
532
533            int k = 0;
534            int i = takeIndex;
535            while (k < count) {
536                // This cast is not itself safe, but the following statement
537                // will fail if the runtime type of items[i] is not assignable
538                // to the runtime type of a[k++], which is all that the method
539                // contract requires (see @throws ArrayStoreException above).
540                @SuppressWarnings("unchecked")
541                T t = (T) items[i];
542                a[k++] = t;
543                i = inc(i);
544            }
545            if (a.length > count)
546                a[count] = null;
547            return a;
548        } finally {
549            monitor.leave();
550        }
551    }
552
553    @Override public String toString() {
554        final Monitor monitor = this.monitor;
555        monitor.enter();
556        try {
557            return super.toString();
558        } finally {
559            monitor.leave();
560        }
561    }
562
563    /**
564     * Atomically removes all of the elements from this queue.
565     * The queue will be empty after this call returns.
566     */
567    @Override public void clear() {
568        final E[] items = this.items;
569        final Monitor monitor = this.monitor;
570        monitor.enter();
571        try {
572            int i = takeIndex;
573            int k = count;
574            while (k-- > 0) {
575                items[i] = null;
576                i = inc(i);
577            }
578            count = 0;
579            putIndex = 0;
580            takeIndex = 0;
581        } finally {
582            monitor.leave();
583        }
584    }
585
586    /**
587     * @throws UnsupportedOperationException {@inheritDoc}
588     * @throws ClassCastException            {@inheritDoc}
589     * @throws NullPointerException          {@inheritDoc}
590     * @throws IllegalArgumentException      {@inheritDoc}
591     */
592    @Override
593    public int drainTo(Collection<? super E> c) {
594        if (c == null)
595            throw new NullPointerException();
596        if (c == this)
597            throw new IllegalArgumentException();
598        final E[] items = this.items;
599        final Monitor monitor = this.monitor;
600        monitor.enter();
601        try {
602            int i = takeIndex;
603            int n = 0;
604            int max = count;
605            while (n < max) {
606                c.add(items[i]);
607                items[i] = null;
608                i = inc(i);
609                ++n;
610            }
611            if (n > 0) {
612                count = 0;
613                putIndex = 0;
614                takeIndex = 0;
615            }
616            return n;
617        } finally {
618            monitor.leave();
619        }
620    }
621
622    /**
623     * @throws UnsupportedOperationException {@inheritDoc}
624     * @throws ClassCastException            {@inheritDoc}
625     * @throws NullPointerException          {@inheritDoc}
626     * @throws IllegalArgumentException      {@inheritDoc}
627     */
628    @Override
629    public int drainTo(Collection<? super E> c, int maxElements) {
630        if (c == null)
631            throw new NullPointerException();
632        if (c == this)
633            throw new IllegalArgumentException();
634        if (maxElements <= 0)
635            return 0;
636        final E[] items = this.items;
637        final Monitor monitor = this.monitor;
638        monitor.enter();
639        try {
640            int i = takeIndex;
641            int n = 0;
642            int max = (maxElements < count) ? maxElements : count;
643            while (n < max) {
644                c.add(items[i]);
645                items[i] = null;
646                i = inc(i);
647                ++n;
648            }
649            if (n > 0) {
650                count -= n;
651                takeIndex = i;
652            }
653            return n;
654        } finally {
655            monitor.leave();
656        }
657    }
658
659    /**
660     * Returns an iterator over the elements in this queue in proper sequence.
661     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
662     * will never throw {@link ConcurrentModificationException},
663     * and guarantees to traverse elements as they existed upon
664     * construction of the iterator, and may (but is not guaranteed to)
665     * reflect any modifications subsequent to construction.
666     *
667     * @return an iterator over the elements in this queue in proper sequence
668     */
669    @Override public Iterator<E> iterator() {
670        final Monitor monitor = this.monitor;
671        monitor.enter();
672        try {
673            return new Itr();
674        } finally {
675            monitor.leave();
676        }
677    }
678
679    /**
680     * Iterator for MonitorBasedArrayBlockingQueue
681     */
682    private class Itr implements Iterator<E> {
683        /**
684         * Index of element to be returned by next,
685         * or a negative number if no such.
686         */
687        private int nextIndex;
688
689        /**
690         * nextItem holds on to item fields because once we claim
691         * that an element exists in hasNext(), we must return it in
692         * the following next() call even if it was in the process of
693         * being removed when hasNext() was called.
694         */
695        private E nextItem;
696
697        /**
698         * Index of element returned by most recent call to next.
699         * Reset to -1 if this element is deleted by a call to remove.
700         */
701        private int lastRet;
702
703        Itr() {
704            lastRet = -1;
705            if (count == 0)
706                nextIndex = -1;
707            else {
708                nextIndex = takeIndex;
709                nextItem = items[takeIndex];
710            }
711        }
712
713        @Override
714        public boolean hasNext() {
715            /*
716             * No sync. We can return true by mistake here
717             * only if this iterator passed across threads,
718             * which we don't support anyway.
719             */
720            return nextIndex >= 0;
721        }
722
723        /**
724         * Checks whether nextIndex is valid; if so setting nextItem.
725         * Stops iterator when either hits putIndex or sees null item.
726         */
727        private void checkNext() {
728            if (nextIndex == putIndex) {
729                nextIndex = -1;
730                nextItem = null;
731            } else {
732                nextItem = items[nextIndex];
733                if (nextItem == null)
734                    nextIndex = -1;
735            }
736        }
737
738        @Override
739        public E next() {
740            final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
741            monitor.enter();
742            try {
743                if (nextIndex < 0)
744                    throw new NoSuchElementException();
745                lastRet = nextIndex;
746                E x = nextItem;
747                nextIndex = inc(nextIndex);
748                checkNext();
749                return x;
750            } finally {
751                monitor.leave();
752            }
753        }
754
755        @Override
756        public void remove() {
757            final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
758            monitor.enter();
759            try {
760                int i = lastRet;
761                if (i == -1)
762                    throw new IllegalStateException();
763                lastRet = -1;
764
765                int ti = takeIndex;
766                removeAt(i);
767                // back up cursor (reset to front if was first element)
768                nextIndex = (i == ti) ? takeIndex : i;
769                checkNext();
770            } finally {
771                monitor.leave();
772            }
773        }
774    }
775}
776