PriorityBlockingQueue.java revision 51b1b6997fd3f980076b8081f7f1165ccc2a4008
1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37
38import java.util.concurrent.locks.Condition;
39import java.util.concurrent.locks.ReentrantLock;
40import java.util.*;
41
42/**
43 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
44 * the same ordering rules as class {@link PriorityQueue} and supplies
45 * blocking retrieval operations.  While this queue is logically
46 * unbounded, attempted additions may fail due to resource exhaustion
47 * (causing {@code OutOfMemoryError}). This class does not permit
48 * {@code null} elements.  A priority queue relying on {@linkplain
49 * Comparable natural ordering} also does not permit insertion of
50 * non-comparable objects (doing so results in
51 * {@code ClassCastException}).
52 *
53 * <p>This class and its iterator implement all of the
54 * <em>optional</em> methods of the {@link Collection} and {@link
55 * Iterator} interfaces.  The Iterator provided in method {@link
56 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
57 * the PriorityBlockingQueue in any particular order. If you need
58 * ordered traversal, consider using
59 * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
60 * can be used to <em>remove</em> some or all elements in priority
61 * order and place them in another collection.
62 *
63 * <p>Operations on this class make no guarantees about the ordering
64 * of elements with equal priority. If you need to enforce an
65 * ordering, you can define custom classes or comparators that use a
66 * secondary key to break ties in primary priority values.  For
67 * example, here is a class that applies first-in-first-out
68 * tie-breaking to comparable elements. To use it, you would insert a
69 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
70 *
71 *  <pre> {@code
72 * class FIFOEntry<E extends Comparable<? super E>>
73 *     implements Comparable<FIFOEntry<E>> {
74 *   static final AtomicLong seq = new AtomicLong(0);
75 *   final long seqNum;
76 *   final E entry;
77 *   public FIFOEntry(E entry) {
78 *     seqNum = seq.getAndIncrement();
79 *     this.entry = entry;
80 *   }
81 *   public E getEntry() { return entry; }
82 *   public int compareTo(FIFOEntry<E> other) {
83 *     int res = entry.compareTo(other.entry);
84 *     if (res == 0 && other.entry != this.entry)
85 *       res = (seqNum < other.seqNum ? -1 : 1);
86 *     return res;
87 *   }
88 * }}</pre>
89 *
90 * <p>This class is a member of the
91 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
92 * Java Collections Framework</a>.
93 *
94 * @since 1.5
95 * @author Doug Lea
96 * @param <E> the type of elements held in this collection
97 */
98public class PriorityBlockingQueue<E> extends AbstractQueue<E>
99    implements BlockingQueue<E>, java.io.Serializable {
100    private static final long serialVersionUID = 5595510919245408276L;
101
102    /*
103     * The implementation uses an array-based binary heap, with public
104     * operations protected with a single lock. However, allocation
105     * during resizing uses a simple spinlock (used only while not
106     * holding main lock) in order to allow takes to operate
107     * concurrently with allocation.  This avoids repeated
108     * postponement of waiting consumers and consequent element
109     * build-up. The need to back away from lock during allocation
110     * makes it impossible to simply wrap delegated
111     * java.util.PriorityQueue operations within a lock, as was done
112     * in a previous version of this class. To maintain
113     * interoperability, a plain PriorityQueue is still used during
114     * serialization, which maintains compatibility at the expense of
115     * transiently doubling overhead.
116     */
117
118    /**
119     * Default array capacity.
120     */
121    private static final int DEFAULT_INITIAL_CAPACITY = 11;
122
123    /**
124     * The maximum size of array to allocate.
125     * Some VMs reserve some header words in an array.
126     * Attempts to allocate larger arrays may result in
127     * OutOfMemoryError: Requested array size exceeds VM limit
128     */
129    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
130
131    /**
132     * Priority queue represented as a balanced binary heap: the two
133     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
134     * priority queue is ordered by comparator, or by the elements'
135     * natural ordering, if comparator is null: For each node n in the
136     * heap and each descendant d of n, n <= d.  The element with the
137     * lowest value is in queue[0], assuming the queue is nonempty.
138     */
139    private transient Object[] queue;
140
141    /**
142     * The number of elements in the priority queue.
143     */
144    private transient int size;
145
146    /**
147     * The comparator, or null if priority queue uses elements'
148     * natural ordering.
149     */
150    private transient Comparator<? super E> comparator;
151
152    /**
153     * Lock used for all public operations
154     */
155    private final ReentrantLock lock;
156
157    /**
158     * Condition for blocking when empty
159     */
160    private final Condition notEmpty;
161
162    /**
163     * Spinlock for allocation, acquired via CAS.
164     */
165    private transient volatile int allocationSpinLock;
166
167    /**
168     * A plain PriorityQueue used only for serialization,
169     * to maintain compatibility with previous versions
170     * of this class. Non-null only during serialization/deserialization.
171     */
172    private PriorityQueue q;
173
174    /**
175     * Creates a {@code PriorityBlockingQueue} with the default
176     * initial capacity (11) that orders its elements according to
177     * their {@linkplain Comparable natural ordering}.
178     */
179    public PriorityBlockingQueue() {
180        this(DEFAULT_INITIAL_CAPACITY, null);
181    }
182
183    /**
184     * Creates a {@code PriorityBlockingQueue} with the specified
185     * initial capacity that orders its elements according to their
186     * {@linkplain Comparable natural ordering}.
187     *
188     * @param initialCapacity the initial capacity for this priority queue
189     * @throws IllegalArgumentException if {@code initialCapacity} is less
190     *         than 1
191     */
192    public PriorityBlockingQueue(int initialCapacity) {
193        this(initialCapacity, null);
194    }
195
196    /**
197     * Creates a {@code PriorityBlockingQueue} with the specified initial
198     * capacity that orders its elements according to the specified
199     * comparator.
200     *
201     * @param initialCapacity the initial capacity for this priority queue
202     * @param  comparator the comparator that will be used to order this
203     *         priority queue.  If {@code null}, the {@linkplain Comparable
204     *         natural ordering} of the elements will be used.
205     * @throws IllegalArgumentException if {@code initialCapacity} is less
206     *         than 1
207     */
208    public PriorityBlockingQueue(int initialCapacity,
209                                 Comparator<? super E> comparator) {
210        if (initialCapacity < 1)
211            throw new IllegalArgumentException();
212        this.lock = new ReentrantLock();
213        this.notEmpty = lock.newCondition();
214        this.comparator = comparator;
215        this.queue = new Object[initialCapacity];
216    }
217
218    /**
219     * Creates a {@code PriorityBlockingQueue} containing the elements
220     * in the specified collection.  If the specified collection is a
221     * {@link SortedSet} or a {@link PriorityQueue},  this
222     * priority queue will be ordered according to the same ordering.
223     * Otherwise, this priority queue will be ordered according to the
224     * {@linkplain Comparable natural ordering} of its elements.
225     *
226     * @param  c the collection whose elements are to be placed
227     *         into this priority queue
228     * @throws ClassCastException if elements of the specified collection
229     *         cannot be compared to one another according to the priority
230     *         queue's ordering
231     * @throws NullPointerException if the specified collection or any
232     *         of its elements are null
233     */
234    public PriorityBlockingQueue(Collection<? extends E> c) {
235        this.lock = new ReentrantLock();
236        this.notEmpty = lock.newCondition();
237        boolean heapify = true; // true if not known to be in heap order
238        boolean screen = true;  // true if must screen for nulls
239        if (c instanceof SortedSet<?>) {
240            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
241            this.comparator = (Comparator<? super E>) ss.comparator();
242            heapify = false;
243        }
244        else if (c instanceof PriorityBlockingQueue<?>) {
245            PriorityBlockingQueue<? extends E> pq =
246                (PriorityBlockingQueue<? extends E>) c;
247            this.comparator = (Comparator<? super E>) pq.comparator();
248            screen = false;
249            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
250                heapify = false;
251        }
252        Object[] a = c.toArray();
253        int n = a.length;
254        // If c.toArray incorrectly doesn't return Object[], copy it.
255        if (a.getClass() != Object[].class)
256            a = Arrays.copyOf(a, n, Object[].class);
257        if (screen && (n == 1 || this.comparator != null)) {
258            for (int i = 0; i < n; ++i)
259                if (a[i] == null)
260                    throw new NullPointerException();
261        }
262        this.queue = a;
263        this.size = n;
264        if (heapify)
265            heapify();
266    }
267
268    /**
269     * Tries to grow array to accommodate at least one more element
270     * (but normally expand by about 50%), giving up (allowing retry)
271     * on contention (which we expect to be rare). Call only while
272     * holding lock.
273     *
274     * @param array the heap array
275     * @param oldCap the length of the array
276     */
277    private void tryGrow(Object[] array, int oldCap) {
278        lock.unlock(); // must release and then re-acquire main lock
279        Object[] newArray = null;
280        if (allocationSpinLock == 0 &&
281            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
282                                     0, 1)) {
283            try {
284                int newCap = oldCap + ((oldCap < 64) ?
285                                       (oldCap + 2) : // grow faster if small
286                                       (oldCap >> 1));
287                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
288                    int minCap = oldCap + 1;
289                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
290                        throw new OutOfMemoryError();
291                    newCap = MAX_ARRAY_SIZE;
292                }
293                if (newCap > oldCap && queue == array)
294                    newArray = new Object[newCap];
295            } finally {
296                allocationSpinLock = 0;
297            }
298        }
299        if (newArray == null) // back off if another thread is allocating
300            Thread.yield();
301        lock.lock();
302        if (newArray != null && queue == array) {
303            queue = newArray;
304            System.arraycopy(array, 0, newArray, 0, oldCap);
305        }
306    }
307
308    /**
309     * Mechanics for poll().  Call only while holding lock.
310     */
311    private E dequeue() {
312        int n = size - 1;
313        if (n < 0)
314            return null;
315        else {
316            Object[] array = queue;
317            E result = (E) array[0];
318            E x = (E) array[n];
319            array[n] = null;
320            Comparator<? super E> cmp = comparator;
321            if (cmp == null)
322                siftDownComparable(0, x, array, n);
323            else
324                siftDownUsingComparator(0, x, array, n, cmp);
325            size = n;
326            return result;
327        }
328    }
329
330    /**
331     * Inserts item x at position k, maintaining heap invariant by
332     * promoting x up the tree until it is greater than or equal to
333     * its parent, or is the root.
334     *
335     * To simplify and speed up coercions and comparisons. the
336     * Comparable and Comparator versions are separated into different
337     * methods that are otherwise identical. (Similarly for siftDown.)
338     * These methods are static, with heap state as arguments, to
339     * simplify use in light of possible comparator exceptions.
340     *
341     * @param k the position to fill
342     * @param x the item to insert
343     * @param array the heap array
344     * @param n heap size
345     */
346    private static <T> void siftUpComparable(int k, T x, Object[] array) {
347        Comparable<? super T> key = (Comparable<? super T>) x;
348        while (k > 0) {
349            int parent = (k - 1) >>> 1;
350            Object e = array[parent];
351            if (key.compareTo((T) e) >= 0)
352                break;
353            array[k] = e;
354            k = parent;
355        }
356        array[k] = key;
357    }
358
359    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
360                                       Comparator<? super T> cmp) {
361        while (k > 0) {
362            int parent = (k - 1) >>> 1;
363            Object e = array[parent];
364            if (cmp.compare(x, (T) e) >= 0)
365                break;
366            array[k] = e;
367            k = parent;
368        }
369        array[k] = x;
370    }
371
372    /**
373     * Inserts item x at position k, maintaining heap invariant by
374     * demoting x down the tree repeatedly until it is less than or
375     * equal to its children or is a leaf.
376     *
377     * @param k the position to fill
378     * @param x the item to insert
379     * @param array the heap array
380     * @param n heap size
381     */
382    private static <T> void siftDownComparable(int k, T x, Object[] array,
383                                               int n) {
384        if (n > 0) {
385            Comparable<? super T> key = (Comparable<? super T>)x;
386            int half = n >>> 1;           // loop while a non-leaf
387            while (k < half) {
388                int child = (k << 1) + 1; // assume left child is least
389                Object c = array[child];
390                int right = child + 1;
391                if (right < n &&
392                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
393                    c = array[child = right];
394                if (key.compareTo((T) c) <= 0)
395                    break;
396                array[k] = c;
397                k = child;
398            }
399            array[k] = key;
400        }
401    }
402
403    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
404                                                    int n,
405                                                    Comparator<? super T> cmp) {
406        if (n > 0) {
407            int half = n >>> 1;
408            while (k < half) {
409                int child = (k << 1) + 1;
410                Object c = array[child];
411                int right = child + 1;
412                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
413                    c = array[child = right];
414                if (cmp.compare(x, (T) c) <= 0)
415                    break;
416                array[k] = c;
417                k = child;
418            }
419            array[k] = x;
420        }
421    }
422
423    /**
424     * Establishes the heap invariant (described above) in the entire tree,
425     * assuming nothing about the order of the elements prior to the call.
426     */
427    private void heapify() {
428        Object[] array = queue;
429        int n = size;
430        int half = (n >>> 1) - 1;
431        Comparator<? super E> cmp = comparator;
432        if (cmp == null) {
433            for (int i = half; i >= 0; i--)
434                siftDownComparable(i, (E) array[i], array, n);
435        }
436        else {
437            for (int i = half; i >= 0; i--)
438                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
439        }
440    }
441
442    /**
443     * Inserts the specified element into this priority queue.
444     *
445     * @param e the element to add
446     * @return {@code true} (as specified by {@link Collection#add})
447     * @throws ClassCastException if the specified element cannot be compared
448     *         with elements currently in the priority queue according to the
449     *         priority queue's ordering
450     * @throws NullPointerException if the specified element is null
451     */
452    public boolean add(E e) {
453        return offer(e);
454    }
455
456    /**
457     * Inserts the specified element into this priority queue.
458     * As the queue is unbounded, this method will never return {@code false}.
459     *
460     * @param e the element to add
461     * @return {@code true} (as specified by {@link Queue#offer})
462     * @throws ClassCastException if the specified element cannot be compared
463     *         with elements currently in the priority queue according to the
464     *         priority queue's ordering
465     * @throws NullPointerException if the specified element is null
466     */
467    public boolean offer(E e) {
468        if (e == null)
469            throw new NullPointerException();
470        final ReentrantLock lock = this.lock;
471        lock.lock();
472        int n, cap;
473        Object[] array;
474        while ((n = size) >= (cap = (array = queue).length))
475            tryGrow(array, cap);
476        try {
477            Comparator<? super E> cmp = comparator;
478            if (cmp == null)
479                siftUpComparable(n, e, array);
480            else
481                siftUpUsingComparator(n, e, array, cmp);
482            size = n + 1;
483            notEmpty.signal();
484        } finally {
485            lock.unlock();
486        }
487        return true;
488    }
489
490    /**
491     * Inserts the specified element into this priority queue.
492     * As the queue is unbounded, this method will never block.
493     *
494     * @param e the element to add
495     * @throws ClassCastException if the specified element cannot be compared
496     *         with elements currently in the priority queue according to the
497     *         priority queue's ordering
498     * @throws NullPointerException if the specified element is null
499     */
500    public void put(E e) {
501        offer(e); // never need to block
502    }
503
504    /**
505     * Inserts the specified element into this priority queue.
506     * As the queue is unbounded, this method will never block or
507     * return {@code false}.
508     *
509     * @param e the element to add
510     * @param timeout This parameter is ignored as the method never blocks
511     * @param unit This parameter is ignored as the method never blocks
512     * @return {@code true} (as specified by
513     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
514     * @throws ClassCastException if the specified element cannot be compared
515     *         with elements currently in the priority queue according to the
516     *         priority queue's ordering
517     * @throws NullPointerException if the specified element is null
518     */
519    public boolean offer(E e, long timeout, TimeUnit unit) {
520        return offer(e); // never need to block
521    }
522
523    public E poll() {
524        final ReentrantLock lock = this.lock;
525        lock.lock();
526        try {
527            return dequeue();
528        } finally {
529            lock.unlock();
530        }
531    }
532
533    public E take() throws InterruptedException {
534        final ReentrantLock lock = this.lock;
535        lock.lockInterruptibly();
536        E result;
537        try {
538            while ( (result = dequeue()) == null)
539                notEmpty.await();
540        } finally {
541            lock.unlock();
542        }
543        return result;
544    }
545
546    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
547        long nanos = unit.toNanos(timeout);
548        final ReentrantLock lock = this.lock;
549        lock.lockInterruptibly();
550        E result;
551        try {
552            while ( (result = dequeue()) == null && nanos > 0)
553                nanos = notEmpty.awaitNanos(nanos);
554        } finally {
555            lock.unlock();
556        }
557        return result;
558    }
559
560    public E peek() {
561        final ReentrantLock lock = this.lock;
562        lock.lock();
563        try {
564            return (size == 0) ? null : (E) queue[0];
565        } finally {
566            lock.unlock();
567        }
568    }
569
570    /**
571     * Returns the comparator used to order the elements in this queue,
572     * or {@code null} if this queue uses the {@linkplain Comparable
573     * natural ordering} of its elements.
574     *
575     * @return the comparator used to order the elements in this queue,
576     *         or {@code null} if this queue uses the natural
577     *         ordering of its elements
578     */
579    public Comparator<? super E> comparator() {
580        return comparator;
581    }
582
583    public int size() {
584        final ReentrantLock lock = this.lock;
585        lock.lock();
586        try {
587            return size;
588        } finally {
589            lock.unlock();
590        }
591    }
592
593    /**
594     * Always returns {@code Integer.MAX_VALUE} because
595     * a {@code PriorityBlockingQueue} is not capacity constrained.
596     * @return {@code Integer.MAX_VALUE} always
597     */
598    public int remainingCapacity() {
599        return Integer.MAX_VALUE;
600    }
601
602    private int indexOf(Object o) {
603        if (o != null) {
604            Object[] array = queue;
605            int n = size;
606            for (int i = 0; i < n; i++)
607                if (o.equals(array[i]))
608                    return i;
609        }
610        return -1;
611    }
612
613    /**
614     * Removes the ith element from queue.
615     */
616    private void removeAt(int i) {
617        Object[] array = queue;
618        int n = size - 1;
619        if (n == i) // removed last element
620            array[i] = null;
621        else {
622            E moved = (E) array[n];
623            array[n] = null;
624            Comparator<? super E> cmp = comparator;
625            if (cmp == null)
626                siftDownComparable(i, moved, array, n);
627            else
628                siftDownUsingComparator(i, moved, array, n, cmp);
629            if (array[i] == moved) {
630                if (cmp == null)
631                    siftUpComparable(i, moved, array);
632                else
633                    siftUpUsingComparator(i, moved, array, cmp);
634            }
635        }
636        size = n;
637    }
638
639    /**
640     * Removes a single instance of the specified element from this queue,
641     * if it is present.  More formally, removes an element {@code e} such
642     * that {@code o.equals(e)}, if this queue contains one or more such
643     * elements.  Returns {@code true} if and only if this queue contained
644     * the specified element (or equivalently, if this queue changed as a
645     * result of the call).
646     *
647     * @param o element to be removed from this queue, if present
648     * @return {@code true} if this queue changed as a result of the call
649     */
650    public boolean remove(Object o) {
651        final ReentrantLock lock = this.lock;
652        lock.lock();
653        try {
654            int i = indexOf(o);
655            if (i == -1)
656                return false;
657            removeAt(i);
658            return true;
659        } finally {
660            lock.unlock();
661        }
662    }
663
664    /**
665     * Identity-based version for use in Itr.remove
666     */
667    void removeEQ(Object o) {
668        final ReentrantLock lock = this.lock;
669        lock.lock();
670        try {
671            Object[] array = queue;
672            for (int i = 0, n = size; i < n; i++) {
673                if (o == array[i]) {
674                    removeAt(i);
675                    break;
676                }
677            }
678        } finally {
679            lock.unlock();
680        }
681    }
682
683    /**
684     * Returns {@code true} if this queue contains the specified element.
685     * More formally, returns {@code true} if and only if this queue contains
686     * at least one element {@code e} such that {@code o.equals(e)}.
687     *
688     * @param o object to be checked for containment in this queue
689     * @return {@code true} if this queue contains the specified element
690     */
691    public boolean contains(Object o) {
692        final ReentrantLock lock = this.lock;
693        lock.lock();
694        try {
695            return indexOf(o) != -1;
696        } finally {
697            lock.unlock();
698        }
699    }
700
701    /**
702     * Returns an array containing all of the elements in this queue.
703     * The returned array elements are in no particular order.
704     *
705     * <p>The returned array will be "safe" in that no references to it are
706     * maintained by this queue.  (In other words, this method must allocate
707     * a new array).  The caller is thus free to modify the returned array.
708     *
709     * <p>This method acts as bridge between array-based and collection-based
710     * APIs.
711     *
712     * @return an array containing all of the elements in this queue
713     */
714    public Object[] toArray() {
715        final ReentrantLock lock = this.lock;
716        lock.lock();
717        try {
718            return Arrays.copyOf(queue, size);
719        } finally {
720            lock.unlock();
721        }
722    }
723
724    public String toString() {
725        final ReentrantLock lock = this.lock;
726        lock.lock();
727        try {
728            int n = size;
729            if (n == 0)
730                return "[]";
731            StringBuilder sb = new StringBuilder();
732            sb.append('[');
733            for (int i = 0; i < n; ++i) {
734                Object e = queue[i];
735                sb.append(e == this ? "(this Collection)" : e);
736                if (i != n - 1)
737                    sb.append(',').append(' ');
738            }
739            return sb.append(']').toString();
740        } finally {
741            lock.unlock();
742        }
743    }
744
745    /**
746     * @throws UnsupportedOperationException {@inheritDoc}
747     * @throws ClassCastException            {@inheritDoc}
748     * @throws NullPointerException          {@inheritDoc}
749     * @throws IllegalArgumentException      {@inheritDoc}
750     */
751    public int drainTo(Collection<? super E> c) {
752        return drainTo(c, Integer.MAX_VALUE);
753    }
754
755    /**
756     * @throws UnsupportedOperationException {@inheritDoc}
757     * @throws ClassCastException            {@inheritDoc}
758     * @throws NullPointerException          {@inheritDoc}
759     * @throws IllegalArgumentException      {@inheritDoc}
760     */
761    public int drainTo(Collection<? super E> c, int maxElements) {
762        if (c == null)
763            throw new NullPointerException();
764        if (c == this)
765            throw new IllegalArgumentException();
766        if (maxElements <= 0)
767            return 0;
768        final ReentrantLock lock = this.lock;
769        lock.lock();
770        try {
771            int n = Math.min(size, maxElements);
772            for (int i = 0; i < n; i++) {
773                c.add((E) queue[0]); // In this order, in case add() throws.
774                dequeue();
775            }
776            return n;
777        } finally {
778            lock.unlock();
779        }
780    }
781
782    /**
783     * Atomically removes all of the elements from this queue.
784     * The queue will be empty after this call returns.
785     */
786    public void clear() {
787        final ReentrantLock lock = this.lock;
788        lock.lock();
789        try {
790            Object[] array = queue;
791            int n = size;
792            size = 0;
793            for (int i = 0; i < n; i++)
794                array[i] = null;
795        } finally {
796            lock.unlock();
797        }
798    }
799
800    /**
801     * Returns an array containing all of the elements in this queue; the
802     * runtime type of the returned array is that of the specified array.
803     * The returned array elements are in no particular order.
804     * If the queue fits in the specified array, it is returned therein.
805     * Otherwise, a new array is allocated with the runtime type of the
806     * specified array and the size of this queue.
807     *
808     * <p>If this queue fits in the specified array with room to spare
809     * (i.e., the array has more elements than this queue), the element in
810     * the array immediately following the end of the queue is set to
811     * {@code null}.
812     *
813     * <p>Like the {@link #toArray()} method, this method acts as bridge between
814     * array-based and collection-based APIs.  Further, this method allows
815     * precise control over the runtime type of the output array, and may,
816     * under certain circumstances, be used to save allocation costs.
817     *
818     * <p>Suppose {@code x} is a queue known to contain only strings.
819     * The following code can be used to dump the queue into a newly
820     * allocated array of {@code String}:
821     *
822     *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
823     *
824     * Note that {@code toArray(new Object[0])} is identical in function to
825     * {@code toArray()}.
826     *
827     * @param a the array into which the elements of the queue are to
828     *          be stored, if it is big enough; otherwise, a new array of the
829     *          same runtime type is allocated for this purpose
830     * @return an array containing all of the elements in this queue
831     * @throws ArrayStoreException if the runtime type of the specified array
832     *         is not a supertype of the runtime type of every element in
833     *         this queue
834     * @throws NullPointerException if the specified array is null
835     */
836    public <T> T[] toArray(T[] a) {
837        final ReentrantLock lock = this.lock;
838        lock.lock();
839        try {
840            int n = size;
841            if (a.length < n)
842                // Make a new array of a's runtime type, but my contents:
843                return (T[]) Arrays.copyOf(queue, size, a.getClass());
844            System.arraycopy(queue, 0, a, 0, n);
845            if (a.length > n)
846                a[n] = null;
847            return a;
848        } finally {
849            lock.unlock();
850        }
851    }
852
853    /**
854     * Returns an iterator over the elements in this queue. The
855     * iterator does not return the elements in any particular order.
856     *
857     * <p>The returned iterator is a "weakly consistent" iterator that
858     * will never throw {@link java.util.ConcurrentModificationException
859     * ConcurrentModificationException}, and guarantees to traverse
860     * elements as they existed upon construction of the iterator, and
861     * may (but is not guaranteed to) reflect any modifications
862     * subsequent to construction.
863     *
864     * @return an iterator over the elements in this queue
865     */
866    public Iterator<E> iterator() {
867        return new Itr(toArray());
868    }
869
870    /**
871     * Snapshot iterator that works off copy of underlying q array.
872     */
873    final class Itr implements Iterator<E> {
874        final Object[] array; // Array of all elements
875        int cursor;           // index of next element to return
876        int lastRet;          // index of last element, or -1 if no such
877
878        Itr(Object[] array) {
879            lastRet = -1;
880            this.array = array;
881        }
882
883        public boolean hasNext() {
884            return cursor < array.length;
885        }
886
887        public E next() {
888            if (cursor >= array.length)
889                throw new NoSuchElementException();
890            lastRet = cursor;
891            return (E)array[cursor++];
892        }
893
894        public void remove() {
895            if (lastRet < 0)
896                throw new IllegalStateException();
897            removeEQ(array[lastRet]);
898            lastRet = -1;
899        }
900    }
901
902    /**
903     * Saves this queue to a stream (that is, serializes it).
904     *
905     * For compatibility with previous version of this class, elements
906     * are first copied to a java.util.PriorityQueue, which is then
907     * serialized.
908     */
909    private void writeObject(java.io.ObjectOutputStream s)
910        throws java.io.IOException {
911        lock.lock();
912        try {
913            // avoid zero capacity argument
914            q = new PriorityQueue<E>(Math.max(size, 1), comparator);
915            q.addAll(this);
916            s.defaultWriteObject();
917        } finally {
918            q = null;
919            lock.unlock();
920        }
921    }
922
923    /**
924     * Reconstitutes this queue from a stream (that is, deserializes it).
925     */
926    private void readObject(java.io.ObjectInputStream s)
927        throws java.io.IOException, ClassNotFoundException {
928        try {
929            s.defaultReadObject();
930            this.queue = new Object[q.size()];
931            comparator = q.comparator();
932            addAll(q);
933        } finally {
934            q = null;
935        }
936    }
937
938    // Unsafe mechanics
939    private static final sun.misc.Unsafe UNSAFE;
940    private static final long allocationSpinLockOffset;
941    static {
942        try {
943            UNSAFE = sun.misc.Unsafe.getUnsafe();
944            Class k = PriorityBlockingQueue.class;
945            allocationSpinLockOffset = UNSAFE.objectFieldOffset
946                (k.getDeclaredField("allocationSpinLock"));
947        } catch (Exception e) {
948            throw new Error(e);
949        }
950    }
951}
952