PriorityBlockingQueue.java revision e0cda69a87d2d429bae6c8e30d4d8ba44d889f25
1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/licenses/publicdomain 5 */ 6 7package java.util.concurrent; 8 9import java.util.concurrent.locks.*; 10import java.util.*; 11 12// BEGIN android-note 13// removed link to collections framework docs 14// END android-note 15 16/** 17 * An unbounded {@linkplain BlockingQueue blocking queue} that uses 18 * the same ordering rules as class {@link PriorityQueue} and supplies 19 * blocking retrieval operations. While this queue is logically 20 * unbounded, attempted additions may fail due to resource exhaustion 21 * (causing {@code OutOfMemoryError}). This class does not permit 22 * {@code null} elements. A priority queue relying on {@linkplain 23 * Comparable natural ordering} also does not permit insertion of 24 * non-comparable objects (doing so results in 25 * {@code ClassCastException}). 26 * 27 * <p>This class and its iterator implement all of the 28 * <em>optional</em> methods of the {@link Collection} and {@link 29 * Iterator} interfaces. The Iterator provided in method {@link 30 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 31 * the PriorityBlockingQueue in any particular order. If you need 32 * ordered traversal, consider using 33 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} 34 * can be used to <em>remove</em> some or all elements in priority 35 * order and place them in another collection. 36 * 37 * <p>Operations on this class make no guarantees about the ordering 38 * of elements with equal priority. If you need to enforce an 39 * ordering, you can define custom classes or comparators that use a 40 * secondary key to break ties in primary priority values. For 41 * example, here is a class that applies first-in-first-out 42 * tie-breaking to comparable elements. To use it, you would insert a 43 * {@code new FIFOEntry(anEntry)} instead of a plain entry object. 44 * 45 * <pre> {@code 46 * class FIFOEntry<E extends Comparable<? super E>> 47 * implements Comparable<FIFOEntry<E>> { 48 * static final AtomicLong seq = new AtomicLong(0); 49 * final long seqNum; 50 * final E entry; 51 * public FIFOEntry(E entry) { 52 * seqNum = seq.getAndIncrement(); 53 * this.entry = entry; 54 * } 55 * public E getEntry() { return entry; } 56 * public int compareTo(FIFOEntry<E> other) { 57 * int res = entry.compareTo(other.entry); 58 * if (res == 0 && other.entry != this.entry) 59 * res = (seqNum < other.seqNum ? -1 : 1); 60 * return res; 61 * } 62 * }}</pre> 63 * 64 * @since 1.5 65 * @author Doug Lea 66 * @param <E> the type of elements held in this collection 67 */ 68public class PriorityBlockingQueue<E> extends AbstractQueue<E> 69 implements BlockingQueue<E>, java.io.Serializable { 70 private static final long serialVersionUID = 5595510919245408276L; 71 72 /* 73 * The implementation uses an array-based binary heap, with public 74 * operations protected with a single lock. However, allocation 75 * during resizing uses a simple spinlock (used only while not 76 * holding main lock) in order to allow takes to operate 77 * concurrently with allocation. This avoids repeated 78 * postponement of waiting consumers and consequent element 79 * build-up. The need to back away from lock during allocation 80 * makes it impossible to simply wrap delegated 81 * java.util.PriorityQueue operations within a lock, as was done 82 * in a previous version of this class. To maintain 83 * interoperability, a plain PriorityQueue is still used during 84 * serialization, which maintains compatibility at the espense of 85 * transiently doubling overhead. 86 */ 87 88 /** 89 * Default array capacity. 90 */ 91 private static final int DEFAULT_INITIAL_CAPACITY = 11; 92 93 /** 94 * The maximum size of array to allocate. 95 * Some VMs reserve some header words in an array. 96 * Attempts to allocate larger arrays may result in 97 * OutOfMemoryError: Requested array size exceeds VM limit 98 */ 99 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 100 101 /** 102 * Priority queue represented as a balanced binary heap: the two 103 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The 104 * priority queue is ordered by comparator, or by the elements' 105 * natural ordering, if comparator is null: For each node n in the 106 * heap and each descendant d of n, n <= d. The element with the 107 * lowest value is in queue[0], assuming the queue is nonempty. 108 */ 109 private transient Object[] queue; 110 111 /** 112 * The number of elements in the priority queue. 113 */ 114 private transient int size; 115 116 /** 117 * The comparator, or null if priority queue uses elements' 118 * natural ordering. 119 */ 120 private transient Comparator<? super E> comparator; 121 122 /** 123 * Lock used for all public operations 124 */ 125 private final ReentrantLock lock; 126 127 /** 128 * Condition for blocking when empty 129 */ 130 private final Condition notEmpty; 131 132 /** 133 * Spinlock for allocation, acquired via CAS. 134 */ 135 private transient volatile int allocationSpinLock; 136 137 /** 138 * A plain PriorityQueue used only for serialization, 139 * to maintain compatibility with previous versions 140 * of this class. Non-null only during serialization/deserialization. 141 */ 142 private PriorityQueue q; 143 144 /** 145 * Creates a {@code PriorityBlockingQueue} with the default 146 * initial capacity (11) that orders its elements according to 147 * their {@linkplain Comparable natural ordering}. 148 */ 149 public PriorityBlockingQueue() { 150 this(DEFAULT_INITIAL_CAPACITY, null); 151 } 152 153 /** 154 * Creates a {@code PriorityBlockingQueue} with the specified 155 * initial capacity that orders its elements according to their 156 * {@linkplain Comparable natural ordering}. 157 * 158 * @param initialCapacity the initial capacity for this priority queue 159 * @throws IllegalArgumentException if {@code initialCapacity} is less 160 * than 1 161 */ 162 public PriorityBlockingQueue(int initialCapacity) { 163 this(initialCapacity, null); 164 } 165 166 /** 167 * Creates a {@code PriorityBlockingQueue} with the specified initial 168 * capacity that orders its elements according to the specified 169 * comparator. 170 * 171 * @param initialCapacity the initial capacity for this priority queue 172 * @param comparator the comparator that will be used to order this 173 * priority queue. If {@code null}, the {@linkplain Comparable 174 * natural ordering} of the elements will be used. 175 * @throws IllegalArgumentException if {@code initialCapacity} is less 176 * than 1 177 */ 178 public PriorityBlockingQueue(int initialCapacity, 179 Comparator<? super E> comparator) { 180 if (initialCapacity < 1) 181 throw new IllegalArgumentException(); 182 this.lock = new ReentrantLock(); 183 this.notEmpty = lock.newCondition(); 184 this.comparator = comparator; 185 this.queue = new Object[initialCapacity]; 186 } 187 188 /** 189 * Creates a {@code PriorityBlockingQueue} containing the elements 190 * in the specified collection. If the specified collection is a 191 * {@link SortedSet} or a {@link PriorityQueue}, this 192 * priority queue will be ordered according to the same ordering. 193 * Otherwise, this priority queue will be ordered according to the 194 * {@linkplain Comparable natural ordering} of its elements. 195 * 196 * @param c the collection whose elements are to be placed 197 * into this priority queue 198 * @throws ClassCastException if elements of the specified collection 199 * cannot be compared to one another according to the priority 200 * queue's ordering 201 * @throws NullPointerException if the specified collection or any 202 * of its elements are null 203 */ 204 public PriorityBlockingQueue(Collection<? extends E> c) { 205 this.lock = new ReentrantLock(); 206 this.notEmpty = lock.newCondition(); 207 boolean heapify = true; // true if not known to be in heap order 208 boolean screen = true; // true if must screen for nulls 209 if (c instanceof SortedSet<?>) { 210 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; 211 this.comparator = (Comparator<? super E>) ss.comparator(); 212 heapify = false; 213 } 214 else if (c instanceof PriorityBlockingQueue<?>) { 215 PriorityBlockingQueue<? extends E> pq = 216 (PriorityBlockingQueue<? extends E>) c; 217 this.comparator = (Comparator<? super E>) pq.comparator(); 218 screen = false; 219 if (pq.getClass() == PriorityBlockingQueue.class) // exact match 220 heapify = false; 221 } 222 Object[] a = c.toArray(); 223 int n = a.length; 224 // If c.toArray incorrectly doesn't return Object[], copy it. 225 if (a.getClass() != Object[].class) 226 a = Arrays.copyOf(a, n, Object[].class); 227 if (screen && (n == 1 || this.comparator != null)) { 228 for (int i = 0; i < n; ++i) 229 if (a[i] == null) 230 throw new NullPointerException(); 231 } 232 this.queue = a; 233 this.size = n; 234 if (heapify) 235 heapify(); 236 } 237 238 /** 239 * Tries to grow array to accommodate at least one more element 240 * (but normally expand by about 50%), giving up (allowing retry) 241 * on contention (which we expect to be rare). Call only while 242 * holding lock. 243 * 244 * @param array the heap array 245 * @param oldCap the length of the array 246 */ 247 private void tryGrow(Object[] array, int oldCap) { 248 lock.unlock(); // must release and then re-acquire main lock 249 Object[] newArray = null; 250 if (allocationSpinLock == 0 && 251 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 252 0, 1)) { 253 try { 254 int newCap = oldCap + ((oldCap < 64) ? 255 (oldCap + 2) : // grow faster if small 256 (oldCap >> 1)); 257 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow 258 int minCap = oldCap + 1; 259 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) 260 throw new OutOfMemoryError(); 261 newCap = MAX_ARRAY_SIZE; 262 } 263 if (newCap > oldCap && queue == array) 264 newArray = new Object[newCap]; 265 } finally { 266 allocationSpinLock = 0; 267 } 268 } 269 if (newArray == null) // back off if another thread is allocating 270 Thread.yield(); 271 lock.lock(); 272 if (newArray != null && queue == array) { 273 queue = newArray; 274 System.arraycopy(array, 0, newArray, 0, oldCap); 275 } 276 } 277 278 /** 279 * Mechanics for poll(). Call only while holding lock. 280 */ 281 private E extract() { 282 E result; 283 int n = size - 1; 284 if (n < 0) 285 result = null; 286 else { 287 Object[] array = queue; 288 result = (E) array[0]; 289 E x = (E) array[n]; 290 array[n] = null; 291 Comparator<? super E> cmp = comparator; 292 if (cmp == null) 293 siftDownComparable(0, x, array, n); 294 else 295 siftDownUsingComparator(0, x, array, n, cmp); 296 size = n; 297 } 298 return result; 299 } 300 301 /** 302 * Inserts item x at position k, maintaining heap invariant by 303 * promoting x up the tree until it is greater than or equal to 304 * its parent, or is the root. 305 * 306 * To simplify and speed up coercions and comparisons. the 307 * Comparable and Comparator versions are separated into different 308 * methods that are otherwise identical. (Similarly for siftDown.) 309 * These methods are static, with heap state as arguments, to 310 * simplify use in light of possible comparator exceptions. 311 * 312 * @param k the position to fill 313 * @param x the item to insert 314 * @param array the heap array 315 */ 316 private static <T> void siftUpComparable(int k, T x, Object[] array) { 317 Comparable<? super T> key = (Comparable<? super T>) x; 318 while (k > 0) { 319 int parent = (k - 1) >>> 1; 320 Object e = array[parent]; 321 if (key.compareTo((T) e) >= 0) 322 break; 323 array[k] = e; 324 k = parent; 325 } 326 array[k] = key; 327 } 328 329 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, 330 Comparator<? super T> cmp) { 331 while (k > 0) { 332 int parent = (k - 1) >>> 1; 333 Object e = array[parent]; 334 if (cmp.compare(x, (T) e) >= 0) 335 break; 336 array[k] = e; 337 k = parent; 338 } 339 array[k] = x; 340 } 341 342 /** 343 * Inserts item x at position k, maintaining heap invariant by 344 * demoting x down the tree repeatedly until it is less than or 345 * equal to its children or is a leaf. 346 * 347 * @param k the position to fill 348 * @param x the item to insert 349 * @param array the heap array 350 * @param n heap size 351 */ 352 private static <T> void siftDownComparable(int k, T x, Object[] array, 353 int n) { 354 Comparable<? super T> key = (Comparable<? super T>)x; 355 int half = n >>> 1; // loop while a non-leaf 356 while (k < half) { 357 int child = (k << 1) + 1; // assume left child is least 358 Object c = array[child]; 359 int right = child + 1; 360 if (right < n && 361 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) 362 c = array[child = right]; 363 if (key.compareTo((T) c) <= 0) 364 break; 365 array[k] = c; 366 k = child; 367 } 368 array[k] = key; 369 } 370 371 private static <T> void siftDownUsingComparator(int k, T x, Object[] array, 372 int n, 373 Comparator<? super T> cmp) { 374 int half = n >>> 1; 375 while (k < half) { 376 int child = (k << 1) + 1; 377 Object c = array[child]; 378 int right = child + 1; 379 if (right < n && cmp.compare((T) c, (T) array[right]) > 0) 380 c = array[child = right]; 381 if (cmp.compare(x, (T) c) <= 0) 382 break; 383 array[k] = c; 384 k = child; 385 } 386 array[k] = x; 387 } 388 389 /** 390 * Establishes the heap invariant (described above) in the entire tree, 391 * assuming nothing about the order of the elements prior to the call. 392 */ 393 private void heapify() { 394 Object[] array = queue; 395 int n = size; 396 int half = (n >>> 1) - 1; 397 Comparator<? super E> cmp = comparator; 398 if (cmp == null) { 399 for (int i = half; i >= 0; i--) 400 siftDownComparable(i, (E) array[i], array, n); 401 } 402 else { 403 for (int i = half; i >= 0; i--) 404 siftDownUsingComparator(i, (E) array[i], array, n, cmp); 405 } 406 } 407 408 /** 409 * Inserts the specified element into this priority queue. 410 * 411 * @param e the element to add 412 * @return {@code true} (as specified by {@link Collection#add}) 413 * @throws ClassCastException if the specified element cannot be compared 414 * with elements currently in the priority queue according to the 415 * priority queue's ordering 416 * @throws NullPointerException if the specified element is null 417 */ 418 public boolean add(E e) { 419 return offer(e); 420 } 421 422 /** 423 * Inserts the specified element into this priority queue. 424 * As the queue is unbounded, this method will never return {@code false}. 425 * 426 * @param e the element to add 427 * @return {@code true} (as specified by {@link Queue#offer}) 428 * @throws ClassCastException if the specified element cannot be compared 429 * with elements currently in the priority queue according to the 430 * priority queue's ordering 431 * @throws NullPointerException if the specified element is null 432 */ 433 public boolean offer(E e) { 434 if (e == null) 435 throw new NullPointerException(); 436 final ReentrantLock lock = this.lock; 437 lock.lock(); 438 int n, cap; 439 Object[] array; 440 while ((n = size) >= (cap = (array = queue).length)) 441 tryGrow(array, cap); 442 try { 443 Comparator<? super E> cmp = comparator; 444 if (cmp == null) 445 siftUpComparable(n, e, array); 446 else 447 siftUpUsingComparator(n, e, array, cmp); 448 size = n + 1; 449 notEmpty.signal(); 450 } finally { 451 lock.unlock(); 452 } 453 return true; 454 } 455 456 /** 457 * Inserts the specified element into this priority queue. 458 * As the queue is unbounded, this method will never block. 459 * 460 * @param e the element to add 461 * @throws ClassCastException if the specified element cannot be compared 462 * with elements currently in the priority queue according to the 463 * priority queue's ordering 464 * @throws NullPointerException if the specified element is null 465 */ 466 public void put(E e) { 467 offer(e); // never need to block 468 } 469 470 /** 471 * Inserts the specified element into this priority queue. 472 * As the queue is unbounded, this method will never block or 473 * return {@code false}. 474 * 475 * @param e the element to add 476 * @param timeout This parameter is ignored as the method never blocks 477 * @param unit This parameter is ignored as the method never blocks 478 * @return {@code true} (as specified by 479 * {@link BlockingQueue#offer BlockingQueue.offer}) 480 * @throws ClassCastException if the specified element cannot be compared 481 * with elements currently in the priority queue according to the 482 * priority queue's ordering 483 * @throws NullPointerException if the specified element is null 484 */ 485 public boolean offer(E e, long timeout, TimeUnit unit) { 486 return offer(e); // never need to block 487 } 488 489 public E poll() { 490 final ReentrantLock lock = this.lock; 491 lock.lock(); 492 E result; 493 try { 494 result = extract(); 495 } finally { 496 lock.unlock(); 497 } 498 return result; 499 } 500 501 public E take() throws InterruptedException { 502 final ReentrantLock lock = this.lock; 503 lock.lockInterruptibly(); 504 E result; 505 try { 506 while ( (result = extract()) == null) 507 notEmpty.await(); 508 } finally { 509 lock.unlock(); 510 } 511 return result; 512 } 513 514 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 515 long nanos = unit.toNanos(timeout); 516 final ReentrantLock lock = this.lock; 517 lock.lockInterruptibly(); 518 E result; 519 try { 520 while ( (result = extract()) == null && nanos > 0) 521 nanos = notEmpty.awaitNanos(nanos); 522 } finally { 523 lock.unlock(); 524 } 525 return result; 526 } 527 528 public E peek() { 529 final ReentrantLock lock = this.lock; 530 lock.lock(); 531 E result; 532 try { 533 result = size > 0 ? (E) queue[0] : null; 534 } finally { 535 lock.unlock(); 536 } 537 return result; 538 } 539 540 /** 541 * Returns the comparator used to order the elements in this queue, 542 * or {@code null} if this queue uses the {@linkplain Comparable 543 * natural ordering} of its elements. 544 * 545 * @return the comparator used to order the elements in this queue, 546 * or {@code null} if this queue uses the natural 547 * ordering of its elements 548 */ 549 public Comparator<? super E> comparator() { 550 return comparator; 551 } 552 553 public int size() { 554 final ReentrantLock lock = this.lock; 555 lock.lock(); 556 try { 557 return size; 558 } finally { 559 lock.unlock(); 560 } 561 } 562 563 /** 564 * Always returns {@code Integer.MAX_VALUE} because 565 * a {@code PriorityBlockingQueue} is not capacity constrained. 566 * @return {@code Integer.MAX_VALUE} always 567 */ 568 public int remainingCapacity() { 569 return Integer.MAX_VALUE; 570 } 571 572 private int indexOf(Object o) { 573 if (o != null) { 574 Object[] array = queue; 575 int n = size; 576 for (int i = 0; i < n; i++) 577 if (o.equals(array[i])) 578 return i; 579 } 580 return -1; 581 } 582 583 /** 584 * Removes the ith element from queue. 585 */ 586 private void removeAt(int i) { 587 Object[] array = queue; 588 int n = size - 1; 589 if (n == i) // removed last element 590 array[i] = null; 591 else { 592 E moved = (E) array[n]; 593 array[n] = null; 594 Comparator<? super E> cmp = comparator; 595 if (cmp == null) 596 siftDownComparable(i, moved, array, n); 597 else 598 siftDownUsingComparator(i, moved, array, n, cmp); 599 if (array[i] == moved) { 600 if (cmp == null) 601 siftUpComparable(i, moved, array); 602 else 603 siftUpUsingComparator(i, moved, array, cmp); 604 } 605 } 606 size = n; 607 } 608 609 /** 610 * Removes a single instance of the specified element from this queue, 611 * if it is present. More formally, removes an element {@code e} such 612 * that {@code o.equals(e)}, if this queue contains one or more such 613 * elements. Returns {@code true} if and only if this queue contained 614 * the specified element (or equivalently, if this queue changed as a 615 * result of the call). 616 * 617 * @param o element to be removed from this queue, if present 618 * @return {@code true} if this queue changed as a result of the call 619 */ 620 public boolean remove(Object o) { 621 boolean removed = false; 622 final ReentrantLock lock = this.lock; 623 lock.lock(); 624 try { 625 int i = indexOf(o); 626 if (i != -1) { 627 removeAt(i); 628 removed = true; 629 } 630 } finally { 631 lock.unlock(); 632 } 633 return removed; 634 } 635 636 637 /** 638 * Identity-based version for use in Itr.remove 639 */ 640 private void removeEQ(Object o) { 641 final ReentrantLock lock = this.lock; 642 lock.lock(); 643 try { 644 Object[] array = queue; 645 int n = size; 646 for (int i = 0; i < n; i++) { 647 if (o == array[i]) { 648 removeAt(i); 649 break; 650 } 651 } 652 } finally { 653 lock.unlock(); 654 } 655 } 656 657 /** 658 * Returns {@code true} if this queue contains the specified element. 659 * More formally, returns {@code true} if and only if this queue contains 660 * at least one element {@code e} such that {@code o.equals(e)}. 661 * 662 * @param o object to be checked for containment in this queue 663 * @return {@code true} if this queue contains the specified element 664 */ 665 public boolean contains(Object o) { 666 int index; 667 final ReentrantLock lock = this.lock; 668 lock.lock(); 669 try { 670 index = indexOf(o); 671 } finally { 672 lock.unlock(); 673 } 674 return index != -1; 675 } 676 677 /** 678 * Returns an array containing all of the elements in this queue. 679 * The returned array elements are in no particular order. 680 * 681 * <p>The returned array will be "safe" in that no references to it are 682 * maintained by this queue. (In other words, this method must allocate 683 * a new array). The caller is thus free to modify the returned array. 684 * 685 * <p>This method acts as bridge between array-based and collection-based 686 * APIs. 687 * 688 * @return an array containing all of the elements in this queue 689 */ 690 public Object[] toArray() { 691 final ReentrantLock lock = this.lock; 692 lock.lock(); 693 try { 694 return Arrays.copyOf(queue, size); 695 } finally { 696 lock.unlock(); 697 } 698 } 699 700 701 public String toString() { 702 final ReentrantLock lock = this.lock; 703 lock.lock(); 704 try { 705 int n = size; 706 if (n == 0) 707 return "[]"; 708 StringBuilder sb = new StringBuilder(); 709 sb.append('['); 710 for (int i = 0; i < n; ++i) { 711 E e = (E)queue[i]; 712 sb.append(e == this ? "(this Collection)" : e); 713 if (i != n - 1) 714 sb.append(',').append(' '); 715 } 716 return sb.append(']').toString(); 717 } finally { 718 lock.unlock(); 719 } 720 } 721 722 /** 723 * @throws UnsupportedOperationException {@inheritDoc} 724 * @throws ClassCastException {@inheritDoc} 725 * @throws NullPointerException {@inheritDoc} 726 * @throws IllegalArgumentException {@inheritDoc} 727 */ 728 public int drainTo(Collection<? super E> c) { 729 if (c == null) 730 throw new NullPointerException(); 731 if (c == this) 732 throw new IllegalArgumentException(); 733 final ReentrantLock lock = this.lock; 734 lock.lock(); 735 try { 736 int n = 0; 737 E e; 738 while ( (e = extract()) != null) { 739 c.add(e); 740 ++n; 741 } 742 return n; 743 } finally { 744 lock.unlock(); 745 } 746 } 747 748 /** 749 * @throws UnsupportedOperationException {@inheritDoc} 750 * @throws ClassCastException {@inheritDoc} 751 * @throws NullPointerException {@inheritDoc} 752 * @throws IllegalArgumentException {@inheritDoc} 753 */ 754 public int drainTo(Collection<? super E> c, int maxElements) { 755 if (c == null) 756 throw new NullPointerException(); 757 if (c == this) 758 throw new IllegalArgumentException(); 759 if (maxElements <= 0) 760 return 0; 761 final ReentrantLock lock = this.lock; 762 lock.lock(); 763 try { 764 int n = 0; 765 E e; 766 while (n < maxElements && (e = extract()) != null) { 767 c.add(e); 768 ++n; 769 } 770 return n; 771 } finally { 772 lock.unlock(); 773 } 774 } 775 776 /** 777 * Atomically removes all of the elements from this queue. 778 * The queue will be empty after this call returns. 779 */ 780 public void clear() { 781 final ReentrantLock lock = this.lock; 782 lock.lock(); 783 try { 784 Object[] array = queue; 785 int n = size; 786 size = 0; 787 for (int i = 0; i < n; i++) 788 array[i] = null; 789 } finally { 790 lock.unlock(); 791 } 792 } 793 794 /** 795 * Returns an array containing all of the elements in this queue; the 796 * runtime type of the returned array is that of the specified array. 797 * The returned array elements are in no particular order. 798 * If the queue fits in the specified array, it is returned therein. 799 * Otherwise, a new array is allocated with the runtime type of the 800 * specified array and the size of this queue. 801 * 802 * <p>If this queue fits in the specified array with room to spare 803 * (i.e., the array has more elements than this queue), the element in 804 * the array immediately following the end of the queue is set to 805 * {@code null}. 806 * 807 * <p>Like the {@link #toArray()} method, this method acts as bridge between 808 * array-based and collection-based APIs. Further, this method allows 809 * precise control over the runtime type of the output array, and may, 810 * under certain circumstances, be used to save allocation costs. 811 * 812 * <p>Suppose {@code x} is a queue known to contain only strings. 813 * The following code can be used to dump the queue into a newly 814 * allocated array of {@code String}: 815 * 816 * <pre> 817 * String[] y = x.toArray(new String[0]);</pre> 818 * 819 * Note that {@code toArray(new Object[0])} is identical in function to 820 * {@code toArray()}. 821 * 822 * @param a the array into which the elements of the queue are to 823 * be stored, if it is big enough; otherwise, a new array of the 824 * same runtime type is allocated for this purpose 825 * @return an array containing all of the elements in this queue 826 * @throws ArrayStoreException if the runtime type of the specified array 827 * is not a supertype of the runtime type of every element in 828 * this queue 829 * @throws NullPointerException if the specified array is null 830 */ 831 public <T> T[] toArray(T[] a) { 832 final ReentrantLock lock = this.lock; 833 lock.lock(); 834 try { 835 int n = size; 836 if (a.length < n) 837 // Make a new array of a's runtime type, but my contents: 838 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 839 System.arraycopy(queue, 0, a, 0, n); 840 if (a.length > n) 841 a[n] = null; 842 return a; 843 } finally { 844 lock.unlock(); 845 } 846 } 847 848 /** 849 * Returns an iterator over the elements in this queue. The 850 * iterator does not return the elements in any particular order. 851 * 852 * <p>The returned iterator is a "weakly consistent" iterator that 853 * will never throw {@link java.util.ConcurrentModificationException 854 * ConcurrentModificationException}, and guarantees to traverse 855 * elements as they existed upon construction of the iterator, and 856 * may (but is not guaranteed to) reflect any modifications 857 * subsequent to construction. 858 * 859 * @return an iterator over the elements in this queue 860 */ 861 public Iterator<E> iterator() { 862 return new Itr(toArray()); 863 } 864 865 /** 866 * Snapshot iterator that works off copy of underlying q array. 867 */ 868 final class Itr implements Iterator<E> { 869 final Object[] array; // Array of all elements 870 int cursor; // index of next element to return; 871 int lastRet; // index of last element, or -1 if no such 872 873 Itr(Object[] array) { 874 lastRet = -1; 875 this.array = array; 876 } 877 878 public boolean hasNext() { 879 return cursor < array.length; 880 } 881 882 public E next() { 883 if (cursor >= array.length) 884 throw new NoSuchElementException(); 885 lastRet = cursor; 886 return (E)array[cursor++]; 887 } 888 889 public void remove() { 890 if (lastRet < 0) 891 throw new IllegalStateException(); 892 removeEQ(array[lastRet]); 893 lastRet = -1; 894 } 895 } 896 897 /** 898 * Saves the state to a stream (that is, serializes it). For 899 * compatibility with previous version of this class, 900 * elements are first copied to a java.util.PriorityQueue, 901 * which is then serialized. 902 */ 903 private void writeObject(java.io.ObjectOutputStream s) 904 throws java.io.IOException { 905 lock.lock(); 906 try { 907 int n = size; // avoid zero capacity argument 908 q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator); 909 q.addAll(this); 910 s.defaultWriteObject(); 911 } finally { 912 q = null; 913 lock.unlock(); 914 } 915 } 916 917 /** 918 * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream 919 * (that is, deserializes it). 920 * 921 * @param s the stream 922 */ 923 private void readObject(java.io.ObjectInputStream s) 924 throws java.io.IOException, ClassNotFoundException { 925 try { 926 s.defaultReadObject(); 927 this.queue = new Object[q.size()]; 928 comparator = q.comparator(); 929 addAll(q); 930 } finally { 931 q = null; 932 } 933 } 934 935 // Unsafe mechanics 936 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); 937 private static final long allocationSpinLockOffset = 938 objectFieldOffset(UNSAFE, "allocationSpinLock", 939 PriorityBlockingQueue.class); 940 941 static long objectFieldOffset(sun.misc.Unsafe UNSAFE, 942 String field, Class<?> klazz) { 943 try { 944 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); 945 } catch (NoSuchFieldException e) { 946 // Convert Exception to corresponding Error 947 NoSuchFieldError error = new NoSuchFieldError(field); 948 error.initCause(e); 949 throw error; 950 } 951 } 952 953} 954