1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7package java.util.concurrent; 8 9import java.util.concurrent.locks.Condition; 10import java.util.concurrent.locks.ReentrantLock; 11import java.util.*; 12 13// BEGIN android-note 14// removed link to collections framework docs 15// END android-note 16 17/** 18 * An unbounded {@linkplain BlockingQueue blocking queue} that uses 19 * the same ordering rules as class {@link PriorityQueue} and supplies 20 * blocking retrieval operations. While this queue is logically 21 * unbounded, attempted additions may fail due to resource exhaustion 22 * (causing {@code OutOfMemoryError}). This class does not permit 23 * {@code null} elements. A priority queue relying on {@linkplain 24 * Comparable natural ordering} also does not permit insertion of 25 * non-comparable objects (doing so results in 26 * {@code ClassCastException}). 27 * 28 * <p>This class and its iterator implement all of the 29 * <em>optional</em> methods of the {@link Collection} and {@link 30 * Iterator} interfaces. The Iterator provided in method {@link 31 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 32 * the PriorityBlockingQueue in any particular order. If you need 33 * ordered traversal, consider using 34 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} 35 * can be used to <em>remove</em> some or all elements in priority 36 * order and place them in another collection. 37 * 38 * <p>Operations on this class make no guarantees about the ordering 39 * of elements with equal priority. If you need to enforce an 40 * ordering, you can define custom classes or comparators that use a 41 * secondary key to break ties in primary priority values. For 42 * example, here is a class that applies first-in-first-out 43 * tie-breaking to comparable elements. To use it, you would insert a 44 * {@code new FIFOEntry(anEntry)} instead of a plain entry object. 45 * 46 * <pre> {@code 47 * class FIFOEntry<E extends Comparable<? super E>> 48 * implements Comparable<FIFOEntry<E>> { 49 * static final AtomicLong seq = new AtomicLong(0); 50 * final long seqNum; 51 * final E entry; 52 * public FIFOEntry(E entry) { 53 * seqNum = seq.getAndIncrement(); 54 * this.entry = entry; 55 * } 56 * public E getEntry() { return entry; } 57 * public int compareTo(FIFOEntry<E> other) { 58 * int res = entry.compareTo(other.entry); 59 * if (res == 0 && other.entry != this.entry) 60 * res = (seqNum < other.seqNum ? -1 : 1); 61 * return res; 62 * } 63 * }}</pre> 64 * 65 * @since 1.5 66 * @author Doug Lea 67 * @param <E> the type of elements held in this collection 68 */ 69@SuppressWarnings("unchecked") 70public class PriorityBlockingQueue<E> extends AbstractQueue<E> 71 implements BlockingQueue<E>, java.io.Serializable { 72 private static final long serialVersionUID = 5595510919245408276L; 73 74 /* 75 * The implementation uses an array-based binary heap, with public 76 * operations protected with a single lock. However, allocation 77 * during resizing uses a simple spinlock (used only while not 78 * holding main lock) in order to allow takes to operate 79 * concurrently with allocation. This avoids repeated 80 * postponement of waiting consumers and consequent element 81 * build-up. The need to back away from lock during allocation 82 * makes it impossible to simply wrap delegated 83 * java.util.PriorityQueue operations within a lock, as was done 84 * in a previous version of this class. To maintain 85 * interoperability, a plain PriorityQueue is still used during 86 * serialization, which maintains compatibility at the expense of 87 * transiently doubling overhead. 88 */ 89 90 /** 91 * Default array capacity. 92 */ 93 private static final int DEFAULT_INITIAL_CAPACITY = 11; 94 95 /** 96 * The maximum size of array to allocate. 97 * Some VMs reserve some header words in an array. 98 * Attempts to allocate larger arrays may result in 99 * OutOfMemoryError: Requested array size exceeds VM limit 100 */ 101 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 102 103 /** 104 * Priority queue represented as a balanced binary heap: the two 105 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The 106 * priority queue is ordered by comparator, or by the elements' 107 * natural ordering, if comparator is null: For each node n in the 108 * heap and each descendant d of n, n <= d. The element with the 109 * lowest value is in queue[0], assuming the queue is nonempty. 110 */ 111 private transient Object[] queue; 112 113 /** 114 * The number of elements in the priority queue. 115 */ 116 private transient int size; 117 118 /** 119 * The comparator, or null if priority queue uses elements' 120 * natural ordering. 121 */ 122 private transient Comparator<? super E> comparator; 123 124 /** 125 * Lock used for all public operations 126 */ 127 private final ReentrantLock lock; 128 129 /** 130 * Condition for blocking when empty 131 */ 132 private final Condition notEmpty; 133 134 /** 135 * Spinlock for allocation, acquired via CAS. 136 */ 137 private transient volatile int allocationSpinLock; 138 139 /** 140 * A plain PriorityQueue used only for serialization, 141 * to maintain compatibility with previous versions 142 * of this class. Non-null only during serialization/deserialization. 143 */ 144 private PriorityQueue<E> q; 145 146 /** 147 * Creates a {@code PriorityBlockingQueue} with the default 148 * initial capacity (11) that orders its elements according to 149 * their {@linkplain Comparable natural ordering}. 150 */ 151 public PriorityBlockingQueue() { 152 this(DEFAULT_INITIAL_CAPACITY, null); 153 } 154 155 /** 156 * Creates a {@code PriorityBlockingQueue} with the specified 157 * initial capacity that orders its elements according to their 158 * {@linkplain Comparable natural ordering}. 159 * 160 * @param initialCapacity the initial capacity for this priority queue 161 * @throws IllegalArgumentException if {@code initialCapacity} is less 162 * than 1 163 */ 164 public PriorityBlockingQueue(int initialCapacity) { 165 this(initialCapacity, null); 166 } 167 168 /** 169 * Creates a {@code PriorityBlockingQueue} with the specified initial 170 * capacity that orders its elements according to the specified 171 * comparator. 172 * 173 * @param initialCapacity the initial capacity for this priority queue 174 * @param comparator the comparator that will be used to order this 175 * priority queue. If {@code null}, the {@linkplain Comparable 176 * natural ordering} of the elements will be used. 177 * @throws IllegalArgumentException if {@code initialCapacity} is less 178 * than 1 179 */ 180 public PriorityBlockingQueue(int initialCapacity, 181 Comparator<? super E> comparator) { 182 if (initialCapacity < 1) 183 throw new IllegalArgumentException(); 184 this.lock = new ReentrantLock(); 185 this.notEmpty = lock.newCondition(); 186 this.comparator = comparator; 187 this.queue = new Object[initialCapacity]; 188 } 189 190 /** 191 * Creates a {@code PriorityBlockingQueue} containing the elements 192 * in the specified collection. If the specified collection is a 193 * {@link SortedSet} or a {@link PriorityQueue}, this 194 * priority queue will be ordered according to the same ordering. 195 * Otherwise, this priority queue will be ordered according to the 196 * {@linkplain Comparable natural ordering} of its elements. 197 * 198 * @param c the collection whose elements are to be placed 199 * into this priority queue 200 * @throws ClassCastException if elements of the specified collection 201 * cannot be compared to one another according to the priority 202 * queue's ordering 203 * @throws NullPointerException if the specified collection or any 204 * of its elements are null 205 */ 206 public PriorityBlockingQueue(Collection<? extends E> c) { 207 this.lock = new ReentrantLock(); 208 this.notEmpty = lock.newCondition(); 209 boolean heapify = true; // true if not known to be in heap order 210 boolean screen = true; // true if must screen for nulls 211 if (c instanceof SortedSet<?>) { 212 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; 213 this.comparator = (Comparator<? super E>) ss.comparator(); 214 heapify = false; 215 } 216 else if (c instanceof PriorityBlockingQueue<?>) { 217 PriorityBlockingQueue<? extends E> pq = 218 (PriorityBlockingQueue<? extends E>) c; 219 this.comparator = (Comparator<? super E>) pq.comparator(); 220 screen = false; 221 if (pq.getClass() == PriorityBlockingQueue.class) // exact match 222 heapify = false; 223 } 224 Object[] a = c.toArray(); 225 int n = a.length; 226 // If c.toArray incorrectly doesn't return Object[], copy it. 227 if (a.getClass() != Object[].class) 228 a = Arrays.copyOf(a, n, Object[].class); 229 if (screen && (n == 1 || this.comparator != null)) { 230 for (int i = 0; i < n; ++i) 231 if (a[i] == null) 232 throw new NullPointerException(); 233 } 234 this.queue = a; 235 this.size = n; 236 if (heapify) 237 heapify(); 238 } 239 240 /** 241 * Tries to grow array to accommodate at least one more element 242 * (but normally expand by about 50%), giving up (allowing retry) 243 * on contention (which we expect to be rare). Call only while 244 * holding lock. 245 * 246 * @param array the heap array 247 * @param oldCap the length of the array 248 */ 249 private void tryGrow(Object[] array, int oldCap) { 250 lock.unlock(); // must release and then re-acquire main lock 251 Object[] newArray = null; 252 if (allocationSpinLock == 0 && 253 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 254 0, 1)) { 255 try { 256 int newCap = oldCap + ((oldCap < 64) ? 257 (oldCap + 2) : // grow faster if small 258 (oldCap >> 1)); 259 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow 260 int minCap = oldCap + 1; 261 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) 262 throw new OutOfMemoryError(); 263 newCap = MAX_ARRAY_SIZE; 264 } 265 if (newCap > oldCap && queue == array) 266 newArray = new Object[newCap]; 267 } finally { 268 allocationSpinLock = 0; 269 } 270 } 271 if (newArray == null) // back off if another thread is allocating 272 Thread.yield(); 273 lock.lock(); 274 if (newArray != null && queue == array) { 275 queue = newArray; 276 System.arraycopy(array, 0, newArray, 0, oldCap); 277 } 278 } 279 280 /** 281 * Mechanics for poll(). Call only while holding lock. 282 */ 283 private E dequeue() { 284 int n = size - 1; 285 if (n < 0) 286 return null; 287 else { 288 Object[] array = queue; 289 E result = (E) array[0]; 290 E x = (E) array[n]; 291 array[n] = null; 292 Comparator<? super E> cmp = comparator; 293 if (cmp == null) 294 siftDownComparable(0, x, array, n); 295 else 296 siftDownUsingComparator(0, x, array, n, cmp); 297 size = n; 298 return result; 299 } 300 } 301 302 /** 303 * Inserts item x at position k, maintaining heap invariant by 304 * promoting x up the tree until it is greater than or equal to 305 * its parent, or is the root. 306 * 307 * To simplify and speed up coercions and comparisons. the 308 * Comparable and Comparator versions are separated into different 309 * methods that are otherwise identical. (Similarly for siftDown.) 310 * These methods are static, with heap state as arguments, to 311 * simplify use in light of possible comparator exceptions. 312 * 313 * @param k the position to fill 314 * @param x the item to insert 315 * @param array the heap array 316 */ 317 private static <T> void siftUpComparable(int k, T x, Object[] array) { 318 Comparable<? super T> key = (Comparable<? super T>) x; 319 while (k > 0) { 320 int parent = (k - 1) >>> 1; 321 Object e = array[parent]; 322 if (key.compareTo((T) e) >= 0) 323 break; 324 array[k] = e; 325 k = parent; 326 } 327 array[k] = key; 328 } 329 330 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, 331 Comparator<? super T> cmp) { 332 while (k > 0) { 333 int parent = (k - 1) >>> 1; 334 Object e = array[parent]; 335 if (cmp.compare(x, (T) e) >= 0) 336 break; 337 array[k] = e; 338 k = parent; 339 } 340 array[k] = x; 341 } 342 343 /** 344 * Inserts item x at position k, maintaining heap invariant by 345 * demoting x down the tree repeatedly until it is less than or 346 * equal to its children or is a leaf. 347 * 348 * @param k the position to fill 349 * @param x the item to insert 350 * @param array the heap array 351 * @param n heap size 352 */ 353 private static <T> void siftDownComparable(int k, T x, Object[] array, 354 int n) { 355 if (n > 0) { 356 Comparable<? super T> key = (Comparable<? super T>)x; 357 int half = n >>> 1; // loop while a non-leaf 358 while (k < half) { 359 int child = (k << 1) + 1; // assume left child is least 360 Object c = array[child]; 361 int right = child + 1; 362 if (right < n && 363 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) 364 c = array[child = right]; 365 if (key.compareTo((T) c) <= 0) 366 break; 367 array[k] = c; 368 k = child; 369 } 370 array[k] = key; 371 } 372 } 373 374 private static <T> void siftDownUsingComparator(int k, T x, Object[] array, 375 int n, 376 Comparator<? super T> cmp) { 377 if (n > 0) { 378 int half = n >>> 1; 379 while (k < half) { 380 int child = (k << 1) + 1; 381 Object c = array[child]; 382 int right = child + 1; 383 if (right < n && cmp.compare((T) c, (T) array[right]) > 0) 384 c = array[child = right]; 385 if (cmp.compare(x, (T) c) <= 0) 386 break; 387 array[k] = c; 388 k = child; 389 } 390 array[k] = x; 391 } 392 } 393 394 /** 395 * Establishes the heap invariant (described above) in the entire tree, 396 * assuming nothing about the order of the elements prior to the call. 397 */ 398 private void heapify() { 399 Object[] array = queue; 400 int n = size; 401 int half = (n >>> 1) - 1; 402 Comparator<? super E> cmp = comparator; 403 if (cmp == null) { 404 for (int i = half; i >= 0; i--) 405 siftDownComparable(i, (E) array[i], array, n); 406 } 407 else { 408 for (int i = half; i >= 0; i--) 409 siftDownUsingComparator(i, (E) array[i], array, n, cmp); 410 } 411 } 412 413 /** 414 * Inserts the specified element into this priority queue. 415 * 416 * @param e the element to add 417 * @return {@code true} (as specified by {@link Collection#add}) 418 * @throws ClassCastException if the specified element cannot be compared 419 * with elements currently in the priority queue according to the 420 * priority queue's ordering 421 * @throws NullPointerException if the specified element is null 422 */ 423 public boolean add(E e) { 424 return offer(e); 425 } 426 427 /** 428 * Inserts the specified element into this priority queue. 429 * As the queue is unbounded, this method will never return {@code false}. 430 * 431 * @param e the element to add 432 * @return {@code true} (as specified by {@link Queue#offer}) 433 * @throws ClassCastException if the specified element cannot be compared 434 * with elements currently in the priority queue according to the 435 * priority queue's ordering 436 * @throws NullPointerException if the specified element is null 437 */ 438 public boolean offer(E e) { 439 if (e == null) 440 throw new NullPointerException(); 441 final ReentrantLock lock = this.lock; 442 lock.lock(); 443 int n, cap; 444 Object[] array; 445 while ((n = size) >= (cap = (array = queue).length)) 446 tryGrow(array, cap); 447 try { 448 Comparator<? super E> cmp = comparator; 449 if (cmp == null) 450 siftUpComparable(n, e, array); 451 else 452 siftUpUsingComparator(n, e, array, cmp); 453 size = n + 1; 454 notEmpty.signal(); 455 } finally { 456 lock.unlock(); 457 } 458 return true; 459 } 460 461 /** 462 * Inserts the specified element into this priority queue. 463 * As the queue is unbounded, this method will never block. 464 * 465 * @param e the element to add 466 * @throws ClassCastException if the specified element cannot be compared 467 * with elements currently in the priority queue according to the 468 * priority queue's ordering 469 * @throws NullPointerException if the specified element is null 470 */ 471 public void put(E e) { 472 offer(e); // never need to block 473 } 474 475 /** 476 * Inserts the specified element into this priority queue. 477 * As the queue is unbounded, this method will never block or 478 * return {@code false}. 479 * 480 * @param e the element to add 481 * @param timeout This parameter is ignored as the method never blocks 482 * @param unit This parameter is ignored as the method never blocks 483 * @return {@code true} (as specified by 484 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) 485 * @throws ClassCastException if the specified element cannot be compared 486 * with elements currently in the priority queue according to the 487 * priority queue's ordering 488 * @throws NullPointerException if the specified element is null 489 */ 490 public boolean offer(E e, long timeout, TimeUnit unit) { 491 return offer(e); // never need to block 492 } 493 494 public E poll() { 495 final ReentrantLock lock = this.lock; 496 lock.lock(); 497 try { 498 return dequeue(); 499 } finally { 500 lock.unlock(); 501 } 502 } 503 504 public E take() throws InterruptedException { 505 final ReentrantLock lock = this.lock; 506 lock.lockInterruptibly(); 507 E result; 508 try { 509 while ( (result = dequeue()) == null) 510 notEmpty.await(); 511 } finally { 512 lock.unlock(); 513 } 514 return result; 515 } 516 517 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 518 long nanos = unit.toNanos(timeout); 519 final ReentrantLock lock = this.lock; 520 lock.lockInterruptibly(); 521 E result; 522 try { 523 while ( (result = dequeue()) == null && nanos > 0) 524 nanos = notEmpty.awaitNanos(nanos); 525 } finally { 526 lock.unlock(); 527 } 528 return result; 529 } 530 531 public E peek() { 532 final ReentrantLock lock = this.lock; 533 lock.lock(); 534 try { 535 return (size == 0) ? null : (E) queue[0]; 536 } finally { 537 lock.unlock(); 538 } 539 } 540 541 /** 542 * Returns the comparator used to order the elements in this queue, 543 * or {@code null} if this queue uses the {@linkplain Comparable 544 * natural ordering} of its elements. 545 * 546 * @return the comparator used to order the elements in this queue, 547 * or {@code null} if this queue uses the natural 548 * ordering of its elements 549 */ 550 public Comparator<? super E> comparator() { 551 return comparator; 552 } 553 554 public int size() { 555 final ReentrantLock lock = this.lock; 556 lock.lock(); 557 try { 558 return size; 559 } finally { 560 lock.unlock(); 561 } 562 } 563 564 /** 565 * Always returns {@code Integer.MAX_VALUE} because 566 * a {@code PriorityBlockingQueue} is not capacity constrained. 567 * @return {@code Integer.MAX_VALUE} always 568 */ 569 public int remainingCapacity() { 570 return Integer.MAX_VALUE; 571 } 572 573 private int indexOf(Object o) { 574 if (o != null) { 575 Object[] array = queue; 576 int n = size; 577 for (int i = 0; i < n; i++) 578 if (o.equals(array[i])) 579 return i; 580 } 581 return -1; 582 } 583 584 /** 585 * Removes the ith element from queue. 586 */ 587 private void removeAt(int i) { 588 Object[] array = queue; 589 int n = size - 1; 590 if (n == i) // removed last element 591 array[i] = null; 592 else { 593 E moved = (E) array[n]; 594 array[n] = null; 595 Comparator<? super E> cmp = comparator; 596 if (cmp == null) 597 siftDownComparable(i, moved, array, n); 598 else 599 siftDownUsingComparator(i, moved, array, n, cmp); 600 if (array[i] == moved) { 601 if (cmp == null) 602 siftUpComparable(i, moved, array); 603 else 604 siftUpUsingComparator(i, moved, array, cmp); 605 } 606 } 607 size = n; 608 } 609 610 /** 611 * Removes a single instance of the specified element from this queue, 612 * if it is present. More formally, removes an element {@code e} such 613 * that {@code o.equals(e)}, if this queue contains one or more such 614 * elements. Returns {@code true} if and only if this queue contained 615 * the specified element (or equivalently, if this queue changed as a 616 * result of the call). 617 * 618 * @param o element to be removed from this queue, if present 619 * @return {@code true} if this queue changed as a result of the call 620 */ 621 public boolean remove(Object o) { 622 final ReentrantLock lock = this.lock; 623 lock.lock(); 624 try { 625 int i = indexOf(o); 626 if (i == -1) 627 return false; 628 removeAt(i); 629 return true; 630 } finally { 631 lock.unlock(); 632 } 633 } 634 635 /** 636 * Identity-based version for use in Itr.remove 637 */ 638 void removeEQ(Object o) { 639 final ReentrantLock lock = this.lock; 640 lock.lock(); 641 try { 642 Object[] array = queue; 643 for (int i = 0, n = size; i < n; i++) { 644 if (o == array[i]) { 645 removeAt(i); 646 break; 647 } 648 } 649 } finally { 650 lock.unlock(); 651 } 652 } 653 654 /** 655 * Returns {@code true} if this queue contains the specified element. 656 * More formally, returns {@code true} if and only if this queue contains 657 * at least one element {@code e} such that {@code o.equals(e)}. 658 * 659 * @param o object to be checked for containment in this queue 660 * @return {@code true} if this queue contains the specified element 661 */ 662 public boolean contains(Object o) { 663 final ReentrantLock lock = this.lock; 664 lock.lock(); 665 try { 666 return indexOf(o) != -1; 667 } finally { 668 lock.unlock(); 669 } 670 } 671 672 /** 673 * Returns an array containing all of the elements in this queue. 674 * The returned array elements are in no particular order. 675 * 676 * <p>The returned array will be "safe" in that no references to it are 677 * maintained by this queue. (In other words, this method must allocate 678 * a new array). The caller is thus free to modify the returned array. 679 * 680 * <p>This method acts as bridge between array-based and collection-based 681 * APIs. 682 * 683 * @return an array containing all of the elements in this queue 684 */ 685 public Object[] toArray() { 686 final ReentrantLock lock = this.lock; 687 lock.lock(); 688 try { 689 return Arrays.copyOf(queue, size); 690 } finally { 691 lock.unlock(); 692 } 693 } 694 695 public String toString() { 696 final ReentrantLock lock = this.lock; 697 lock.lock(); 698 try { 699 int n = size; 700 if (n == 0) 701 return "[]"; 702 StringBuilder sb = new StringBuilder(); 703 sb.append('['); 704 for (int i = 0; i < n; ++i) { 705 Object e = queue[i]; 706 sb.append(e == this ? "(this Collection)" : e); 707 if (i != n - 1) 708 sb.append(',').append(' '); 709 } 710 return sb.append(']').toString(); 711 } finally { 712 lock.unlock(); 713 } 714 } 715 716 /** 717 * @throws UnsupportedOperationException {@inheritDoc} 718 * @throws ClassCastException {@inheritDoc} 719 * @throws NullPointerException {@inheritDoc} 720 * @throws IllegalArgumentException {@inheritDoc} 721 */ 722 public int drainTo(Collection<? super E> c) { 723 return drainTo(c, Integer.MAX_VALUE); 724 } 725 726 /** 727 * @throws UnsupportedOperationException {@inheritDoc} 728 * @throws ClassCastException {@inheritDoc} 729 * @throws NullPointerException {@inheritDoc} 730 * @throws IllegalArgumentException {@inheritDoc} 731 */ 732 public int drainTo(Collection<? super E> c, int maxElements) { 733 if (c == null) 734 throw new NullPointerException(); 735 if (c == this) 736 throw new IllegalArgumentException(); 737 if (maxElements <= 0) 738 return 0; 739 final ReentrantLock lock = this.lock; 740 lock.lock(); 741 try { 742 int n = Math.min(size, maxElements); 743 for (int i = 0; i < n; i++) { 744 c.add((E) queue[0]); // In this order, in case add() throws. 745 dequeue(); 746 } 747 return n; 748 } finally { 749 lock.unlock(); 750 } 751 } 752 753 /** 754 * Atomically removes all of the elements from this queue. 755 * The queue will be empty after this call returns. 756 */ 757 public void clear() { 758 final ReentrantLock lock = this.lock; 759 lock.lock(); 760 try { 761 Object[] array = queue; 762 int n = size; 763 size = 0; 764 for (int i = 0; i < n; i++) 765 array[i] = null; 766 } finally { 767 lock.unlock(); 768 } 769 } 770 771 /** 772 * Returns an array containing all of the elements in this queue; the 773 * runtime type of the returned array is that of the specified array. 774 * The returned array elements are in no particular order. 775 * If the queue fits in the specified array, it is returned therein. 776 * Otherwise, a new array is allocated with the runtime type of the 777 * specified array and the size of this queue. 778 * 779 * <p>If this queue fits in the specified array with room to spare 780 * (i.e., the array has more elements than this queue), the element in 781 * the array immediately following the end of the queue is set to 782 * {@code null}. 783 * 784 * <p>Like the {@link #toArray()} method, this method acts as bridge between 785 * array-based and collection-based APIs. Further, this method allows 786 * precise control over the runtime type of the output array, and may, 787 * under certain circumstances, be used to save allocation costs. 788 * 789 * <p>Suppose {@code x} is a queue known to contain only strings. 790 * The following code can be used to dump the queue into a newly 791 * allocated array of {@code String}: 792 * 793 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 794 * 795 * Note that {@code toArray(new Object[0])} is identical in function to 796 * {@code toArray()}. 797 * 798 * @param a the array into which the elements of the queue are to 799 * be stored, if it is big enough; otherwise, a new array of the 800 * same runtime type is allocated for this purpose 801 * @return an array containing all of the elements in this queue 802 * @throws ArrayStoreException if the runtime type of the specified array 803 * is not a supertype of the runtime type of every element in 804 * this queue 805 * @throws NullPointerException if the specified array is null 806 */ 807 public <T> T[] toArray(T[] a) { 808 final ReentrantLock lock = this.lock; 809 lock.lock(); 810 try { 811 int n = size; 812 if (a.length < n) 813 // Make a new array of a's runtime type, but my contents: 814 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 815 System.arraycopy(queue, 0, a, 0, n); 816 if (a.length > n) 817 a[n] = null; 818 return a; 819 } finally { 820 lock.unlock(); 821 } 822 } 823 824 /** 825 * Returns an iterator over the elements in this queue. The 826 * iterator does not return the elements in any particular order. 827 * 828 * <p>The returned iterator is a "weakly consistent" iterator that 829 * will never throw {@link java.util.ConcurrentModificationException 830 * ConcurrentModificationException}, and guarantees to traverse 831 * elements as they existed upon construction of the iterator, and 832 * may (but is not guaranteed to) reflect any modifications 833 * subsequent to construction. 834 * 835 * @return an iterator over the elements in this queue 836 */ 837 public Iterator<E> iterator() { 838 return new Itr(toArray()); 839 } 840 841 /** 842 * Snapshot iterator that works off copy of underlying q array. 843 */ 844 final class Itr implements Iterator<E> { 845 final Object[] array; // Array of all elements 846 int cursor; // index of next element to return 847 int lastRet; // index of last element, or -1 if no such 848 849 Itr(Object[] array) { 850 lastRet = -1; 851 this.array = array; 852 } 853 854 public boolean hasNext() { 855 return cursor < array.length; 856 } 857 858 public E next() { 859 if (cursor >= array.length) 860 throw new NoSuchElementException(); 861 lastRet = cursor; 862 return (E)array[cursor++]; 863 } 864 865 public void remove() { 866 if (lastRet < 0) 867 throw new IllegalStateException(); 868 removeEQ(array[lastRet]); 869 lastRet = -1; 870 } 871 } 872 873 /** 874 * Saves this queue to a stream (that is, serializes it). 875 * 876 * For compatibility with previous version of this class, elements 877 * are first copied to a java.util.PriorityQueue, which is then 878 * serialized. 879 */ 880 private void writeObject(java.io.ObjectOutputStream s) 881 throws java.io.IOException { 882 lock.lock(); 883 try { 884 // avoid zero capacity argument 885 q = new PriorityQueue<E>(Math.max(size, 1), comparator); 886 q.addAll(this); 887 s.defaultWriteObject(); 888 } finally { 889 q = null; 890 lock.unlock(); 891 } 892 } 893 894 /** 895 * Reconstitutes this queue from a stream (that is, deserializes it). 896 */ 897 private void readObject(java.io.ObjectInputStream s) 898 throws java.io.IOException, ClassNotFoundException { 899 try { 900 s.defaultReadObject(); 901 this.queue = new Object[q.size()]; 902 comparator = q.comparator(); 903 addAll(q); 904 } finally { 905 q = null; 906 } 907 } 908 909 // Unsafe mechanics 910 private static final sun.misc.Unsafe UNSAFE; 911 private static final long allocationSpinLockOffset; 912 static { 913 try { 914 UNSAFE = sun.misc.Unsafe.getUnsafe(); 915 Class<?> k = PriorityBlockingQueue.class; 916 allocationSpinLockOffset = UNSAFE.objectFieldOffset 917 (k.getDeclaredField("allocationSpinLock")); 918 } catch (Exception e) { 919 throw new Error(e); 920 } 921 } 922} 923