PriorityBlockingQueue.java revision 51b1b6997fd3f980076b8081f7f1165ccc2a4008
1/* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25/* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36package java.util.concurrent; 37 38import java.util.concurrent.locks.Condition; 39import java.util.concurrent.locks.ReentrantLock; 40import java.util.*; 41 42/** 43 * An unbounded {@linkplain BlockingQueue blocking queue} that uses 44 * the same ordering rules as class {@link PriorityQueue} and supplies 45 * blocking retrieval operations. While this queue is logically 46 * unbounded, attempted additions may fail due to resource exhaustion 47 * (causing {@code OutOfMemoryError}). This class does not permit 48 * {@code null} elements. A priority queue relying on {@linkplain 49 * Comparable natural ordering} also does not permit insertion of 50 * non-comparable objects (doing so results in 51 * {@code ClassCastException}). 52 * 53 * <p>This class and its iterator implement all of the 54 * <em>optional</em> methods of the {@link Collection} and {@link 55 * Iterator} interfaces. The Iterator provided in method {@link 56 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 57 * the PriorityBlockingQueue in any particular order. If you need 58 * ordered traversal, consider using 59 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} 60 * can be used to <em>remove</em> some or all elements in priority 61 * order and place them in another collection. 62 * 63 * <p>Operations on this class make no guarantees about the ordering 64 * of elements with equal priority. If you need to enforce an 65 * ordering, you can define custom classes or comparators that use a 66 * secondary key to break ties in primary priority values. For 67 * example, here is a class that applies first-in-first-out 68 * tie-breaking to comparable elements. To use it, you would insert a 69 * {@code new FIFOEntry(anEntry)} instead of a plain entry object. 70 * 71 * <pre> {@code 72 * class FIFOEntry<E extends Comparable<? super E>> 73 * implements Comparable<FIFOEntry<E>> { 74 * static final AtomicLong seq = new AtomicLong(0); 75 * final long seqNum; 76 * final E entry; 77 * public FIFOEntry(E entry) { 78 * seqNum = seq.getAndIncrement(); 79 * this.entry = entry; 80 * } 81 * public E getEntry() { return entry; } 82 * public int compareTo(FIFOEntry<E> other) { 83 * int res = entry.compareTo(other.entry); 84 * if (res == 0 && other.entry != this.entry) 85 * res = (seqNum < other.seqNum ? -1 : 1); 86 * return res; 87 * } 88 * }}</pre> 89 * 90 * <p>This class is a member of the 91 * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 92 * Java Collections Framework</a>. 93 * 94 * @since 1.5 95 * @author Doug Lea 96 * @param <E> the type of elements held in this collection 97 */ 98public class PriorityBlockingQueue<E> extends AbstractQueue<E> 99 implements BlockingQueue<E>, java.io.Serializable { 100 private static final long serialVersionUID = 5595510919245408276L; 101 102 /* 103 * The implementation uses an array-based binary heap, with public 104 * operations protected with a single lock. However, allocation 105 * during resizing uses a simple spinlock (used only while not 106 * holding main lock) in order to allow takes to operate 107 * concurrently with allocation. This avoids repeated 108 * postponement of waiting consumers and consequent element 109 * build-up. The need to back away from lock during allocation 110 * makes it impossible to simply wrap delegated 111 * java.util.PriorityQueue operations within a lock, as was done 112 * in a previous version of this class. To maintain 113 * interoperability, a plain PriorityQueue is still used during 114 * serialization, which maintains compatibility at the expense of 115 * transiently doubling overhead. 116 */ 117 118 /** 119 * Default array capacity. 120 */ 121 private static final int DEFAULT_INITIAL_CAPACITY = 11; 122 123 /** 124 * The maximum size of array to allocate. 125 * Some VMs reserve some header words in an array. 126 * Attempts to allocate larger arrays may result in 127 * OutOfMemoryError: Requested array size exceeds VM limit 128 */ 129 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 130 131 /** 132 * Priority queue represented as a balanced binary heap: the two 133 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The 134 * priority queue is ordered by comparator, or by the elements' 135 * natural ordering, if comparator is null: For each node n in the 136 * heap and each descendant d of n, n <= d. The element with the 137 * lowest value is in queue[0], assuming the queue is nonempty. 138 */ 139 private transient Object[] queue; 140 141 /** 142 * The number of elements in the priority queue. 143 */ 144 private transient int size; 145 146 /** 147 * The comparator, or null if priority queue uses elements' 148 * natural ordering. 149 */ 150 private transient Comparator<? super E> comparator; 151 152 /** 153 * Lock used for all public operations 154 */ 155 private final ReentrantLock lock; 156 157 /** 158 * Condition for blocking when empty 159 */ 160 private final Condition notEmpty; 161 162 /** 163 * Spinlock for allocation, acquired via CAS. 164 */ 165 private transient volatile int allocationSpinLock; 166 167 /** 168 * A plain PriorityQueue used only for serialization, 169 * to maintain compatibility with previous versions 170 * of this class. Non-null only during serialization/deserialization. 171 */ 172 private PriorityQueue q; 173 174 /** 175 * Creates a {@code PriorityBlockingQueue} with the default 176 * initial capacity (11) that orders its elements according to 177 * their {@linkplain Comparable natural ordering}. 178 */ 179 public PriorityBlockingQueue() { 180 this(DEFAULT_INITIAL_CAPACITY, null); 181 } 182 183 /** 184 * Creates a {@code PriorityBlockingQueue} with the specified 185 * initial capacity that orders its elements according to their 186 * {@linkplain Comparable natural ordering}. 187 * 188 * @param initialCapacity the initial capacity for this priority queue 189 * @throws IllegalArgumentException if {@code initialCapacity} is less 190 * than 1 191 */ 192 public PriorityBlockingQueue(int initialCapacity) { 193 this(initialCapacity, null); 194 } 195 196 /** 197 * Creates a {@code PriorityBlockingQueue} with the specified initial 198 * capacity that orders its elements according to the specified 199 * comparator. 200 * 201 * @param initialCapacity the initial capacity for this priority queue 202 * @param comparator the comparator that will be used to order this 203 * priority queue. If {@code null}, the {@linkplain Comparable 204 * natural ordering} of the elements will be used. 205 * @throws IllegalArgumentException if {@code initialCapacity} is less 206 * than 1 207 */ 208 public PriorityBlockingQueue(int initialCapacity, 209 Comparator<? super E> comparator) { 210 if (initialCapacity < 1) 211 throw new IllegalArgumentException(); 212 this.lock = new ReentrantLock(); 213 this.notEmpty = lock.newCondition(); 214 this.comparator = comparator; 215 this.queue = new Object[initialCapacity]; 216 } 217 218 /** 219 * Creates a {@code PriorityBlockingQueue} containing the elements 220 * in the specified collection. If the specified collection is a 221 * {@link SortedSet} or a {@link PriorityQueue}, this 222 * priority queue will be ordered according to the same ordering. 223 * Otherwise, this priority queue will be ordered according to the 224 * {@linkplain Comparable natural ordering} of its elements. 225 * 226 * @param c the collection whose elements are to be placed 227 * into this priority queue 228 * @throws ClassCastException if elements of the specified collection 229 * cannot be compared to one another according to the priority 230 * queue's ordering 231 * @throws NullPointerException if the specified collection or any 232 * of its elements are null 233 */ 234 public PriorityBlockingQueue(Collection<? extends E> c) { 235 this.lock = new ReentrantLock(); 236 this.notEmpty = lock.newCondition(); 237 boolean heapify = true; // true if not known to be in heap order 238 boolean screen = true; // true if must screen for nulls 239 if (c instanceof SortedSet<?>) { 240 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; 241 this.comparator = (Comparator<? super E>) ss.comparator(); 242 heapify = false; 243 } 244 else if (c instanceof PriorityBlockingQueue<?>) { 245 PriorityBlockingQueue<? extends E> pq = 246 (PriorityBlockingQueue<? extends E>) c; 247 this.comparator = (Comparator<? super E>) pq.comparator(); 248 screen = false; 249 if (pq.getClass() == PriorityBlockingQueue.class) // exact match 250 heapify = false; 251 } 252 Object[] a = c.toArray(); 253 int n = a.length; 254 // If c.toArray incorrectly doesn't return Object[], copy it. 255 if (a.getClass() != Object[].class) 256 a = Arrays.copyOf(a, n, Object[].class); 257 if (screen && (n == 1 || this.comparator != null)) { 258 for (int i = 0; i < n; ++i) 259 if (a[i] == null) 260 throw new NullPointerException(); 261 } 262 this.queue = a; 263 this.size = n; 264 if (heapify) 265 heapify(); 266 } 267 268 /** 269 * Tries to grow array to accommodate at least one more element 270 * (but normally expand by about 50%), giving up (allowing retry) 271 * on contention (which we expect to be rare). Call only while 272 * holding lock. 273 * 274 * @param array the heap array 275 * @param oldCap the length of the array 276 */ 277 private void tryGrow(Object[] array, int oldCap) { 278 lock.unlock(); // must release and then re-acquire main lock 279 Object[] newArray = null; 280 if (allocationSpinLock == 0 && 281 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 282 0, 1)) { 283 try { 284 int newCap = oldCap + ((oldCap < 64) ? 285 (oldCap + 2) : // grow faster if small 286 (oldCap >> 1)); 287 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow 288 int minCap = oldCap + 1; 289 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) 290 throw new OutOfMemoryError(); 291 newCap = MAX_ARRAY_SIZE; 292 } 293 if (newCap > oldCap && queue == array) 294 newArray = new Object[newCap]; 295 } finally { 296 allocationSpinLock = 0; 297 } 298 } 299 if (newArray == null) // back off if another thread is allocating 300 Thread.yield(); 301 lock.lock(); 302 if (newArray != null && queue == array) { 303 queue = newArray; 304 System.arraycopy(array, 0, newArray, 0, oldCap); 305 } 306 } 307 308 /** 309 * Mechanics for poll(). Call only while holding lock. 310 */ 311 private E dequeue() { 312 int n = size - 1; 313 if (n < 0) 314 return null; 315 else { 316 Object[] array = queue; 317 E result = (E) array[0]; 318 E x = (E) array[n]; 319 array[n] = null; 320 Comparator<? super E> cmp = comparator; 321 if (cmp == null) 322 siftDownComparable(0, x, array, n); 323 else 324 siftDownUsingComparator(0, x, array, n, cmp); 325 size = n; 326 return result; 327 } 328 } 329 330 /** 331 * Inserts item x at position k, maintaining heap invariant by 332 * promoting x up the tree until it is greater than or equal to 333 * its parent, or is the root. 334 * 335 * To simplify and speed up coercions and comparisons. the 336 * Comparable and Comparator versions are separated into different 337 * methods that are otherwise identical. (Similarly for siftDown.) 338 * These methods are static, with heap state as arguments, to 339 * simplify use in light of possible comparator exceptions. 340 * 341 * @param k the position to fill 342 * @param x the item to insert 343 * @param array the heap array 344 * @param n heap size 345 */ 346 private static <T> void siftUpComparable(int k, T x, Object[] array) { 347 Comparable<? super T> key = (Comparable<? super T>) x; 348 while (k > 0) { 349 int parent = (k - 1) >>> 1; 350 Object e = array[parent]; 351 if (key.compareTo((T) e) >= 0) 352 break; 353 array[k] = e; 354 k = parent; 355 } 356 array[k] = key; 357 } 358 359 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, 360 Comparator<? super T> cmp) { 361 while (k > 0) { 362 int parent = (k - 1) >>> 1; 363 Object e = array[parent]; 364 if (cmp.compare(x, (T) e) >= 0) 365 break; 366 array[k] = e; 367 k = parent; 368 } 369 array[k] = x; 370 } 371 372 /** 373 * Inserts item x at position k, maintaining heap invariant by 374 * demoting x down the tree repeatedly until it is less than or 375 * equal to its children or is a leaf. 376 * 377 * @param k the position to fill 378 * @param x the item to insert 379 * @param array the heap array 380 * @param n heap size 381 */ 382 private static <T> void siftDownComparable(int k, T x, Object[] array, 383 int n) { 384 if (n > 0) { 385 Comparable<? super T> key = (Comparable<? super T>)x; 386 int half = n >>> 1; // loop while a non-leaf 387 while (k < half) { 388 int child = (k << 1) + 1; // assume left child is least 389 Object c = array[child]; 390 int right = child + 1; 391 if (right < n && 392 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) 393 c = array[child = right]; 394 if (key.compareTo((T) c) <= 0) 395 break; 396 array[k] = c; 397 k = child; 398 } 399 array[k] = key; 400 } 401 } 402 403 private static <T> void siftDownUsingComparator(int k, T x, Object[] array, 404 int n, 405 Comparator<? super T> cmp) { 406 if (n > 0) { 407 int half = n >>> 1; 408 while (k < half) { 409 int child = (k << 1) + 1; 410 Object c = array[child]; 411 int right = child + 1; 412 if (right < n && cmp.compare((T) c, (T) array[right]) > 0) 413 c = array[child = right]; 414 if (cmp.compare(x, (T) c) <= 0) 415 break; 416 array[k] = c; 417 k = child; 418 } 419 array[k] = x; 420 } 421 } 422 423 /** 424 * Establishes the heap invariant (described above) in the entire tree, 425 * assuming nothing about the order of the elements prior to the call. 426 */ 427 private void heapify() { 428 Object[] array = queue; 429 int n = size; 430 int half = (n >>> 1) - 1; 431 Comparator<? super E> cmp = comparator; 432 if (cmp == null) { 433 for (int i = half; i >= 0; i--) 434 siftDownComparable(i, (E) array[i], array, n); 435 } 436 else { 437 for (int i = half; i >= 0; i--) 438 siftDownUsingComparator(i, (E) array[i], array, n, cmp); 439 } 440 } 441 442 /** 443 * Inserts the specified element into this priority queue. 444 * 445 * @param e the element to add 446 * @return {@code true} (as specified by {@link Collection#add}) 447 * @throws ClassCastException if the specified element cannot be compared 448 * with elements currently in the priority queue according to the 449 * priority queue's ordering 450 * @throws NullPointerException if the specified element is null 451 */ 452 public boolean add(E e) { 453 return offer(e); 454 } 455 456 /** 457 * Inserts the specified element into this priority queue. 458 * As the queue is unbounded, this method will never return {@code false}. 459 * 460 * @param e the element to add 461 * @return {@code true} (as specified by {@link Queue#offer}) 462 * @throws ClassCastException if the specified element cannot be compared 463 * with elements currently in the priority queue according to the 464 * priority queue's ordering 465 * @throws NullPointerException if the specified element is null 466 */ 467 public boolean offer(E e) { 468 if (e == null) 469 throw new NullPointerException(); 470 final ReentrantLock lock = this.lock; 471 lock.lock(); 472 int n, cap; 473 Object[] array; 474 while ((n = size) >= (cap = (array = queue).length)) 475 tryGrow(array, cap); 476 try { 477 Comparator<? super E> cmp = comparator; 478 if (cmp == null) 479 siftUpComparable(n, e, array); 480 else 481 siftUpUsingComparator(n, e, array, cmp); 482 size = n + 1; 483 notEmpty.signal(); 484 } finally { 485 lock.unlock(); 486 } 487 return true; 488 } 489 490 /** 491 * Inserts the specified element into this priority queue. 492 * As the queue is unbounded, this method will never block. 493 * 494 * @param e the element to add 495 * @throws ClassCastException if the specified element cannot be compared 496 * with elements currently in the priority queue according to the 497 * priority queue's ordering 498 * @throws NullPointerException if the specified element is null 499 */ 500 public void put(E e) { 501 offer(e); // never need to block 502 } 503 504 /** 505 * Inserts the specified element into this priority queue. 506 * As the queue is unbounded, this method will never block or 507 * return {@code false}. 508 * 509 * @param e the element to add 510 * @param timeout This parameter is ignored as the method never blocks 511 * @param unit This parameter is ignored as the method never blocks 512 * @return {@code true} (as specified by 513 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) 514 * @throws ClassCastException if the specified element cannot be compared 515 * with elements currently in the priority queue according to the 516 * priority queue's ordering 517 * @throws NullPointerException if the specified element is null 518 */ 519 public boolean offer(E e, long timeout, TimeUnit unit) { 520 return offer(e); // never need to block 521 } 522 523 public E poll() { 524 final ReentrantLock lock = this.lock; 525 lock.lock(); 526 try { 527 return dequeue(); 528 } finally { 529 lock.unlock(); 530 } 531 } 532 533 public E take() throws InterruptedException { 534 final ReentrantLock lock = this.lock; 535 lock.lockInterruptibly(); 536 E result; 537 try { 538 while ( (result = dequeue()) == null) 539 notEmpty.await(); 540 } finally { 541 lock.unlock(); 542 } 543 return result; 544 } 545 546 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 547 long nanos = unit.toNanos(timeout); 548 final ReentrantLock lock = this.lock; 549 lock.lockInterruptibly(); 550 E result; 551 try { 552 while ( (result = dequeue()) == null && nanos > 0) 553 nanos = notEmpty.awaitNanos(nanos); 554 } finally { 555 lock.unlock(); 556 } 557 return result; 558 } 559 560 public E peek() { 561 final ReentrantLock lock = this.lock; 562 lock.lock(); 563 try { 564 return (size == 0) ? null : (E) queue[0]; 565 } finally { 566 lock.unlock(); 567 } 568 } 569 570 /** 571 * Returns the comparator used to order the elements in this queue, 572 * or {@code null} if this queue uses the {@linkplain Comparable 573 * natural ordering} of its elements. 574 * 575 * @return the comparator used to order the elements in this queue, 576 * or {@code null} if this queue uses the natural 577 * ordering of its elements 578 */ 579 public Comparator<? super E> comparator() { 580 return comparator; 581 } 582 583 public int size() { 584 final ReentrantLock lock = this.lock; 585 lock.lock(); 586 try { 587 return size; 588 } finally { 589 lock.unlock(); 590 } 591 } 592 593 /** 594 * Always returns {@code Integer.MAX_VALUE} because 595 * a {@code PriorityBlockingQueue} is not capacity constrained. 596 * @return {@code Integer.MAX_VALUE} always 597 */ 598 public int remainingCapacity() { 599 return Integer.MAX_VALUE; 600 } 601 602 private int indexOf(Object o) { 603 if (o != null) { 604 Object[] array = queue; 605 int n = size; 606 for (int i = 0; i < n; i++) 607 if (o.equals(array[i])) 608 return i; 609 } 610 return -1; 611 } 612 613 /** 614 * Removes the ith element from queue. 615 */ 616 private void removeAt(int i) { 617 Object[] array = queue; 618 int n = size - 1; 619 if (n == i) // removed last element 620 array[i] = null; 621 else { 622 E moved = (E) array[n]; 623 array[n] = null; 624 Comparator<? super E> cmp = comparator; 625 if (cmp == null) 626 siftDownComparable(i, moved, array, n); 627 else 628 siftDownUsingComparator(i, moved, array, n, cmp); 629 if (array[i] == moved) { 630 if (cmp == null) 631 siftUpComparable(i, moved, array); 632 else 633 siftUpUsingComparator(i, moved, array, cmp); 634 } 635 } 636 size = n; 637 } 638 639 /** 640 * Removes a single instance of the specified element from this queue, 641 * if it is present. More formally, removes an element {@code e} such 642 * that {@code o.equals(e)}, if this queue contains one or more such 643 * elements. Returns {@code true} if and only if this queue contained 644 * the specified element (or equivalently, if this queue changed as a 645 * result of the call). 646 * 647 * @param o element to be removed from this queue, if present 648 * @return {@code true} if this queue changed as a result of the call 649 */ 650 public boolean remove(Object o) { 651 final ReentrantLock lock = this.lock; 652 lock.lock(); 653 try { 654 int i = indexOf(o); 655 if (i == -1) 656 return false; 657 removeAt(i); 658 return true; 659 } finally { 660 lock.unlock(); 661 } 662 } 663 664 /** 665 * Identity-based version for use in Itr.remove 666 */ 667 void removeEQ(Object o) { 668 final ReentrantLock lock = this.lock; 669 lock.lock(); 670 try { 671 Object[] array = queue; 672 for (int i = 0, n = size; i < n; i++) { 673 if (o == array[i]) { 674 removeAt(i); 675 break; 676 } 677 } 678 } finally { 679 lock.unlock(); 680 } 681 } 682 683 /** 684 * Returns {@code true} if this queue contains the specified element. 685 * More formally, returns {@code true} if and only if this queue contains 686 * at least one element {@code e} such that {@code o.equals(e)}. 687 * 688 * @param o object to be checked for containment in this queue 689 * @return {@code true} if this queue contains the specified element 690 */ 691 public boolean contains(Object o) { 692 final ReentrantLock lock = this.lock; 693 lock.lock(); 694 try { 695 return indexOf(o) != -1; 696 } finally { 697 lock.unlock(); 698 } 699 } 700 701 /** 702 * Returns an array containing all of the elements in this queue. 703 * The returned array elements are in no particular order. 704 * 705 * <p>The returned array will be "safe" in that no references to it are 706 * maintained by this queue. (In other words, this method must allocate 707 * a new array). The caller is thus free to modify the returned array. 708 * 709 * <p>This method acts as bridge between array-based and collection-based 710 * APIs. 711 * 712 * @return an array containing all of the elements in this queue 713 */ 714 public Object[] toArray() { 715 final ReentrantLock lock = this.lock; 716 lock.lock(); 717 try { 718 return Arrays.copyOf(queue, size); 719 } finally { 720 lock.unlock(); 721 } 722 } 723 724 public String toString() { 725 final ReentrantLock lock = this.lock; 726 lock.lock(); 727 try { 728 int n = size; 729 if (n == 0) 730 return "[]"; 731 StringBuilder sb = new StringBuilder(); 732 sb.append('['); 733 for (int i = 0; i < n; ++i) { 734 Object e = queue[i]; 735 sb.append(e == this ? "(this Collection)" : e); 736 if (i != n - 1) 737 sb.append(',').append(' '); 738 } 739 return sb.append(']').toString(); 740 } finally { 741 lock.unlock(); 742 } 743 } 744 745 /** 746 * @throws UnsupportedOperationException {@inheritDoc} 747 * @throws ClassCastException {@inheritDoc} 748 * @throws NullPointerException {@inheritDoc} 749 * @throws IllegalArgumentException {@inheritDoc} 750 */ 751 public int drainTo(Collection<? super E> c) { 752 return drainTo(c, Integer.MAX_VALUE); 753 } 754 755 /** 756 * @throws UnsupportedOperationException {@inheritDoc} 757 * @throws ClassCastException {@inheritDoc} 758 * @throws NullPointerException {@inheritDoc} 759 * @throws IllegalArgumentException {@inheritDoc} 760 */ 761 public int drainTo(Collection<? super E> c, int maxElements) { 762 if (c == null) 763 throw new NullPointerException(); 764 if (c == this) 765 throw new IllegalArgumentException(); 766 if (maxElements <= 0) 767 return 0; 768 final ReentrantLock lock = this.lock; 769 lock.lock(); 770 try { 771 int n = Math.min(size, maxElements); 772 for (int i = 0; i < n; i++) { 773 c.add((E) queue[0]); // In this order, in case add() throws. 774 dequeue(); 775 } 776 return n; 777 } finally { 778 lock.unlock(); 779 } 780 } 781 782 /** 783 * Atomically removes all of the elements from this queue. 784 * The queue will be empty after this call returns. 785 */ 786 public void clear() { 787 final ReentrantLock lock = this.lock; 788 lock.lock(); 789 try { 790 Object[] array = queue; 791 int n = size; 792 size = 0; 793 for (int i = 0; i < n; i++) 794 array[i] = null; 795 } finally { 796 lock.unlock(); 797 } 798 } 799 800 /** 801 * Returns an array containing all of the elements in this queue; the 802 * runtime type of the returned array is that of the specified array. 803 * The returned array elements are in no particular order. 804 * If the queue fits in the specified array, it is returned therein. 805 * Otherwise, a new array is allocated with the runtime type of the 806 * specified array and the size of this queue. 807 * 808 * <p>If this queue fits in the specified array with room to spare 809 * (i.e., the array has more elements than this queue), the element in 810 * the array immediately following the end of the queue is set to 811 * {@code null}. 812 * 813 * <p>Like the {@link #toArray()} method, this method acts as bridge between 814 * array-based and collection-based APIs. Further, this method allows 815 * precise control over the runtime type of the output array, and may, 816 * under certain circumstances, be used to save allocation costs. 817 * 818 * <p>Suppose {@code x} is a queue known to contain only strings. 819 * The following code can be used to dump the queue into a newly 820 * allocated array of {@code String}: 821 * 822 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 823 * 824 * Note that {@code toArray(new Object[0])} is identical in function to 825 * {@code toArray()}. 826 * 827 * @param a the array into which the elements of the queue are to 828 * be stored, if it is big enough; otherwise, a new array of the 829 * same runtime type is allocated for this purpose 830 * @return an array containing all of the elements in this queue 831 * @throws ArrayStoreException if the runtime type of the specified array 832 * is not a supertype of the runtime type of every element in 833 * this queue 834 * @throws NullPointerException if the specified array is null 835 */ 836 public <T> T[] toArray(T[] a) { 837 final ReentrantLock lock = this.lock; 838 lock.lock(); 839 try { 840 int n = size; 841 if (a.length < n) 842 // Make a new array of a's runtime type, but my contents: 843 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 844 System.arraycopy(queue, 0, a, 0, n); 845 if (a.length > n) 846 a[n] = null; 847 return a; 848 } finally { 849 lock.unlock(); 850 } 851 } 852 853 /** 854 * Returns an iterator over the elements in this queue. The 855 * iterator does not return the elements in any particular order. 856 * 857 * <p>The returned iterator is a "weakly consistent" iterator that 858 * will never throw {@link java.util.ConcurrentModificationException 859 * ConcurrentModificationException}, and guarantees to traverse 860 * elements as they existed upon construction of the iterator, and 861 * may (but is not guaranteed to) reflect any modifications 862 * subsequent to construction. 863 * 864 * @return an iterator over the elements in this queue 865 */ 866 public Iterator<E> iterator() { 867 return new Itr(toArray()); 868 } 869 870 /** 871 * Snapshot iterator that works off copy of underlying q array. 872 */ 873 final class Itr implements Iterator<E> { 874 final Object[] array; // Array of all elements 875 int cursor; // index of next element to return 876 int lastRet; // index of last element, or -1 if no such 877 878 Itr(Object[] array) { 879 lastRet = -1; 880 this.array = array; 881 } 882 883 public boolean hasNext() { 884 return cursor < array.length; 885 } 886 887 public E next() { 888 if (cursor >= array.length) 889 throw new NoSuchElementException(); 890 lastRet = cursor; 891 return (E)array[cursor++]; 892 } 893 894 public void remove() { 895 if (lastRet < 0) 896 throw new IllegalStateException(); 897 removeEQ(array[lastRet]); 898 lastRet = -1; 899 } 900 } 901 902 /** 903 * Saves this queue to a stream (that is, serializes it). 904 * 905 * For compatibility with previous version of this class, elements 906 * are first copied to a java.util.PriorityQueue, which is then 907 * serialized. 908 */ 909 private void writeObject(java.io.ObjectOutputStream s) 910 throws java.io.IOException { 911 lock.lock(); 912 try { 913 // avoid zero capacity argument 914 q = new PriorityQueue<E>(Math.max(size, 1), comparator); 915 q.addAll(this); 916 s.defaultWriteObject(); 917 } finally { 918 q = null; 919 lock.unlock(); 920 } 921 } 922 923 /** 924 * Reconstitutes this queue from a stream (that is, deserializes it). 925 */ 926 private void readObject(java.io.ObjectInputStream s) 927 throws java.io.IOException, ClassNotFoundException { 928 try { 929 s.defaultReadObject(); 930 this.queue = new Object[q.size()]; 931 comparator = q.comparator(); 932 addAll(q); 933 } finally { 934 q = null; 935 } 936 } 937 938 // Unsafe mechanics 939 private static final sun.misc.Unsafe UNSAFE; 940 private static final long allocationSpinLockOffset; 941 static { 942 try { 943 UNSAFE = sun.misc.Unsafe.getUnsafe(); 944 Class k = PriorityBlockingQueue.class; 945 allocationSpinLockOffset = UNSAFE.objectFieldOffset 946 (k.getDeclaredField("allocationSpinLock")); 947 } catch (Exception e) { 948 throw new Error(e); 949 } 950 } 951} 952