1/*
2 * Copyright (C) 2010 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import static com.google.common.base.Preconditions.checkNotNull;
20
21import java.util.AbstractQueue;
22import java.util.Collection;
23import java.util.Comparator;
24import java.util.ConcurrentModificationException;
25import java.util.Iterator;
26import java.util.NoSuchElementException;
27import java.util.PriorityQueue;
28import java.util.Queue;
29import java.util.SortedSet;
30import java.util.concurrent.BlockingQueue;
31import java.util.concurrent.TimeUnit;
32
33import javax.annotation.Nullable;
34
35/**
36 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
37 * the same ordering rules as class {@link PriorityQueue} and supplies
38 * blocking retrieval operations.  While this queue is logically
39 * unbounded, attempted additions may fail due to resource exhaustion
40 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
41 * <tt>null</tt> elements.  A priority queue relying on {@linkplain
42 * Comparable natural ordering} also does not permit insertion of
43 * non-comparable objects (doing so results in
44 * <tt>ClassCastException</tt>).
45 *
46 * <p>This class and its iterator implement all of the
47 * <em>optional</em> methods of the {@link Collection} and {@link
48 * Iterator} interfaces.  The Iterator provided in method {@link
49 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
50 * the MonitorBasedPriorityBlockingQueue in any particular order. If you need
51 * ordered traversal, consider using
52 * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
53 * can be used to <em>remove</em> some or all elements in priority
54 * order and place them in another collection.
55 *
56 * <p>Operations on this class make no guarantees about the ordering
57 * of elements with equal priority. If you need to enforce an
58 * ordering, you can define custom classes or comparators that use a
59 * secondary key to break ties in primary priority values.  For
60 * example, here is a class that applies first-in-first-out
61 * tie-breaking to comparable elements. To use it, you would insert a
62 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
63 *
64 * <pre>
65 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
66 *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
67 *   final static AtomicLong seq = new AtomicLong();
68 *   final long seqNum;
69 *   final E entry;
70 *   public FIFOEntry(E entry) {
71 *     seqNum = seq.getAndIncrement();
72 *     this.entry = entry;
73 *   }
74 *   public E getEntry() { return entry; }
75 *   public int compareTo(FIFOEntry&lt;E&gt; other) {
76 *     int res = entry.compareTo(other.entry);
77 *     if (res == 0 &amp;&amp; other.entry != this.entry)
78 *       res = (seqNum &lt; other.seqNum ? -1 : 1);
79 *     return res;
80 *   }
81 * }</pre>
82 *
83 * @author Doug Lea
84 * @author Justin T. Sampson
85 * @param <E> the type of elements held in this collection
86 */
87public class MonitorBasedPriorityBlockingQueue<E> extends AbstractQueue<E>
88    implements BlockingQueue<E> {
89
90    // Based on revision 1.55 of PriorityBlockingQueue by Doug Lea, from
91    // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
92
93    private static final long serialVersionUID = 5595510919245408276L;
94
95    final PriorityQueue<E> q;
96    final Monitor monitor = new Monitor(true);
97    private final Monitor.Guard notEmpty =
98        new Monitor.Guard(monitor) {
99            @Override public boolean isSatisfied() {
100              return !q.isEmpty();
101            }
102        };
103
104    /**
105     * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the default
106     * initial capacity (11) that orders its elements according to
107     * their {@linkplain Comparable natural ordering}.
108     */
109    public MonitorBasedPriorityBlockingQueue() {
110        q = new PriorityQueue<E>();
111    }
112
113    /**
114     * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified
115     * initial capacity that orders its elements according to their
116     * {@linkplain Comparable natural ordering}.
117     *
118     * @param initialCapacity the initial capacity for this priority queue
119     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
120     *         than 1
121     */
122    public MonitorBasedPriorityBlockingQueue(int initialCapacity) {
123        q = new PriorityQueue<E>(initialCapacity, null);
124    }
125
126    /**
127     * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified initial
128     * capacity that orders its elements according to the specified
129     * comparator.
130     *
131     * @param initialCapacity the initial capacity for this priority queue
132     * @param  comparator the comparator that will be used to order this
133     *         priority queue.  If {@code null}, the {@linkplain Comparable
134     *         natural ordering} of the elements will be used.
135     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
136     *         than 1
137     */
138    public MonitorBasedPriorityBlockingQueue(int initialCapacity,
139                                 @Nullable Comparator<? super E> comparator) {
140        q = new PriorityQueue<E>(initialCapacity, comparator);
141    }
142
143    /**
144     * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> containing the elements
145     * in the specified collection.  If the specified collection is a
146     * {@link SortedSet} or a {@link PriorityQueue},  this
147     * priority queue will be ordered according to the same ordering.
148     * Otherwise, this priority queue will be ordered according to the
149     * {@linkplain Comparable natural ordering} of its elements.
150     *
151     * @param  c the collection whose elements are to be placed
152     *         into this priority queue
153     * @throws ClassCastException if elements of the specified collection
154     *         cannot be compared to one another according to the priority
155     *         queue's ordering
156     * @throws NullPointerException if the specified collection or any
157     *         of its elements are null
158     */
159    public MonitorBasedPriorityBlockingQueue(Collection<? extends E> c) {
160        q = new PriorityQueue<E>(c);
161    }
162
163    /**
164     * Inserts the specified element into this priority queue.
165     *
166     * @param e the element to add
167     * @return <tt>true</tt> (as specified by {@link Collection#add})
168     * @throws ClassCastException if the specified element cannot be compared
169     *         with elements currently in the priority queue according to the
170     *         priority queue's ordering
171     * @throws NullPointerException if the specified element is null
172     */
173    @Override public boolean add(E e) {
174        return offer(e);
175    }
176
177    /**
178     * Inserts the specified element into this priority queue.
179     *
180     * @param e the element to add
181     * @return <tt>true</tt> (as specified by {@link Queue#offer})
182     * @throws ClassCastException if the specified element cannot be compared
183     *         with elements currently in the priority queue according to the
184     *         priority queue's ordering
185     * @throws NullPointerException if the specified element is null
186     */
187    @Override
188    public boolean offer(E e) {
189        final Monitor monitor = this.monitor;
190        monitor.enter();
191        try {
192            boolean ok = q.offer(e);
193            if (!ok) {
194              throw new AssertionError();
195            }
196            return true;
197        } finally {
198            monitor.leave();
199        }
200    }
201
202    /**
203     * Inserts the specified element into this priority queue. As the queue is
204     * unbounded this method will never block.
205     *
206     * @param e the element to add
207     * @throws ClassCastException if the specified element cannot be compared
208     *         with elements currently in the priority queue according to the
209     *         priority queue's ordering
210     * @throws NullPointerException if the specified element is null
211     */
212    @Override
213    public void put(E e) {
214        offer(e); // never need to block
215    }
216
217    /**
218     * Inserts the specified element into this priority queue. As the queue is
219     * unbounded this method will never block.
220     *
221     * @param e the element to add
222     * @param timeout This parameter is ignored as the method never blocks
223     * @param unit This parameter is ignored as the method never blocks
224     * @return <tt>true</tt>
225     * @throws ClassCastException if the specified element cannot be compared
226     *         with elements currently in the priority queue according to the
227     *         priority queue's ordering
228     * @throws NullPointerException if the specified element is null
229     */
230    @Override
231    public boolean offer(E e, long timeout, TimeUnit unit) {
232        checkNotNull(unit);
233        return offer(e); // never need to block
234    }
235
236    @Override
237    public E poll() {
238        final Monitor monitor = this.monitor;
239        monitor.enter();
240        try {
241            return q.poll();
242        } finally {
243            monitor.leave();
244        }
245    }
246
247    @Override
248    public E take() throws InterruptedException {
249        final Monitor monitor = this.monitor;
250        monitor.enterWhen(notEmpty);
251        try {
252            return q.poll();
253        } finally {
254            monitor.leave();
255        }
256    }
257
258    @Override
259    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
260        final Monitor monitor = this.monitor;
261        if (monitor.enterWhen(notEmpty, timeout, unit)) {
262            try {
263                return q.poll();
264            } finally {
265                monitor.leave();
266            }
267        } else {
268            return null;
269        }
270    }
271
272    @Override
273    public E peek() {
274        final Monitor monitor = this.monitor;
275        monitor.enter();
276        try {
277            return q.peek();
278        } finally {
279            monitor.leave();
280        }
281    }
282
283    /**
284     * Returns the comparator used to order the elements in this queue,
285     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
286     * natural ordering} of its elements.
287     *
288     * @return the comparator used to order the elements in this queue,
289     *         or <tt>null</tt> if this queue uses the natural
290     *         ordering of its elements
291     */
292    public Comparator<? super E> comparator() {
293        return q.comparator();
294    }
295
296    @Override public int size() {
297        final Monitor monitor = this.monitor;
298        monitor.enter();
299        try {
300            return q.size();
301        } finally {
302            monitor.leave();
303        }
304    }
305
306    /**
307     * Always returns <tt>Integer.MAX_VALUE</tt> because
308     * a <tt>MonitorBasedPriorityBlockingQueue</tt> is not capacity constrained.
309     * @return <tt>Integer.MAX_VALUE</tt>
310     */
311    @Override
312    public int remainingCapacity() {
313        return Integer.MAX_VALUE;
314    }
315
316    /**
317     * Removes a single instance of the specified element from this queue,
318     * if it is present.  More formally, removes an element {@code e} such
319     * that {@code o.equals(e)}, if this queue contains one or more such
320     * elements.  Returns {@code true} if and only if this queue contained
321     * the specified element (or equivalently, if this queue changed as a
322     * result of the call).
323     *
324     * @param o element to be removed from this queue, if present
325     * @return <tt>true</tt> if this queue changed as a result of the call
326     */
327    @Override public boolean remove(@Nullable Object o) {
328        final Monitor monitor = this.monitor;
329        monitor.enter();
330        try {
331            return q.remove(o);
332        } finally {
333            monitor.leave();
334        }
335    }
336
337    /**
338     * Returns {@code true} if this queue contains the specified element.
339     * More formally, returns {@code true} if and only if this queue contains
340     * at least one element {@code e} such that {@code o.equals(e)}.
341     *
342     * @param o object to be checked for containment in this queue
343     * @return <tt>true</tt> if this queue contains the specified element
344     */
345    @Override public boolean contains(@Nullable Object o) {
346        final Monitor monitor = this.monitor;
347        monitor.enter();
348        try {
349            return q.contains(o);
350        } finally {
351            monitor.leave();
352        }
353    }
354
355    /**
356     * Returns an array containing all of the elements in this queue.
357     * The returned array elements are in no particular order.
358     *
359     * <p>The returned array will be "safe" in that no references to it are
360     * maintained by this queue.  (In other words, this method must allocate
361     * a new array).  The caller is thus free to modify the returned array.
362     *
363     * <p>This method acts as bridge between array-based and collection-based
364     * APIs.
365     *
366     * @return an array containing all of the elements in this queue
367     */
368    @Override public Object[] toArray() {
369        final Monitor monitor = this.monitor;
370        monitor.enter();
371        try {
372            return q.toArray();
373        } finally {
374            monitor.leave();
375        }
376    }
377
378    @Override public String toString() {
379        final Monitor monitor = this.monitor;
380        monitor.enter();
381        try {
382            return q.toString();
383        } finally {
384            monitor.leave();
385        }
386    }
387
388    /**
389     * @throws UnsupportedOperationException {@inheritDoc}
390     * @throws ClassCastException            {@inheritDoc}
391     * @throws NullPointerException          {@inheritDoc}
392     * @throws IllegalArgumentException      {@inheritDoc}
393     */
394    @Override
395    public int drainTo(Collection<? super E> c) {
396        if (c == null)
397            throw new NullPointerException();
398        if (c == this)
399            throw new IllegalArgumentException();
400        final Monitor monitor = this.monitor;
401        monitor.enter();
402        try {
403            int n = 0;
404            E e;
405            while ( (e = q.poll()) != null) {
406                c.add(e);
407                ++n;
408            }
409            return n;
410        } finally {
411            monitor.leave();
412        }
413    }
414
415    /**
416     * @throws UnsupportedOperationException {@inheritDoc}
417     * @throws ClassCastException            {@inheritDoc}
418     * @throws NullPointerException          {@inheritDoc}
419     * @throws IllegalArgumentException      {@inheritDoc}
420     */
421    @Override
422    public int drainTo(Collection<? super E> c, int maxElements) {
423        if (c == null)
424            throw new NullPointerException();
425        if (c == this)
426            throw new IllegalArgumentException();
427        if (maxElements <= 0)
428            return 0;
429        final Monitor monitor = this.monitor;
430        monitor.enter();
431        try {
432            int n = 0;
433            E e;
434            while (n < maxElements && (e = q.poll()) != null) {
435                c.add(e);
436                ++n;
437            }
438            return n;
439        } finally {
440            monitor.leave();
441        }
442    }
443
444    /**
445     * Atomically removes all of the elements from this queue.
446     * The queue will be empty after this call returns.
447     */
448    @Override public void clear() {
449        final Monitor monitor = this.monitor;
450        monitor.enter();
451        try {
452            q.clear();
453        } finally {
454            monitor.leave();
455        }
456    }
457
458    /**
459     * Returns an array containing all of the elements in this queue; the
460     * runtime type of the returned array is that of the specified array.
461     * The returned array elements are in no particular order.
462     * If the queue fits in the specified array, it is returned therein.
463     * Otherwise, a new array is allocated with the runtime type of the
464     * specified array and the size of this queue.
465     *
466     * <p>If this queue fits in the specified array with room to spare
467     * (i.e., the array has more elements than this queue), the element in
468     * the array immediately following the end of the queue is set to
469     * <tt>null</tt>.
470     *
471     * <p>Like the {@link #toArray()} method, this method acts as bridge between
472     * array-based and collection-based APIs.  Further, this method allows
473     * precise control over the runtime type of the output array, and may,
474     * under certain circumstances, be used to save allocation costs.
475     *
476     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
477     * The following code can be used to dump the queue into a newly
478     * allocated array of <tt>String</tt>:
479     *
480     * <pre>
481     *     String[] y = x.toArray(new String[0]);</pre>
482     *
483     * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to
484     * <tt>toArray()</tt>.
485     *
486     * @param a the array into which the elements of the queue are to
487     *          be stored, if it is big enough; otherwise, a new array of the
488     *          same runtime type is allocated for this purpose
489     * @return an array containing all of the elements in this queue
490     * @throws ArrayStoreException if the runtime type of the specified array
491     *         is not a supertype of the runtime type of every element in
492     *         this queue
493     * @throws NullPointerException if the specified array is null
494     */
495    @Override public <T> T[] toArray(T[] a) {
496        final Monitor monitor = this.monitor;
497        monitor.enter();
498        try {
499            return q.toArray(a);
500        } finally {
501            monitor.leave();
502        }
503    }
504
505    /**
506     * Returns an iterator over the elements in this queue. The
507     * iterator does not return the elements in any particular order.
508     * The returned <tt>Iterator</tt> is a "weakly consistent"
509     * iterator that will never throw {@link
510     * ConcurrentModificationException}, and guarantees to traverse
511     * elements as they existed upon construction of the iterator, and
512     * may (but is not guaranteed to) reflect any modifications
513     * subsequent to construction.
514     *
515     * @return an iterator over the elements in this queue
516     */
517    @Override public Iterator<E> iterator() {
518        return new Itr(toArray());
519    }
520
521    /**
522     * Snapshot iterator that works off copy of underlying q array.
523     */
524    private class Itr implements Iterator<E> {
525        final Object[] array; // Array of all elements
526        int cursor;           // index of next element to return;
527        int lastRet;          // index of last element, or -1 if no such
528
529        Itr(Object[] array) {
530            lastRet = -1;
531            this.array = array;
532        }
533
534        @Override
535        public boolean hasNext() {
536            return cursor < array.length;
537        }
538
539        @Override
540        public E next() {
541            if (cursor >= array.length)
542                throw new NoSuchElementException();
543            lastRet = cursor;
544
545            // array comes from q.toArray() and so should have only E's in it
546            @SuppressWarnings("unchecked")
547            E e = (E) array[cursor++];
548            return e;
549        }
550
551        @Override
552        public void remove() {
553            if (lastRet < 0)
554                throw new IllegalStateException();
555            Object x = array[lastRet];
556            lastRet = -1;
557            // Traverse underlying queue to find == element,
558            // not just a .equals element.
559            monitor.enter();
560            try {
561                for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
562                    if (it.next() == x) {
563                        it.remove();
564                        return;
565                    }
566                }
567            } finally {
568                monitor.leave();
569            }
570        }
571    }
572
573    /**
574     * Saves the state to a stream (that is, serializes it).  This
575     * merely wraps default serialization within the monitor.  The
576     * serialization strategy for items is left to underlying
577     * Queue. Note that locking is not needed on deserialization, so
578     * readObject is not defined, just relying on default.
579     */
580    private void writeObject(java.io.ObjectOutputStream s)
581        throws java.io.IOException {
582        monitor.enter();
583        try {
584            s.defaultWriteObject();
585        } finally {
586            monitor.leave();
587        }
588    }
589
590}
591