PriorityBlockingQueue.java revision cec4dd4b1d33f78997603d0f89c0d0e56e64dbcd
1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7package java.util.concurrent;
8
9import java.util.concurrent.locks.*;
10import java.util.*;
11
12// BEGIN android-note
13// removed link to collections framework docs
14// END android-note
15
16/**
17 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
18 * the same ordering rules as class {@link PriorityQueue} and supplies
19 * blocking retrieval operations.  While this queue is logically
20 * unbounded, attempted additions may fail due to resource exhaustion
21 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
22 * <tt>null</tt> elements.  A priority queue relying on {@linkplain
23 * Comparable natural ordering} also does not permit insertion of
24 * non-comparable objects (doing so results in
25 * <tt>ClassCastException</tt>).
26 *
27 * <p>This class and its iterator implement all of the
28 * <em>optional</em> methods of the {@link Collection} and {@link
29 * Iterator} interfaces.  The Iterator provided in method {@link
30 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
31 * the PriorityBlockingQueue in any particular order. If you need
32 * ordered traversal, consider using
33 * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
34 * can be used to <em>remove</em> some or all elements in priority
35 * order and place them in another collection.
36 *
37 * <p>Operations on this class make no guarantees about the ordering
38 * of elements with equal priority. If you need to enforce an
39 * ordering, you can define custom classes or comparators that use a
40 * secondary key to break ties in primary priority values.  For
41 * example, here is a class that applies first-in-first-out
42 * tie-breaking to comparable elements. To use it, you would insert a
43 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
44 *
45 * <pre>
46 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
47 *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
48 *   final static AtomicLong seq = new AtomicLong();
49 *   final long seqNum;
50 *   final E entry;
51 *   public FIFOEntry(E entry) {
52 *     seqNum = seq.getAndIncrement();
53 *     this.entry = entry;
54 *   }
55 *   public E getEntry() { return entry; }
56 *   public int compareTo(FIFOEntry&lt;E&gt; other) {
57 *     int res = entry.compareTo(other.entry);
58 *     if (res == 0 &amp;&amp; other.entry != this.entry)
59 *       res = (seqNum &lt; other.seqNum ? -1 : 1);
60 *     return res;
61 *   }
62 * }</pre>
63 *
64 * @since 1.5
65 * @author Doug Lea
66 * @param <E> the type of elements held in this collection
67 */
68public class PriorityBlockingQueue<E> extends AbstractQueue<E>
69    implements BlockingQueue<E>, java.io.Serializable {
70    private static final long serialVersionUID = 5595510919245408276L;
71
72    private final PriorityQueue<E> q;
73    private final ReentrantLock lock = new ReentrantLock(true);
74    private final Condition notEmpty = lock.newCondition();
75
76    /**
77     * Creates a <tt>PriorityBlockingQueue</tt> with the default
78     * initial capacity (11) that orders its elements according to
79     * their {@linkplain Comparable natural ordering}.
80     */
81    public PriorityBlockingQueue() {
82        q = new PriorityQueue<E>();
83    }
84
85    /**
86     * Creates a <tt>PriorityBlockingQueue</tt> with the specified
87     * initial capacity that orders its elements according to their
88     * {@linkplain Comparable natural ordering}.
89     *
90     * @param initialCapacity the initial capacity for this priority queue
91     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
92     *         than 1
93     */
94    public PriorityBlockingQueue(int initialCapacity) {
95        q = new PriorityQueue<E>(initialCapacity, null);
96    }
97
98    /**
99     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
100     * capacity that orders its elements according to the specified
101     * comparator.
102     *
103     * @param initialCapacity the initial capacity for this priority queue
104     * @param  comparator the comparator that will be used to order this
105     *         priority queue.  If {@code null}, the {@linkplain Comparable
106     *         natural ordering} of the elements will be used.
107     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
108     *         than 1
109     */
110    public PriorityBlockingQueue(int initialCapacity,
111                                 Comparator<? super E> comparator) {
112        q = new PriorityQueue<E>(initialCapacity, comparator);
113    }
114
115    /**
116     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
117     * in the specified collection.  If the specified collection is a
118     * {@link SortedSet} or a {@link PriorityQueue},  this
119     * priority queue will be ordered according to the same ordering.
120     * Otherwise, this priority queue will be ordered according to the
121     * {@linkplain Comparable natural ordering} of its elements.
122     *
123     * @param  c the collection whose elements are to be placed
124     *         into this priority queue
125     * @throws ClassCastException if elements of the specified collection
126     *         cannot be compared to one another according to the priority
127     *         queue's ordering
128     * @throws NullPointerException if the specified collection or any
129     *         of its elements are null
130     */
131    public PriorityBlockingQueue(Collection<? extends E> c) {
132        q = new PriorityQueue<E>(c);
133    }
134
135    /**
136     * Inserts the specified element into this priority queue.
137     *
138     * @param e the element to add
139     * @return <tt>true</tt> (as specified by {@link Collection#add})
140     * @throws ClassCastException if the specified element cannot be compared
141     *         with elements currently in the priority queue according to the
142     *         priority queue's ordering
143     * @throws NullPointerException if the specified element is null
144     */
145    public boolean add(E e) {
146        return offer(e);
147    }
148
149    /**
150     * Inserts the specified element into this priority queue.
151     *
152     * @param e the element to add
153     * @return <tt>true</tt> (as specified by {@link Queue#offer})
154     * @throws ClassCastException if the specified element cannot be compared
155     *         with elements currently in the priority queue according to the
156     *         priority queue's ordering
157     * @throws NullPointerException if the specified element is null
158     */
159    public boolean offer(E e) {
160        final ReentrantLock lock = this.lock;
161        lock.lock();
162        try {
163            boolean ok = q.offer(e);
164            assert ok;
165            notEmpty.signal();
166            return true;
167        } finally {
168            lock.unlock();
169        }
170    }
171
172    /**
173     * Inserts the specified element into this priority queue. As the queue is
174     * unbounded this method will never block.
175     *
176     * @param e the element to add
177     * @throws ClassCastException if the specified element cannot be compared
178     *         with elements currently in the priority queue according to the
179     *         priority queue's ordering
180     * @throws NullPointerException if the specified element is null
181     */
182    public void put(E e) {
183        offer(e); // never need to block
184    }
185
186    /**
187     * Inserts the specified element into this priority queue. As the queue is
188     * unbounded this method will never block.
189     *
190     * @param e the element to add
191     * @param timeout This parameter is ignored as the method never blocks
192     * @param unit This parameter is ignored as the method never blocks
193     * @return <tt>true</tt>
194     * @throws ClassCastException if the specified element cannot be compared
195     *         with elements currently in the priority queue according to the
196     *         priority queue's ordering
197     * @throws NullPointerException if the specified element is null
198     */
199    public boolean offer(E e, long timeout, TimeUnit unit) {
200        return offer(e); // never need to block
201    }
202
203    public E poll() {
204        final ReentrantLock lock = this.lock;
205        lock.lock();
206        try {
207            return q.poll();
208        } finally {
209            lock.unlock();
210        }
211    }
212
213    public E take() throws InterruptedException {
214        final ReentrantLock lock = this.lock;
215        lock.lockInterruptibly();
216        try {
217            try {
218                while (q.size() == 0)
219                    notEmpty.await();
220            } catch (InterruptedException ie) {
221                notEmpty.signal(); // propagate to non-interrupted thread
222                throw ie;
223            }
224            E x = q.poll();
225            assert x != null;
226            return x;
227        } finally {
228            lock.unlock();
229        }
230    }
231
232    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
233        long nanos = unit.toNanos(timeout);
234        final ReentrantLock lock = this.lock;
235        lock.lockInterruptibly();
236        try {
237            for (;;) {
238                E x = q.poll();
239                if (x != null)
240                    return x;
241                if (nanos <= 0)
242                    return null;
243                try {
244                    nanos = notEmpty.awaitNanos(nanos);
245                } catch (InterruptedException ie) {
246                    notEmpty.signal(); // propagate to non-interrupted thread
247                    throw ie;
248                }
249            }
250        } finally {
251            lock.unlock();
252        }
253    }
254
255    public E peek() {
256        final ReentrantLock lock = this.lock;
257        lock.lock();
258        try {
259            return q.peek();
260        } finally {
261            lock.unlock();
262        }
263    }
264
265    /**
266     * Returns the comparator used to order the elements in this queue,
267     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
268     * natural ordering} of its elements.
269     *
270     * @return the comparator used to order the elements in this queue,
271     *         or <tt>null</tt> if this queue uses the natural
272     *         ordering of its elements
273     */
274    public Comparator<? super E> comparator() {
275        return q.comparator();
276    }
277
278    public int size() {
279        final ReentrantLock lock = this.lock;
280        lock.lock();
281        try {
282            return q.size();
283        } finally {
284            lock.unlock();
285        }
286    }
287
288    /**
289     * Always returns <tt>Integer.MAX_VALUE</tt> because
290     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
291     * @return <tt>Integer.MAX_VALUE</tt>
292     */
293    public int remainingCapacity() {
294        return Integer.MAX_VALUE;
295    }
296
297    /**
298     * Removes a single instance of the specified element from this queue,
299     * if it is present.  More formally, removes an element {@code e} such
300     * that {@code o.equals(e)}, if this queue contains one or more such
301     * elements.  Returns {@code true} if and only if this queue contained
302     * the specified element (or equivalently, if this queue changed as a
303     * result of the call).
304     *
305     * @param o element to be removed from this queue, if present
306     * @return <tt>true</tt> if this queue changed as a result of the call
307     */
308    public boolean remove(Object o) {
309        final ReentrantLock lock = this.lock;
310        lock.lock();
311        try {
312            return q.remove(o);
313        } finally {
314            lock.unlock();
315        }
316    }
317
318    /**
319     * Returns {@code true} if this queue contains the specified element.
320     * More formally, returns {@code true} if and only if this queue contains
321     * at least one element {@code e} such that {@code o.equals(e)}.
322     *
323     * @param o object to be checked for containment in this queue
324     * @return <tt>true</tt> if this queue contains the specified element
325     */
326    public boolean contains(Object o) {
327        final ReentrantLock lock = this.lock;
328        lock.lock();
329        try {
330            return q.contains(o);
331        } finally {
332            lock.unlock();
333        }
334    }
335
336    /**
337     * Returns an array containing all of the elements in this queue.
338     * The returned array elements are in no particular order.
339     *
340     * <p>The returned array will be "safe" in that no references to it are
341     * maintained by this queue.  (In other words, this method must allocate
342     * a new array).  The caller is thus free to modify the returned array.
343     *
344     * <p>This method acts as bridge between array-based and collection-based
345     * APIs.
346     *
347     * @return an array containing all of the elements in this queue
348     */
349    public Object[] toArray() {
350        final ReentrantLock lock = this.lock;
351        lock.lock();
352        try {
353            return q.toArray();
354        } finally {
355            lock.unlock();
356        }
357    }
358
359
360    public String toString() {
361        final ReentrantLock lock = this.lock;
362        lock.lock();
363        try {
364            return q.toString();
365        } finally {
366            lock.unlock();
367        }
368    }
369
370    /**
371     * @throws UnsupportedOperationException {@inheritDoc}
372     * @throws ClassCastException            {@inheritDoc}
373     * @throws NullPointerException          {@inheritDoc}
374     * @throws IllegalArgumentException      {@inheritDoc}
375     */
376    public int drainTo(Collection<? super E> c) {
377        if (c == null)
378            throw new NullPointerException();
379        if (c == this)
380            throw new IllegalArgumentException();
381        final ReentrantLock lock = this.lock;
382        lock.lock();
383        try {
384            int n = 0;
385            E e;
386            while ( (e = q.poll()) != null) {
387                c.add(e);
388                ++n;
389            }
390            return n;
391        } finally {
392            lock.unlock();
393        }
394    }
395
396    /**
397     * @throws UnsupportedOperationException {@inheritDoc}
398     * @throws ClassCastException            {@inheritDoc}
399     * @throws NullPointerException          {@inheritDoc}
400     * @throws IllegalArgumentException      {@inheritDoc}
401     */
402    public int drainTo(Collection<? super E> c, int maxElements) {
403        if (c == null)
404            throw new NullPointerException();
405        if (c == this)
406            throw new IllegalArgumentException();
407        if (maxElements <= 0)
408            return 0;
409        final ReentrantLock lock = this.lock;
410        lock.lock();
411        try {
412            int n = 0;
413            E e;
414            while (n < maxElements && (e = q.poll()) != null) {
415                c.add(e);
416                ++n;
417            }
418            return n;
419        } finally {
420            lock.unlock();
421        }
422    }
423
424    /**
425     * Atomically removes all of the elements from this queue.
426     * The queue will be empty after this call returns.
427     */
428    public void clear() {
429        final ReentrantLock lock = this.lock;
430        lock.lock();
431        try {
432            q.clear();
433        } finally {
434            lock.unlock();
435        }
436    }
437
438    /**
439     * Returns an array containing all of the elements in this queue; the
440     * runtime type of the returned array is that of the specified array.
441     * The returned array elements are in no particular order.
442     * If the queue fits in the specified array, it is returned therein.
443     * Otherwise, a new array is allocated with the runtime type of the
444     * specified array and the size of this queue.
445     *
446     * <p>If this queue fits in the specified array with room to spare
447     * (i.e., the array has more elements than this queue), the element in
448     * the array immediately following the end of the queue is set to
449     * <tt>null</tt>.
450     *
451     * <p>Like the {@link #toArray()} method, this method acts as bridge between
452     * array-based and collection-based APIs.  Further, this method allows
453     * precise control over the runtime type of the output array, and may,
454     * under certain circumstances, be used to save allocation costs.
455     *
456     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
457     * The following code can be used to dump the queue into a newly
458     * allocated array of <tt>String</tt>:
459     *
460     * <pre>
461     *     String[] y = x.toArray(new String[0]);</pre>
462     *
463     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
464     * <tt>toArray()</tt>.
465     *
466     * @param a the array into which the elements of the queue are to
467     *          be stored, if it is big enough; otherwise, a new array of the
468     *          same runtime type is allocated for this purpose
469     * @return an array containing all of the elements in this queue
470     * @throws ArrayStoreException if the runtime type of the specified array
471     *         is not a supertype of the runtime type of every element in
472     *         this queue
473     * @throws NullPointerException if the specified array is null
474     */
475    public <T> T[] toArray(T[] a) {
476        final ReentrantLock lock = this.lock;
477        lock.lock();
478        try {
479            return q.toArray(a);
480        } finally {
481            lock.unlock();
482        }
483    }
484
485    /**
486     * Returns an iterator over the elements in this queue. The
487     * iterator does not return the elements in any particular order.
488     * The returned <tt>Iterator</tt> is a "weakly consistent"
489     * iterator that will never throw {@link
490     * ConcurrentModificationException}, and guarantees to traverse
491     * elements as they existed upon construction of the iterator, and
492     * may (but is not guaranteed to) reflect any modifications
493     * subsequent to construction.
494     *
495     * @return an iterator over the elements in this queue
496     */
497    public Iterator<E> iterator() {
498        return new Itr(toArray());
499    }
500
501    /**
502     * Snapshot iterator that works off copy of underlying q array.
503     */
504    private class Itr implements Iterator<E> {
505        final Object[] array; // Array of all elements
506        int cursor;           // index of next element to return;
507        int lastRet;          // index of last element, or -1 if no such
508
509        Itr(Object[] array) {
510            lastRet = -1;
511            this.array = array;
512        }
513
514        public boolean hasNext() {
515            return cursor < array.length;
516        }
517
518        public E next() {
519            if (cursor >= array.length)
520                throw new NoSuchElementException();
521            lastRet = cursor;
522            return (E)array[cursor++];
523        }
524
525        public void remove() {
526            if (lastRet < 0)
527                throw new IllegalStateException();
528            Object x = array[lastRet];
529            lastRet = -1;
530            // Traverse underlying queue to find == element,
531            // not just a .equals element.
532            lock.lock();
533            try {
534                for (Iterator it = q.iterator(); it.hasNext(); ) {
535                    if (it.next() == x) {
536                        it.remove();
537                        return;
538                    }
539                }
540            } finally {
541                lock.unlock();
542            }
543        }
544    }
545
546    /**
547     * Saves the state to a stream (that is, serializes it).  This
548     * merely wraps default serialization within lock.  The
549     * serialization strategy for items is left to underlying
550     * Queue. Note that locking is not needed on deserialization, so
551     * readObject is not defined, just relying on default.
552     */
553    private void writeObject(java.io.ObjectOutputStream s)
554        throws java.io.IOException {
555        lock.lock();
556        try {
557            s.defaultWriteObject();
558        } finally {
559            lock.unlock();
560        }
561    }
562
563}
564