PriorityBlockingQueue.java revision a807b4d808d2591894daf13aab179b2e9c46a2f5
1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7package java.util.concurrent; 8 9import java.util.concurrent.locks.*; 10import java.util.*; 11 12// BEGIN android-note 13// removed link to collections framework docs 14// END android-note 15 16/** 17 * An unbounded {@linkplain BlockingQueue blocking queue} that uses 18 * the same ordering rules as class {@link PriorityQueue} and supplies 19 * blocking retrieval operations. While this queue is logically 20 * unbounded, attempted additions may fail due to resource exhaustion 21 * (causing {@code OutOfMemoryError}). This class does not permit 22 * {@code null} elements. A priority queue relying on {@linkplain 23 * Comparable natural ordering} also does not permit insertion of 24 * non-comparable objects (doing so results in 25 * {@code ClassCastException}). 26 * 27 * <p>This class and its iterator implement all of the 28 * <em>optional</em> methods of the {@link Collection} and {@link 29 * Iterator} interfaces. The Iterator provided in method {@link 30 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 31 * the PriorityBlockingQueue in any particular order. If you need 32 * ordered traversal, consider using 33 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} 34 * can be used to <em>remove</em> some or all elements in priority 35 * order and place them in another collection. 36 * 37 * <p>Operations on this class make no guarantees about the ordering 38 * of elements with equal priority. If you need to enforce an 39 * ordering, you can define custom classes or comparators that use a 40 * secondary key to break ties in primary priority values. For 41 * example, here is a class that applies first-in-first-out 42 * tie-breaking to comparable elements. To use it, you would insert a 43 * {@code new FIFOEntry(anEntry)} instead of a plain entry object. 44 * 45 * <pre> {@code 46 * class FIFOEntry<E extends Comparable<? super E>> 47 * implements Comparable<FIFOEntry<E>> { 48 * static final AtomicLong seq = new AtomicLong(0); 49 * final long seqNum; 50 * final E entry; 51 * public FIFOEntry(E entry) { 52 * seqNum = seq.getAndIncrement(); 53 * this.entry = entry; 54 * } 55 * public E getEntry() { return entry; } 56 * public int compareTo(FIFOEntry<E> other) { 57 * int res = entry.compareTo(other.entry); 58 * if (res == 0 && other.entry != this.entry) 59 * res = (seqNum < other.seqNum ? -1 : 1); 60 * return res; 61 * } 62 * }}</pre> 63 * 64 * @since 1.5 65 * @author Doug Lea 66 * @param <E> the type of elements held in this collection 67 */ 68public class PriorityBlockingQueue<E> extends AbstractQueue<E> 69 implements BlockingQueue<E>, java.io.Serializable { 70 private static final long serialVersionUID = 5595510919245408276L; 71 72 /* 73 * The implementation uses an array-based binary heap, with public 74 * operations protected with a single lock. However, allocation 75 * during resizing uses a simple spinlock (used only while not 76 * holding main lock) in order to allow takes to operate 77 * concurrently with allocation. This avoids repeated 78 * postponement of waiting consumers and consequent element 79 * build-up. The need to back away from lock during allocation 80 * makes it impossible to simply wrap delegated 81 * java.util.PriorityQueue operations within a lock, as was done 82 * in a previous version of this class. To maintain 83 * interoperability, a plain PriorityQueue is still used during 84 * serialization, which maintains compatibility at the expense of 85 * transiently doubling overhead. 86 */ 87 88 /** 89 * Default array capacity. 90 */ 91 private static final int DEFAULT_INITIAL_CAPACITY = 11; 92 93 /** 94 * The maximum size of array to allocate. 95 * Some VMs reserve some header words in an array. 96 * Attempts to allocate larger arrays may result in 97 * OutOfMemoryError: Requested array size exceeds VM limit 98 */ 99 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 100 101 /** 102 * Priority queue represented as a balanced binary heap: the two 103 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The 104 * priority queue is ordered by comparator, or by the elements' 105 * natural ordering, if comparator is null: For each node n in the 106 * heap and each descendant d of n, n <= d. The element with the 107 * lowest value is in queue[0], assuming the queue is nonempty. 108 */ 109 private transient Object[] queue; 110 111 /** 112 * The number of elements in the priority queue. 113 */ 114 private transient int size; 115 116 /** 117 * The comparator, or null if priority queue uses elements' 118 * natural ordering. 119 */ 120 private transient Comparator<? super E> comparator; 121 122 /** 123 * Lock used for all public operations 124 */ 125 private final ReentrantLock lock; 126 127 /** 128 * Condition for blocking when empty 129 */ 130 private final Condition notEmpty; 131 132 /** 133 * Spinlock for allocation, acquired via CAS. 134 */ 135 private transient volatile int allocationSpinLock; 136 137 /** 138 * A plain PriorityQueue used only for serialization, 139 * to maintain compatibility with previous versions 140 * of this class. Non-null only during serialization/deserialization. 141 */ 142 private PriorityQueue<E> q; 143 144 /** 145 * Creates a {@code PriorityBlockingQueue} with the default 146 * initial capacity (11) that orders its elements according to 147 * their {@linkplain Comparable natural ordering}. 148 */ 149 public PriorityBlockingQueue() { 150 this(DEFAULT_INITIAL_CAPACITY, null); 151 } 152 153 /** 154 * Creates a {@code PriorityBlockingQueue} with the specified 155 * initial capacity that orders its elements according to their 156 * {@linkplain Comparable natural ordering}. 157 * 158 * @param initialCapacity the initial capacity for this priority queue 159 * @throws IllegalArgumentException if {@code initialCapacity} is less 160 * than 1 161 */ 162 public PriorityBlockingQueue(int initialCapacity) { 163 this(initialCapacity, null); 164 } 165 166 /** 167 * Creates a {@code PriorityBlockingQueue} with the specified initial 168 * capacity that orders its elements according to the specified 169 * comparator. 170 * 171 * @param initialCapacity the initial capacity for this priority queue 172 * @param comparator the comparator that will be used to order this 173 * priority queue. If {@code null}, the {@linkplain Comparable 174 * natural ordering} of the elements will be used. 175 * @throws IllegalArgumentException if {@code initialCapacity} is less 176 * than 1 177 */ 178 public PriorityBlockingQueue(int initialCapacity, 179 Comparator<? super E> comparator) { 180 if (initialCapacity < 1) 181 throw new IllegalArgumentException(); 182 this.lock = new ReentrantLock(); 183 this.notEmpty = lock.newCondition(); 184 this.comparator = comparator; 185 this.queue = new Object[initialCapacity]; 186 } 187 188 /** 189 * Creates a {@code PriorityBlockingQueue} containing the elements 190 * in the specified collection. If the specified collection is a 191 * {@link SortedSet} or a {@link PriorityQueue}, this 192 * priority queue will be ordered according to the same ordering. 193 * Otherwise, this priority queue will be ordered according to the 194 * {@linkplain Comparable natural ordering} of its elements. 195 * 196 * @param c the collection whose elements are to be placed 197 * into this priority queue 198 * @throws ClassCastException if elements of the specified collection 199 * cannot be compared to one another according to the priority 200 * queue's ordering 201 * @throws NullPointerException if the specified collection or any 202 * of its elements are null 203 */ 204 public PriorityBlockingQueue(Collection<? extends E> c) { 205 this.lock = new ReentrantLock(); 206 this.notEmpty = lock.newCondition(); 207 boolean heapify = true; // true if not known to be in heap order 208 boolean screen = true; // true if must screen for nulls 209 if (c instanceof SortedSet<?>) { 210 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; 211 this.comparator = (Comparator<? super E>) ss.comparator(); 212 heapify = false; 213 } 214 else if (c instanceof PriorityBlockingQueue<?>) { 215 PriorityBlockingQueue<? extends E> pq = 216 (PriorityBlockingQueue<? extends E>) c; 217 this.comparator = (Comparator<? super E>) pq.comparator(); 218 screen = false; 219 if (pq.getClass() == PriorityBlockingQueue.class) // exact match 220 heapify = false; 221 } 222 Object[] a = c.toArray(); 223 int n = a.length; 224 // If c.toArray incorrectly doesn't return Object[], copy it. 225 if (a.getClass() != Object[].class) 226 a = Arrays.copyOf(a, n, Object[].class); 227 if (screen && (n == 1 || this.comparator != null)) { 228 for (int i = 0; i < n; ++i) 229 if (a[i] == null) 230 throw new NullPointerException(); 231 } 232 this.queue = a; 233 this.size = n; 234 if (heapify) 235 heapify(); 236 } 237 238 /** 239 * Tries to grow array to accommodate at least one more element 240 * (but normally expand by about 50%), giving up (allowing retry) 241 * on contention (which we expect to be rare). Call only while 242 * holding lock. 243 * 244 * @param array the heap array 245 * @param oldCap the length of the array 246 */ 247 private void tryGrow(Object[] array, int oldCap) { 248 lock.unlock(); // must release and then re-acquire main lock 249 Object[] newArray = null; 250 if (allocationSpinLock == 0 && 251 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 252 0, 1)) { 253 try { 254 int newCap = oldCap + ((oldCap < 64) ? 255 (oldCap + 2) : // grow faster if small 256 (oldCap >> 1)); 257 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow 258 int minCap = oldCap + 1; 259 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) 260 throw new OutOfMemoryError(); 261 newCap = MAX_ARRAY_SIZE; 262 } 263 if (newCap > oldCap && queue == array) 264 newArray = new Object[newCap]; 265 } finally { 266 allocationSpinLock = 0; 267 } 268 } 269 if (newArray == null) // back off if another thread is allocating 270 Thread.yield(); 271 lock.lock(); 272 if (newArray != null && queue == array) { 273 queue = newArray; 274 System.arraycopy(array, 0, newArray, 0, oldCap); 275 } 276 } 277 278 /** 279 * Mechanics for poll(). Call only while holding lock. 280 */ 281 private E dequeue() { 282 int n = size - 1; 283 if (n < 0) 284 return null; 285 else { 286 Object[] array = queue; 287 E result = (E) array[0]; 288 E x = (E) array[n]; 289 array[n] = null; 290 Comparator<? super E> cmp = comparator; 291 if (cmp == null) 292 siftDownComparable(0, x, array, n); 293 else 294 siftDownUsingComparator(0, x, array, n, cmp); 295 size = n; 296 return result; 297 } 298 } 299 300 /** 301 * Inserts item x at position k, maintaining heap invariant by 302 * promoting x up the tree until it is greater than or equal to 303 * its parent, or is the root. 304 * 305 * To simplify and speed up coercions and comparisons. the 306 * Comparable and Comparator versions are separated into different 307 * methods that are otherwise identical. (Similarly for siftDown.) 308 * These methods are static, with heap state as arguments, to 309 * simplify use in light of possible comparator exceptions. 310 * 311 * @param k the position to fill 312 * @param x the item to insert 313 * @param array the heap array 314 * @param n heap size 315 */ 316 private static <T> void siftUpComparable(int k, T x, Object[] array) { 317 Comparable<? super T> key = (Comparable<? super T>) x; 318 while (k > 0) { 319 int parent = (k - 1) >>> 1; 320 Object e = array[parent]; 321 if (key.compareTo((T) e) >= 0) 322 break; 323 array[k] = e; 324 k = parent; 325 } 326 array[k] = key; 327 } 328 329 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, 330 Comparator<? super T> cmp) { 331 while (k > 0) { 332 int parent = (k - 1) >>> 1; 333 Object e = array[parent]; 334 if (cmp.compare(x, (T) e) >= 0) 335 break; 336 array[k] = e; 337 k = parent; 338 } 339 array[k] = x; 340 } 341 342 /** 343 * Inserts item x at position k, maintaining heap invariant by 344 * demoting x down the tree repeatedly until it is less than or 345 * equal to its children or is a leaf. 346 * 347 * @param k the position to fill 348 * @param x the item to insert 349 * @param array the heap array 350 * @param n heap size 351 */ 352 private static <T> void siftDownComparable(int k, T x, Object[] array, 353 int n) { 354 Comparable<? super T> key = (Comparable<? super T>)x; 355 int half = n >>> 1; // loop while a non-leaf 356 while (k < half) { 357 int child = (k << 1) + 1; // assume left child is least 358 Object c = array[child]; 359 int right = child + 1; 360 if (right < n && 361 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) 362 c = array[child = right]; 363 if (key.compareTo((T) c) <= 0) 364 break; 365 array[k] = c; 366 k = child; 367 } 368 array[k] = key; 369 } 370 371 private static <T> void siftDownUsingComparator(int k, T x, Object[] array, 372 int n, 373 Comparator<? super T> cmp) { 374 int half = n >>> 1; 375 while (k < half) { 376 int child = (k << 1) + 1; 377 Object c = array[child]; 378 int right = child + 1; 379 if (right < n && cmp.compare((T) c, (T) array[right]) > 0) 380 c = array[child = right]; 381 if (cmp.compare(x, (T) c) <= 0) 382 break; 383 array[k] = c; 384 k = child; 385 } 386 array[k] = x; 387 } 388 389 /** 390 * Establishes the heap invariant (described above) in the entire tree, 391 * assuming nothing about the order of the elements prior to the call. 392 */ 393 private void heapify() { 394 Object[] array = queue; 395 int n = size; 396 int half = (n >>> 1) - 1; 397 Comparator<? super E> cmp = comparator; 398 if (cmp == null) { 399 for (int i = half; i >= 0; i--) 400 siftDownComparable(i, (E) array[i], array, n); 401 } 402 else { 403 for (int i = half; i >= 0; i--) 404 siftDownUsingComparator(i, (E) array[i], array, n, cmp); 405 } 406 } 407 408 /** 409 * Inserts the specified element into this priority queue. 410 * 411 * @param e the element to add 412 * @return {@code true} (as specified by {@link Collection#add}) 413 * @throws ClassCastException if the specified element cannot be compared 414 * with elements currently in the priority queue according to the 415 * priority queue's ordering 416 * @throws NullPointerException if the specified element is null 417 */ 418 public boolean add(E e) { 419 return offer(e); 420 } 421 422 /** 423 * Inserts the specified element into this priority queue. 424 * As the queue is unbounded, this method will never return {@code false}. 425 * 426 * @param e the element to add 427 * @return {@code true} (as specified by {@link Queue#offer}) 428 * @throws ClassCastException if the specified element cannot be compared 429 * with elements currently in the priority queue according to the 430 * priority queue's ordering 431 * @throws NullPointerException if the specified element is null 432 */ 433 public boolean offer(E e) { 434 if (e == null) 435 throw new NullPointerException(); 436 final ReentrantLock lock = this.lock; 437 lock.lock(); 438 int n, cap; 439 Object[] array; 440 while ((n = size) >= (cap = (array = queue).length)) 441 tryGrow(array, cap); 442 try { 443 Comparator<? super E> cmp = comparator; 444 if (cmp == null) 445 siftUpComparable(n, e, array); 446 else 447 siftUpUsingComparator(n, e, array, cmp); 448 size = n + 1; 449 notEmpty.signal(); 450 } finally { 451 lock.unlock(); 452 } 453 return true; 454 } 455 456 /** 457 * Inserts the specified element into this priority queue. 458 * As the queue is unbounded, this method will never block. 459 * 460 * @param e the element to add 461 * @throws ClassCastException if the specified element cannot be compared 462 * with elements currently in the priority queue according to the 463 * priority queue's ordering 464 * @throws NullPointerException if the specified element is null 465 */ 466 public void put(E e) { 467 offer(e); // never need to block 468 } 469 470 /** 471 * Inserts the specified element into this priority queue. 472 * As the queue is unbounded, this method will never block or 473 * return {@code false}. 474 * 475 * @param e the element to add 476 * @param timeout This parameter is ignored as the method never blocks 477 * @param unit This parameter is ignored as the method never blocks 478 * @return {@code true} (as specified by 479 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) 480 * @throws ClassCastException if the specified element cannot be compared 481 * with elements currently in the priority queue according to the 482 * priority queue's ordering 483 * @throws NullPointerException if the specified element is null 484 */ 485 public boolean offer(E e, long timeout, TimeUnit unit) { 486 return offer(e); // never need to block 487 } 488 489 public E poll() { 490 final ReentrantLock lock = this.lock; 491 lock.lock(); 492 try { 493 return dequeue(); 494 } finally { 495 lock.unlock(); 496 } 497 } 498 499 public E take() throws InterruptedException { 500 final ReentrantLock lock = this.lock; 501 lock.lockInterruptibly(); 502 E result; 503 try { 504 while ( (result = dequeue()) == null) 505 notEmpty.await(); 506 } finally { 507 lock.unlock(); 508 } 509 return result; 510 } 511 512 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 513 long nanos = unit.toNanos(timeout); 514 final ReentrantLock lock = this.lock; 515 lock.lockInterruptibly(); 516 E result; 517 try { 518 while ( (result = dequeue()) == null && nanos > 0) 519 nanos = notEmpty.awaitNanos(nanos); 520 } finally { 521 lock.unlock(); 522 } 523 return result; 524 } 525 526 public E peek() { 527 final ReentrantLock lock = this.lock; 528 lock.lock(); 529 try { 530 return (size == 0) ? null : (E) queue[0]; 531 } finally { 532 lock.unlock(); 533 } 534 } 535 536 /** 537 * Returns the comparator used to order the elements in this queue, 538 * or {@code null} if this queue uses the {@linkplain Comparable 539 * natural ordering} of its elements. 540 * 541 * @return the comparator used to order the elements in this queue, 542 * or {@code null} if this queue uses the natural 543 * ordering of its elements 544 */ 545 public Comparator<? super E> comparator() { 546 return comparator; 547 } 548 549 public int size() { 550 final ReentrantLock lock = this.lock; 551 lock.lock(); 552 try { 553 return size; 554 } finally { 555 lock.unlock(); 556 } 557 } 558 559 /** 560 * Always returns {@code Integer.MAX_VALUE} because 561 * a {@code PriorityBlockingQueue} is not capacity constrained. 562 * @return {@code Integer.MAX_VALUE} always 563 */ 564 public int remainingCapacity() { 565 return Integer.MAX_VALUE; 566 } 567 568 private int indexOf(Object o) { 569 if (o != null) { 570 Object[] array = queue; 571 int n = size; 572 for (int i = 0; i < n; i++) 573 if (o.equals(array[i])) 574 return i; 575 } 576 return -1; 577 } 578 579 /** 580 * Removes the ith element from queue. 581 */ 582 private void removeAt(int i) { 583 Object[] array = queue; 584 int n = size - 1; 585 if (n == i) // removed last element 586 array[i] = null; 587 else { 588 E moved = (E) array[n]; 589 array[n] = null; 590 Comparator<? super E> cmp = comparator; 591 if (cmp == null) 592 siftDownComparable(i, moved, array, n); 593 else 594 siftDownUsingComparator(i, moved, array, n, cmp); 595 if (array[i] == moved) { 596 if (cmp == null) 597 siftUpComparable(i, moved, array); 598 else 599 siftUpUsingComparator(i, moved, array, cmp); 600 } 601 } 602 size = n; 603 } 604 605 /** 606 * Removes a single instance of the specified element from this queue, 607 * if it is present. More formally, removes an element {@code e} such 608 * that {@code o.equals(e)}, if this queue contains one or more such 609 * elements. Returns {@code true} if and only if this queue contained 610 * the specified element (or equivalently, if this queue changed as a 611 * result of the call). 612 * 613 * @param o element to be removed from this queue, if present 614 * @return {@code true} if this queue changed as a result of the call 615 */ 616 public boolean remove(Object o) { 617 final ReentrantLock lock = this.lock; 618 lock.lock(); 619 try { 620 int i = indexOf(o); 621 if (i == -1) 622 return false; 623 removeAt(i); 624 return true; 625 } finally { 626 lock.unlock(); 627 } 628 } 629 630 /** 631 * Identity-based version for use in Itr.remove 632 */ 633 void removeEQ(Object o) { 634 final ReentrantLock lock = this.lock; 635 lock.lock(); 636 try { 637 Object[] array = queue; 638 for (int i = 0, n = size; i < n; i++) { 639 if (o == array[i]) { 640 removeAt(i); 641 break; 642 } 643 } 644 } finally { 645 lock.unlock(); 646 } 647 } 648 649 /** 650 * Returns {@code true} if this queue contains the specified element. 651 * More formally, returns {@code true} if and only if this queue contains 652 * at least one element {@code e} such that {@code o.equals(e)}. 653 * 654 * @param o object to be checked for containment in this queue 655 * @return {@code true} if this queue contains the specified element 656 */ 657 public boolean contains(Object o) { 658 final ReentrantLock lock = this.lock; 659 lock.lock(); 660 try { 661 return indexOf(o) != -1; 662 } finally { 663 lock.unlock(); 664 } 665 } 666 667 /** 668 * Returns an array containing all of the elements in this queue. 669 * The returned array elements are in no particular order. 670 * 671 * <p>The returned array will be "safe" in that no references to it are 672 * maintained by this queue. (In other words, this method must allocate 673 * a new array). The caller is thus free to modify the returned array. 674 * 675 * <p>This method acts as bridge between array-based and collection-based 676 * APIs. 677 * 678 * @return an array containing all of the elements in this queue 679 */ 680 public Object[] toArray() { 681 final ReentrantLock lock = this.lock; 682 lock.lock(); 683 try { 684 return Arrays.copyOf(queue, size); 685 } finally { 686 lock.unlock(); 687 } 688 } 689 690 public String toString() { 691 final ReentrantLock lock = this.lock; 692 lock.lock(); 693 try { 694 int n = size; 695 if (n == 0) 696 return "[]"; 697 StringBuilder sb = new StringBuilder(); 698 sb.append('['); 699 for (int i = 0; i < n; ++i) { 700 Object e = queue[i]; 701 sb.append(e == this ? "(this Collection)" : e); 702 if (i != n - 1) 703 sb.append(',').append(' '); 704 } 705 return sb.append(']').toString(); 706 } finally { 707 lock.unlock(); 708 } 709 } 710 711 /** 712 * @throws UnsupportedOperationException {@inheritDoc} 713 * @throws ClassCastException {@inheritDoc} 714 * @throws NullPointerException {@inheritDoc} 715 * @throws IllegalArgumentException {@inheritDoc} 716 */ 717 public int drainTo(Collection<? super E> c) { 718 return drainTo(c, Integer.MAX_VALUE); 719 } 720 721 /** 722 * @throws UnsupportedOperationException {@inheritDoc} 723 * @throws ClassCastException {@inheritDoc} 724 * @throws NullPointerException {@inheritDoc} 725 * @throws IllegalArgumentException {@inheritDoc} 726 */ 727 public int drainTo(Collection<? super E> c, int maxElements) { 728 if (c == null) 729 throw new NullPointerException(); 730 if (c == this) 731 throw new IllegalArgumentException(); 732 if (maxElements <= 0) 733 return 0; 734 final ReentrantLock lock = this.lock; 735 lock.lock(); 736 try { 737 int n = Math.min(size, maxElements); 738 for (int i = 0; i < n; i++) { 739 c.add((E) queue[0]); // In this order, in case add() throws. 740 dequeue(); 741 } 742 return n; 743 } finally { 744 lock.unlock(); 745 } 746 } 747 748 /** 749 * Atomically removes all of the elements from this queue. 750 * The queue will be empty after this call returns. 751 */ 752 public void clear() { 753 final ReentrantLock lock = this.lock; 754 lock.lock(); 755 try { 756 Object[] array = queue; 757 int n = size; 758 size = 0; 759 for (int i = 0; i < n; i++) 760 array[i] = null; 761 } finally { 762 lock.unlock(); 763 } 764 } 765 766 /** 767 * Returns an array containing all of the elements in this queue; the 768 * runtime type of the returned array is that of the specified array. 769 * The returned array elements are in no particular order. 770 * If the queue fits in the specified array, it is returned therein. 771 * Otherwise, a new array is allocated with the runtime type of the 772 * specified array and the size of this queue. 773 * 774 * <p>If this queue fits in the specified array with room to spare 775 * (i.e., the array has more elements than this queue), the element in 776 * the array immediately following the end of the queue is set to 777 * {@code null}. 778 * 779 * <p>Like the {@link #toArray()} method, this method acts as bridge between 780 * array-based and collection-based APIs. Further, this method allows 781 * precise control over the runtime type of the output array, and may, 782 * under certain circumstances, be used to save allocation costs. 783 * 784 * <p>Suppose {@code x} is a queue known to contain only strings. 785 * The following code can be used to dump the queue into a newly 786 * allocated array of {@code String}: 787 * 788 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 789 * 790 * Note that {@code toArray(new Object[0])} is identical in function to 791 * {@code toArray()}. 792 * 793 * @param a the array into which the elements of the queue are to 794 * be stored, if it is big enough; otherwise, a new array of the 795 * same runtime type is allocated for this purpose 796 * @return an array containing all of the elements in this queue 797 * @throws ArrayStoreException if the runtime type of the specified array 798 * is not a supertype of the runtime type of every element in 799 * this queue 800 * @throws NullPointerException if the specified array is null 801 */ 802 public <T> T[] toArray(T[] a) { 803 final ReentrantLock lock = this.lock; 804 lock.lock(); 805 try { 806 int n = size; 807 if (a.length < n) 808 // Make a new array of a's runtime type, but my contents: 809 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 810 System.arraycopy(queue, 0, a, 0, n); 811 if (a.length > n) 812 a[n] = null; 813 return a; 814 } finally { 815 lock.unlock(); 816 } 817 } 818 819 /** 820 * Returns an iterator over the elements in this queue. The 821 * iterator does not return the elements in any particular order. 822 * 823 * <p>The returned iterator is a "weakly consistent" iterator that 824 * will never throw {@link java.util.ConcurrentModificationException 825 * ConcurrentModificationException}, and guarantees to traverse 826 * elements as they existed upon construction of the iterator, and 827 * may (but is not guaranteed to) reflect any modifications 828 * subsequent to construction. 829 * 830 * @return an iterator over the elements in this queue 831 */ 832 public Iterator<E> iterator() { 833 return new Itr(toArray()); 834 } 835 836 /** 837 * Snapshot iterator that works off copy of underlying q array. 838 */ 839 final class Itr implements Iterator<E> { 840 final Object[] array; // Array of all elements 841 int cursor; // index of next element to return 842 int lastRet; // index of last element, or -1 if no such 843 844 Itr(Object[] array) { 845 lastRet = -1; 846 this.array = array; 847 } 848 849 public boolean hasNext() { 850 return cursor < array.length; 851 } 852 853 public E next() { 854 if (cursor >= array.length) 855 throw new NoSuchElementException(); 856 lastRet = cursor; 857 return (E)array[cursor++]; 858 } 859 860 public void remove() { 861 if (lastRet < 0) 862 throw new IllegalStateException(); 863 removeEQ(array[lastRet]); 864 lastRet = -1; 865 } 866 } 867 868 /** 869 * Saves the state to a stream (that is, serializes it). For 870 * compatibility with previous version of this class, 871 * elements are first copied to a java.util.PriorityQueue, 872 * which is then serialized. 873 */ 874 private void writeObject(java.io.ObjectOutputStream s) 875 throws java.io.IOException { 876 lock.lock(); 877 try { 878 // avoid zero capacity argument 879 q = new PriorityQueue<E>(Math.max(size, 1), comparator); 880 q.addAll(this); 881 s.defaultWriteObject(); 882 } finally { 883 q = null; 884 lock.unlock(); 885 } 886 } 887 888 /** 889 * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream 890 * (that is, deserializes it). 891 * 892 * @param s the stream 893 */ 894 private void readObject(java.io.ObjectInputStream s) 895 throws java.io.IOException, ClassNotFoundException { 896 try { 897 s.defaultReadObject(); 898 this.queue = new Object[q.size()]; 899 comparator = q.comparator(); 900 addAll(q); 901 } finally { 902 q = null; 903 } 904 } 905 906 // Unsafe mechanics 907 private static final sun.misc.Unsafe UNSAFE; 908 private static final long allocationSpinLockOffset; 909 static { 910 try { 911 UNSAFE = sun.misc.Unsafe.getUnsafe(); 912 Class<?> k = PriorityBlockingQueue.class; 913 allocationSpinLockOffset = UNSAFE.objectFieldOffset 914 (k.getDeclaredField("allocationSpinLock")); 915 } catch (Exception e) { 916 throw new Error(e); 917 } 918 } 919} 920