ArrayBlockingQueue.java revision edf43d27e240d82106f39ae91404963c23987234
1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7package java.util.concurrent; 8 9import java.lang.ref.WeakReference; 10import java.util.Arrays; 11import java.util.AbstractQueue; 12import java.util.Collection; 13import java.util.Iterator; 14import java.util.NoSuchElementException; 15import java.util.concurrent.locks.Condition; 16import java.util.concurrent.locks.ReentrantLock; 17 18// BEGIN android-note 19// removed link to collections framework docs 20// END android-note 21 22/** 23 * A bounded {@linkplain BlockingQueue blocking queue} backed by an 24 * array. This queue orders elements FIFO (first-in-first-out). The 25 * <em>head</em> of the queue is that element that has been on the 26 * queue the longest time. The <em>tail</em> of the queue is that 27 * element that has been on the queue the shortest time. New elements 28 * are inserted at the tail of the queue, and the queue retrieval 29 * operations obtain elements at the head of the queue. 30 * 31 * <p>This is a classic "bounded buffer", in which a 32 * fixed-sized array holds elements inserted by producers and 33 * extracted by consumers. Once created, the capacity cannot be 34 * changed. Attempts to {@code put} an element into a full queue 35 * will result in the operation blocking; attempts to {@code take} an 36 * element from an empty queue will similarly block. 37 * 38 * <p>This class supports an optional fairness policy for ordering 39 * waiting producer and consumer threads. By default, this ordering 40 * is not guaranteed. However, a queue constructed with fairness set 41 * to {@code true} grants threads access in FIFO order. Fairness 42 * generally decreases throughput but reduces variability and avoids 43 * starvation. 44 * 45 * <p>This class and its iterator implement all of the 46 * <em>optional</em> methods of the {@link Collection} and {@link 47 * Iterator} interfaces. 48 * 49 * @since 1.5 50 * @author Doug Lea 51 * @param <E> the type of elements held in this queue 52 */ 53public class ArrayBlockingQueue<E> extends AbstractQueue<E> 54 implements BlockingQueue<E>, java.io.Serializable { 55 56 /** 57 * Serialization ID. This class relies on default serialization 58 * even for the items array, which is default-serialized, even if 59 * it is empty. Otherwise it could not be declared final, which is 60 * necessary here. 61 */ 62 private static final long serialVersionUID = -817911632652898426L; 63 64 /** The queued items */ 65 final Object[] items; 66 67 /** items index for next take, poll, peek or remove */ 68 int takeIndex; 69 70 /** items index for next put, offer, or add */ 71 int putIndex; 72 73 /** Number of elements in the queue */ 74 int count; 75 76 /* 77 * Concurrency control uses the classic two-condition algorithm 78 * found in any textbook. 79 */ 80 81 /** Main lock guarding all access */ 82 final ReentrantLock lock; 83 84 /** Condition for waiting takes */ 85 private final Condition notEmpty; 86 87 /** Condition for waiting puts */ 88 private final Condition notFull; 89 90 /** 91 * Shared state for currently active iterators, or null if there 92 * are known not to be any. Allows queue operations to update 93 * iterator state. 94 */ 95 transient Itrs itrs = null; 96 97 // Internal helper methods 98 99 /** 100 * Circularly decrements array index i. 101 */ 102 final int dec(int i) { 103 return ((i == 0) ? items.length : i) - 1; 104 } 105 106 /** 107 * Returns item at index i. 108 */ 109 @SuppressWarnings("unchecked") 110 final E itemAt(int i) { 111 return (E) items[i]; 112 } 113 114 /** 115 * Inserts element at current put position, advances, and signals. 116 * Call only when holding lock. 117 */ 118 private void enqueue(E x) { 119 // assert lock.getHoldCount() == 1; 120 // assert items[putIndex] == null; 121 final Object[] items = this.items; 122 items[putIndex] = x; 123 if (++putIndex == items.length) putIndex = 0; 124 count++; 125 notEmpty.signal(); 126 } 127 128 /** 129 * Extracts element at current take position, advances, and signals. 130 * Call only when holding lock. 131 */ 132 private E dequeue() { 133 // assert lock.getHoldCount() == 1; 134 // assert items[takeIndex] != null; 135 final Object[] items = this.items; 136 @SuppressWarnings("unchecked") 137 E x = (E) items[takeIndex]; 138 items[takeIndex] = null; 139 if (++takeIndex == items.length) takeIndex = 0; 140 count--; 141 if (itrs != null) 142 itrs.elementDequeued(); 143 notFull.signal(); 144 return x; 145 } 146 147 /** 148 * Deletes item at array index removeIndex. 149 * Utility for remove(Object) and iterator.remove. 150 * Call only when holding lock. 151 */ 152 void removeAt(final int removeIndex) { 153 // assert lock.getHoldCount() == 1; 154 // assert items[removeIndex] != null; 155 // assert removeIndex >= 0 && removeIndex < items.length; 156 final Object[] items = this.items; 157 if (removeIndex == takeIndex) { 158 // removing front item; just advance 159 items[takeIndex] = null; 160 if (++takeIndex == items.length) takeIndex = 0; 161 count--; 162 if (itrs != null) 163 itrs.elementDequeued(); 164 } else { 165 // an "interior" remove 166 167 // slide over all others up through putIndex. 168 for (int i = removeIndex, putIndex = this.putIndex;;) { 169 int pred = i; 170 if (++i == items.length) i = 0; 171 if (i == putIndex) { 172 items[pred] = null; 173 this.putIndex = pred; 174 break; 175 } 176 items[pred] = items[i]; 177 } 178 count--; 179 if (itrs != null) 180 itrs.removedAt(removeIndex); 181 } 182 notFull.signal(); 183 } 184 185 /** 186 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 187 * capacity and default access policy. 188 * 189 * @param capacity the capacity of this queue 190 * @throws IllegalArgumentException if {@code capacity < 1} 191 */ 192 public ArrayBlockingQueue(int capacity) { 193 this(capacity, false); 194 } 195 196 /** 197 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 198 * capacity and the specified access policy. 199 * 200 * @param capacity the capacity of this queue 201 * @param fair if {@code true} then queue accesses for threads blocked 202 * on insertion or removal, are processed in FIFO order; 203 * if {@code false} the access order is unspecified. 204 * @throws IllegalArgumentException if {@code capacity < 1} 205 */ 206 public ArrayBlockingQueue(int capacity, boolean fair) { 207 if (capacity <= 0) 208 throw new IllegalArgumentException(); 209 this.items = new Object[capacity]; 210 lock = new ReentrantLock(fair); 211 notEmpty = lock.newCondition(); 212 notFull = lock.newCondition(); 213 } 214 215 /** 216 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 217 * capacity, the specified access policy and initially containing the 218 * elements of the given collection, 219 * added in traversal order of the collection's iterator. 220 * 221 * @param capacity the capacity of this queue 222 * @param fair if {@code true} then queue accesses for threads blocked 223 * on insertion or removal, are processed in FIFO order; 224 * if {@code false} the access order is unspecified. 225 * @param c the collection of elements to initially contain 226 * @throws IllegalArgumentException if {@code capacity} is less than 227 * {@code c.size()}, or less than 1. 228 * @throws NullPointerException if the specified collection or any 229 * of its elements are null 230 */ 231 public ArrayBlockingQueue(int capacity, boolean fair, 232 Collection<? extends E> c) { 233 this(capacity, fair); 234 235 final ReentrantLock lock = this.lock; 236 lock.lock(); // Lock only for visibility, not mutual exclusion 237 try { 238 int i = 0; 239 try { 240 for (E e : c) { 241 if (e == null) throw new NullPointerException(); 242 items[i++] = e; 243 } 244 } catch (ArrayIndexOutOfBoundsException ex) { 245 throw new IllegalArgumentException(); 246 } 247 count = i; 248 putIndex = (i == capacity) ? 0 : i; 249 } finally { 250 lock.unlock(); 251 } 252 } 253 254 /** 255 * Inserts the specified element at the tail of this queue if it is 256 * possible to do so immediately without exceeding the queue's capacity, 257 * returning {@code true} upon success and throwing an 258 * {@code IllegalStateException} if this queue is full. 259 * 260 * @param e the element to add 261 * @return {@code true} (as specified by {@link Collection#add}) 262 * @throws IllegalStateException if this queue is full 263 * @throws NullPointerException if the specified element is null 264 */ 265 public boolean add(E e) { 266 return super.add(e); 267 } 268 269 /** 270 * Inserts the specified element at the tail of this queue if it is 271 * possible to do so immediately without exceeding the queue's capacity, 272 * returning {@code true} upon success and {@code false} if this queue 273 * is full. This method is generally preferable to method {@link #add}, 274 * which can fail to insert an element only by throwing an exception. 275 * 276 * @throws NullPointerException if the specified element is null 277 */ 278 public boolean offer(E e) { 279 if (e == null) throw new NullPointerException(); 280 final ReentrantLock lock = this.lock; 281 lock.lock(); 282 try { 283 if (count == items.length) 284 return false; 285 else { 286 enqueue(e); 287 return true; 288 } 289 } finally { 290 lock.unlock(); 291 } 292 } 293 294 /** 295 * Inserts the specified element at the tail of this queue, waiting 296 * for space to become available if the queue is full. 297 * 298 * @throws InterruptedException {@inheritDoc} 299 * @throws NullPointerException {@inheritDoc} 300 */ 301 public void put(E e) throws InterruptedException { 302 if (e == null) throw new NullPointerException(); 303 final ReentrantLock lock = this.lock; 304 lock.lockInterruptibly(); 305 try { 306 while (count == items.length) 307 notFull.await(); 308 enqueue(e); 309 } finally { 310 lock.unlock(); 311 } 312 } 313 314 /** 315 * Inserts the specified element at the tail of this queue, waiting 316 * up to the specified wait time for space to become available if 317 * the queue is full. 318 * 319 * @throws InterruptedException {@inheritDoc} 320 * @throws NullPointerException {@inheritDoc} 321 */ 322 public boolean offer(E e, long timeout, TimeUnit unit) 323 throws InterruptedException { 324 325 if (e == null) throw new NullPointerException(); 326 long nanos = unit.toNanos(timeout); 327 final ReentrantLock lock = this.lock; 328 lock.lockInterruptibly(); 329 try { 330 while (count == items.length) { 331 if (nanos <= 0) 332 return false; 333 nanos = notFull.awaitNanos(nanos); 334 } 335 enqueue(e); 336 return true; 337 } finally { 338 lock.unlock(); 339 } 340 } 341 342 public E poll() { 343 final ReentrantLock lock = this.lock; 344 lock.lock(); 345 try { 346 return (count == 0) ? null : dequeue(); 347 } finally { 348 lock.unlock(); 349 } 350 } 351 352 public E take() throws InterruptedException { 353 final ReentrantLock lock = this.lock; 354 lock.lockInterruptibly(); 355 try { 356 while (count == 0) 357 notEmpty.await(); 358 return dequeue(); 359 } finally { 360 lock.unlock(); 361 } 362 } 363 364 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 365 long nanos = unit.toNanos(timeout); 366 final ReentrantLock lock = this.lock; 367 lock.lockInterruptibly(); 368 try { 369 while (count == 0) { 370 if (nanos <= 0) 371 return null; 372 nanos = notEmpty.awaitNanos(nanos); 373 } 374 return dequeue(); 375 } finally { 376 lock.unlock(); 377 } 378 } 379 380 public E peek() { 381 final ReentrantLock lock = this.lock; 382 lock.lock(); 383 try { 384 return itemAt(takeIndex); // null when queue is empty 385 } finally { 386 lock.unlock(); 387 } 388 } 389 390 // this doc comment is overridden to remove the reference to collections 391 // greater in size than Integer.MAX_VALUE 392 /** 393 * Returns the number of elements in this queue. 394 * 395 * @return the number of elements in this queue 396 */ 397 public int size() { 398 final ReentrantLock lock = this.lock; 399 lock.lock(); 400 try { 401 return count; 402 } finally { 403 lock.unlock(); 404 } 405 } 406 407 // this doc comment is a modified copy of the inherited doc comment, 408 // without the reference to unlimited queues. 409 /** 410 * Returns the number of additional elements that this queue can ideally 411 * (in the absence of memory or resource constraints) accept without 412 * blocking. This is always equal to the initial capacity of this queue 413 * less the current {@code size} of this queue. 414 * 415 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 416 * an element will succeed by inspecting {@code remainingCapacity} 417 * because it may be the case that another thread is about to 418 * insert or remove an element. 419 */ 420 public int remainingCapacity() { 421 final ReentrantLock lock = this.lock; 422 lock.lock(); 423 try { 424 return items.length - count; 425 } finally { 426 lock.unlock(); 427 } 428 } 429 430 /** 431 * Removes a single instance of the specified element from this queue, 432 * if it is present. More formally, removes an element {@code e} such 433 * that {@code o.equals(e)}, if this queue contains one or more such 434 * elements. 435 * Returns {@code true} if this queue contained the specified element 436 * (or equivalently, if this queue changed as a result of the call). 437 * 438 * <p>Removal of interior elements in circular array based queues 439 * is an intrinsically slow and disruptive operation, so should 440 * be undertaken only in exceptional circumstances, ideally 441 * only when the queue is known not to be accessible by other 442 * threads. 443 * 444 * @param o element to be removed from this queue, if present 445 * @return {@code true} if this queue changed as a result of the call 446 */ 447 public boolean remove(Object o) { 448 if (o == null) return false; 449 final ReentrantLock lock = this.lock; 450 lock.lock(); 451 try { 452 if (count > 0) { 453 final Object[] items = this.items; 454 final int putIndex = this.putIndex; 455 int i = takeIndex; 456 do { 457 if (o.equals(items[i])) { 458 removeAt(i); 459 return true; 460 } 461 if (++i == items.length) i = 0; 462 } while (i != putIndex); 463 } 464 return false; 465 } finally { 466 lock.unlock(); 467 } 468 } 469 470 /** 471 * Returns {@code true} if this queue contains the specified element. 472 * More formally, returns {@code true} if and only if this queue contains 473 * at least one element {@code e} such that {@code o.equals(e)}. 474 * 475 * @param o object to be checked for containment in this queue 476 * @return {@code true} if this queue contains the specified element 477 */ 478 public boolean contains(Object o) { 479 if (o == null) return false; 480 final ReentrantLock lock = this.lock; 481 lock.lock(); 482 try { 483 if (count > 0) { 484 final Object[] items = this.items; 485 final int putIndex = this.putIndex; 486 int i = takeIndex; 487 do { 488 if (o.equals(items[i])) 489 return true; 490 if (++i == items.length) i = 0; 491 } while (i != putIndex); 492 } 493 return false; 494 } finally { 495 lock.unlock(); 496 } 497 } 498 499 /** 500 * Returns an array containing all of the elements in this queue, in 501 * proper sequence. 502 * 503 * <p>The returned array will be "safe" in that no references to it are 504 * maintained by this queue. (In other words, this method must allocate 505 * a new array). The caller is thus free to modify the returned array. 506 * 507 * <p>This method acts as bridge between array-based and collection-based 508 * APIs. 509 * 510 * @return an array containing all of the elements in this queue 511 */ 512 public Object[] toArray() { 513 final ReentrantLock lock = this.lock; 514 lock.lock(); 515 try { 516 final Object[] items = this.items; 517 final int end = takeIndex + count; 518 final Object[] a = Arrays.copyOfRange(items, takeIndex, end); 519 if (end != putIndex) 520 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex); 521 return a; 522 } finally { 523 lock.unlock(); 524 } 525 } 526 527 /** 528 * Returns an array containing all of the elements in this queue, in 529 * proper sequence; the runtime type of the returned array is that of 530 * the specified array. If the queue fits in the specified array, it 531 * is returned therein. Otherwise, a new array is allocated with the 532 * runtime type of the specified array and the size of this queue. 533 * 534 * <p>If this queue fits in the specified array with room to spare 535 * (i.e., the array has more elements than this queue), the element in 536 * the array immediately following the end of the queue is set to 537 * {@code null}. 538 * 539 * <p>Like the {@link #toArray()} method, this method acts as bridge between 540 * array-based and collection-based APIs. Further, this method allows 541 * precise control over the runtime type of the output array, and may, 542 * under certain circumstances, be used to save allocation costs. 543 * 544 * <p>Suppose {@code x} is a queue known to contain only strings. 545 * The following code can be used to dump the queue into a newly 546 * allocated array of {@code String}: 547 * 548 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 549 * 550 * Note that {@code toArray(new Object[0])} is identical in function to 551 * {@code toArray()}. 552 * 553 * @param a the array into which the elements of the queue are to 554 * be stored, if it is big enough; otherwise, a new array of the 555 * same runtime type is allocated for this purpose 556 * @return an array containing all of the elements in this queue 557 * @throws ArrayStoreException if the runtime type of the specified array 558 * is not a supertype of the runtime type of every element in 559 * this queue 560 * @throws NullPointerException if the specified array is null 561 */ 562 @SuppressWarnings("unchecked") 563 public <T> T[] toArray(T[] a) { 564 final ReentrantLock lock = this.lock; 565 lock.lock(); 566 try { 567 final Object[] items = this.items; 568 final int count = this.count; 569 final int firstLeg = Math.min(items.length - takeIndex, count); 570 if (a.length < count) { 571 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count, 572 a.getClass()); 573 } else { 574 System.arraycopy(items, takeIndex, a, 0, firstLeg); 575 if (a.length > count) 576 a[count] = null; 577 } 578 if (firstLeg < count) 579 System.arraycopy(items, 0, a, firstLeg, putIndex); 580 return a; 581 } finally { 582 lock.unlock(); 583 } 584 } 585 586 public String toString() { 587 final ReentrantLock lock = this.lock; 588 lock.lock(); 589 try { 590 int k = count; 591 if (k == 0) 592 return "[]"; 593 594 final Object[] items = this.items; 595 StringBuilder sb = new StringBuilder(); 596 sb.append('['); 597 for (int i = takeIndex; ; ) { 598 Object e = items[i]; 599 sb.append(e == this ? "(this Collection)" : e); 600 if (--k == 0) 601 return sb.append(']').toString(); 602 sb.append(',').append(' '); 603 if (++i == items.length) i = 0; 604 } 605 } finally { 606 lock.unlock(); 607 } 608 } 609 610 /** 611 * Atomically removes all of the elements from this queue. 612 * The queue will be empty after this call returns. 613 */ 614 public void clear() { 615 final Object[] items = this.items; 616 final ReentrantLock lock = this.lock; 617 lock.lock(); 618 try { 619 int k = count; 620 if (k > 0) { 621 final int putIndex = this.putIndex; 622 int i = takeIndex; 623 do { 624 items[i] = null; 625 if (++i == items.length) i = 0; 626 } while (i != putIndex); 627 takeIndex = putIndex; 628 count = 0; 629 if (itrs != null) 630 itrs.queueIsEmpty(); 631 for (; k > 0 && lock.hasWaiters(notFull); k--) 632 notFull.signal(); 633 } 634 } finally { 635 lock.unlock(); 636 } 637 } 638 639 /** 640 * @throws UnsupportedOperationException {@inheritDoc} 641 * @throws ClassCastException {@inheritDoc} 642 * @throws NullPointerException {@inheritDoc} 643 * @throws IllegalArgumentException {@inheritDoc} 644 */ 645 public int drainTo(Collection<? super E> c) { 646 return drainTo(c, Integer.MAX_VALUE); 647 } 648 649 /** 650 * @throws UnsupportedOperationException {@inheritDoc} 651 * @throws ClassCastException {@inheritDoc} 652 * @throws NullPointerException {@inheritDoc} 653 * @throws IllegalArgumentException {@inheritDoc} 654 */ 655 public int drainTo(Collection<? super E> c, int maxElements) { 656 if (c == null) throw new NullPointerException(); 657 if (c == this) 658 throw new IllegalArgumentException(); 659 if (maxElements <= 0) 660 return 0; 661 final Object[] items = this.items; 662 final ReentrantLock lock = this.lock; 663 lock.lock(); 664 try { 665 int n = Math.min(maxElements, count); 666 int take = takeIndex; 667 int i = 0; 668 try { 669 while (i < n) { 670 @SuppressWarnings("unchecked") 671 E x = (E) items[take]; 672 c.add(x); 673 items[take] = null; 674 if (++take == items.length) take = 0; 675 i++; 676 } 677 return n; 678 } finally { 679 // Restore invariants even if c.add() threw 680 if (i > 0) { 681 count -= i; 682 takeIndex = take; 683 if (itrs != null) { 684 if (count == 0) 685 itrs.queueIsEmpty(); 686 else if (i > take) 687 itrs.takeIndexWrapped(); 688 } 689 for (; i > 0 && lock.hasWaiters(notFull); i--) 690 notFull.signal(); 691 } 692 } 693 } finally { 694 lock.unlock(); 695 } 696 } 697 698 /** 699 * Returns an iterator over the elements in this queue in proper sequence. 700 * The elements will be returned in order from first (head) to last (tail). 701 * 702 * <p>The returned iterator is 703 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 704 * 705 * @return an iterator over the elements in this queue in proper sequence 706 */ 707 public Iterator<E> iterator() { 708 return new Itr(); 709 } 710 711 /** 712 * Shared data between iterators and their queue, allowing queue 713 * modifications to update iterators when elements are removed. 714 * 715 * This adds a lot of complexity for the sake of correctly 716 * handling some uncommon operations, but the combination of 717 * circular-arrays and supporting interior removes (i.e., those 718 * not at head) would cause iterators to sometimes lose their 719 * places and/or (re)report elements they shouldn't. To avoid 720 * this, when a queue has one or more iterators, it keeps iterator 721 * state consistent by: 722 * 723 * (1) keeping track of the number of "cycles", that is, the 724 * number of times takeIndex has wrapped around to 0. 725 * (2) notifying all iterators via the callback removedAt whenever 726 * an interior element is removed (and thus other elements may 727 * be shifted). 728 * 729 * These suffice to eliminate iterator inconsistencies, but 730 * unfortunately add the secondary responsibility of maintaining 731 * the list of iterators. We track all active iterators in a 732 * simple linked list (accessed only when the queue's lock is 733 * held) of weak references to Itr. The list is cleaned up using 734 * 3 different mechanisms: 735 * 736 * (1) Whenever a new iterator is created, do some O(1) checking for 737 * stale list elements. 738 * 739 * (2) Whenever takeIndex wraps around to 0, check for iterators 740 * that have been unused for more than one wrap-around cycle. 741 * 742 * (3) Whenever the queue becomes empty, all iterators are notified 743 * and this entire data structure is discarded. 744 * 745 * So in addition to the removedAt callback that is necessary for 746 * correctness, iterators have the shutdown and takeIndexWrapped 747 * callbacks that help remove stale iterators from the list. 748 * 749 * Whenever a list element is examined, it is expunged if either 750 * the GC has determined that the iterator is discarded, or if the 751 * iterator reports that it is "detached" (does not need any 752 * further state updates). Overhead is maximal when takeIndex 753 * never advances, iterators are discarded before they are 754 * exhausted, and all removals are interior removes, in which case 755 * all stale iterators are discovered by the GC. But even in this 756 * case we don't increase the amortized complexity. 757 * 758 * Care must be taken to keep list sweeping methods from 759 * reentrantly invoking another such method, causing subtle 760 * corruption bugs. 761 */ 762 class Itrs { 763 764 /** 765 * Node in a linked list of weak iterator references. 766 */ 767 private class Node extends WeakReference<Itr> { 768 Node next; 769 770 Node(Itr iterator, Node next) { 771 super(iterator); 772 this.next = next; 773 } 774 } 775 776 /** Incremented whenever takeIndex wraps around to 0 */ 777 int cycles; 778 779 /** Linked list of weak iterator references */ 780 private Node head; 781 782 /** Used to expunge stale iterators */ 783 private Node sweeper; 784 785 private static final int SHORT_SWEEP_PROBES = 4; 786 private static final int LONG_SWEEP_PROBES = 16; 787 788 Itrs(Itr initial) { 789 register(initial); 790 } 791 792 /** 793 * Sweeps itrs, looking for and expunging stale iterators. 794 * If at least one was found, tries harder to find more. 795 * Called only from iterating thread. 796 * 797 * @param tryHarder whether to start in try-harder mode, because 798 * there is known to be at least one iterator to collect 799 */ 800 void doSomeSweeping(boolean tryHarder) { 801 // assert lock.getHoldCount() == 1; 802 // assert head != null; 803 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; 804 Node o, p; 805 final Node sweeper = this.sweeper; 806 boolean passedGo; // to limit search to one full sweep 807 808 if (sweeper == null) { 809 o = null; 810 p = head; 811 passedGo = true; 812 } else { 813 o = sweeper; 814 p = o.next; 815 passedGo = false; 816 } 817 818 for (; probes > 0; probes--) { 819 if (p == null) { 820 if (passedGo) 821 break; 822 o = null; 823 p = head; 824 passedGo = true; 825 } 826 final Itr it = p.get(); 827 final Node next = p.next; 828 if (it == null || it.isDetached()) { 829 // found a discarded/exhausted iterator 830 probes = LONG_SWEEP_PROBES; // "try harder" 831 // unlink p 832 p.clear(); 833 p.next = null; 834 if (o == null) { 835 head = next; 836 if (next == null) { 837 // We've run out of iterators to track; retire 838 itrs = null; 839 return; 840 } 841 } 842 else 843 o.next = next; 844 } else { 845 o = p; 846 } 847 p = next; 848 } 849 850 this.sweeper = (p == null) ? null : o; 851 } 852 853 /** 854 * Adds a new iterator to the linked list of tracked iterators. 855 */ 856 void register(Itr itr) { 857 // assert lock.getHoldCount() == 1; 858 head = new Node(itr, head); 859 } 860 861 /** 862 * Called whenever takeIndex wraps around to 0. 863 * 864 * Notifies all iterators, and expunges any that are now stale. 865 */ 866 void takeIndexWrapped() { 867 // assert lock.getHoldCount() == 1; 868 cycles++; 869 for (Node o = null, p = head; p != null;) { 870 final Itr it = p.get(); 871 final Node next = p.next; 872 if (it == null || it.takeIndexWrapped()) { 873 // unlink p 874 // assert it == null || it.isDetached(); 875 p.clear(); 876 p.next = null; 877 if (o == null) 878 head = next; 879 else 880 o.next = next; 881 } else { 882 o = p; 883 } 884 p = next; 885 } 886 if (head == null) // no more iterators to track 887 itrs = null; 888 } 889 890 /** 891 * Called whenever an interior remove (not at takeIndex) occurred. 892 * 893 * Notifies all iterators, and expunges any that are now stale. 894 */ 895 void removedAt(int removedIndex) { 896 for (Node o = null, p = head; p != null;) { 897 final Itr it = p.get(); 898 final Node next = p.next; 899 if (it == null || it.removedAt(removedIndex)) { 900 // unlink p 901 // assert it == null || it.isDetached(); 902 p.clear(); 903 p.next = null; 904 if (o == null) 905 head = next; 906 else 907 o.next = next; 908 } else { 909 o = p; 910 } 911 p = next; 912 } 913 if (head == null) // no more iterators to track 914 itrs = null; 915 } 916 917 /** 918 * Called whenever the queue becomes empty. 919 * 920 * Notifies all active iterators that the queue is empty, 921 * clears all weak refs, and unlinks the itrs datastructure. 922 */ 923 void queueIsEmpty() { 924 // assert lock.getHoldCount() == 1; 925 for (Node p = head; p != null; p = p.next) { 926 Itr it = p.get(); 927 if (it != null) { 928 p.clear(); 929 it.shutdown(); 930 } 931 } 932 head = null; 933 itrs = null; 934 } 935 936 /** 937 * Called whenever an element has been dequeued (at takeIndex). 938 */ 939 void elementDequeued() { 940 // assert lock.getHoldCount() == 1; 941 if (count == 0) 942 queueIsEmpty(); 943 else if (takeIndex == 0) 944 takeIndexWrapped(); 945 } 946 } 947 948 /** 949 * Iterator for ArrayBlockingQueue. 950 * 951 * To maintain weak consistency with respect to puts and takes, we 952 * read ahead one slot, so as to not report hasNext true but then 953 * not have an element to return. 954 * 955 * We switch into "detached" mode (allowing prompt unlinking from 956 * itrs without help from the GC) when all indices are negative, or 957 * when hasNext returns false for the first time. This allows the 958 * iterator to track concurrent updates completely accurately, 959 * except for the corner case of the user calling Iterator.remove() 960 * after hasNext() returned false. Even in this case, we ensure 961 * that we don't remove the wrong element by keeping track of the 962 * expected element to remove, in lastItem. Yes, we may fail to 963 * remove lastItem from the queue if it moved due to an interleaved 964 * interior remove while in detached mode. 965 */ 966 private class Itr implements Iterator<E> { 967 /** Index to look for new nextItem; NONE at end */ 968 private int cursor; 969 970 /** Element to be returned by next call to next(); null if none */ 971 private E nextItem; 972 973 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ 974 private int nextIndex; 975 976 /** Last element returned; null if none or not detached. */ 977 private E lastItem; 978 979 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ 980 private int lastRet; 981 982 /** Previous value of takeIndex, or DETACHED when detached */ 983 private int prevTakeIndex; 984 985 /** Previous value of iters.cycles */ 986 private int prevCycles; 987 988 /** Special index value indicating "not available" or "undefined" */ 989 private static final int NONE = -1; 990 991 /** 992 * Special index value indicating "removed elsewhere", that is, 993 * removed by some operation other than a call to this.remove(). 994 */ 995 private static final int REMOVED = -2; 996 997 /** Special value for prevTakeIndex indicating "detached mode" */ 998 private static final int DETACHED = -3; 999 1000 Itr() { 1001 // assert lock.getHoldCount() == 0; 1002 lastRet = NONE; 1003 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1004 lock.lock(); 1005 try { 1006 if (count == 0) { 1007 // assert itrs == null; 1008 cursor = NONE; 1009 nextIndex = NONE; 1010 prevTakeIndex = DETACHED; 1011 } else { 1012 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1013 prevTakeIndex = takeIndex; 1014 nextItem = itemAt(nextIndex = takeIndex); 1015 cursor = incCursor(takeIndex); 1016 if (itrs == null) { 1017 itrs = new Itrs(this); 1018 } else { 1019 itrs.register(this); // in this order 1020 itrs.doSomeSweeping(false); 1021 } 1022 prevCycles = itrs.cycles; 1023 // assert takeIndex >= 0; 1024 // assert prevTakeIndex == takeIndex; 1025 // assert nextIndex >= 0; 1026 // assert nextItem != null; 1027 } 1028 } finally { 1029 lock.unlock(); 1030 } 1031 } 1032 1033 boolean isDetached() { 1034 // assert lock.getHoldCount() == 1; 1035 return prevTakeIndex < 0; 1036 } 1037 1038 private int incCursor(int index) { 1039 // assert lock.getHoldCount() == 1; 1040 if (++index == items.length) index = 0; 1041 if (index == putIndex) index = NONE; 1042 return index; 1043 } 1044 1045 /** 1046 * Returns true if index is invalidated by the given number of 1047 * dequeues, starting from prevTakeIndex. 1048 */ 1049 private boolean invalidated(int index, int prevTakeIndex, 1050 long dequeues, int length) { 1051 if (index < 0) 1052 return false; 1053 int distance = index - prevTakeIndex; 1054 if (distance < 0) 1055 distance += length; 1056 return dequeues > distance; 1057 } 1058 1059 /** 1060 * Adjusts indices to incorporate all dequeues since the last 1061 * operation on this iterator. Call only from iterating thread. 1062 */ 1063 private void incorporateDequeues() { 1064 // assert lock.getHoldCount() == 1; 1065 // assert itrs != null; 1066 // assert !isDetached(); 1067 // assert count > 0; 1068 1069 final int cycles = itrs.cycles; 1070 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1071 final int prevCycles = this.prevCycles; 1072 final int prevTakeIndex = this.prevTakeIndex; 1073 1074 if (cycles != prevCycles || takeIndex != prevTakeIndex) { 1075 final int len = items.length; 1076 // how far takeIndex has advanced since the previous 1077 // operation of this iterator 1078 long dequeues = (cycles - prevCycles) * len 1079 + (takeIndex - prevTakeIndex); 1080 1081 // Check indices for invalidation 1082 if (invalidated(lastRet, prevTakeIndex, dequeues, len)) 1083 lastRet = REMOVED; 1084 if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) 1085 nextIndex = REMOVED; 1086 if (invalidated(cursor, prevTakeIndex, dequeues, len)) 1087 cursor = takeIndex; 1088 1089 if (cursor < 0 && nextIndex < 0 && lastRet < 0) 1090 detach(); 1091 else { 1092 this.prevCycles = cycles; 1093 this.prevTakeIndex = takeIndex; 1094 } 1095 } 1096 } 1097 1098 /** 1099 * Called when itrs should stop tracking this iterator, either 1100 * because there are no more indices to update (cursor < 0 && 1101 * nextIndex < 0 && lastRet < 0) or as a special exception, when 1102 * lastRet >= 0, because hasNext() is about to return false for the 1103 * first time. Call only from iterating thread. 1104 */ 1105 private void detach() { 1106 // Switch to detached mode 1107 // assert lock.getHoldCount() == 1; 1108 // assert cursor == NONE; 1109 // assert nextIndex < 0; 1110 // assert lastRet < 0 || nextItem == null; 1111 // assert lastRet < 0 ^ lastItem != null; 1112 if (prevTakeIndex >= 0) { 1113 // assert itrs != null; 1114 prevTakeIndex = DETACHED; 1115 // try to unlink from itrs (but not too hard) 1116 itrs.doSomeSweeping(true); 1117 } 1118 } 1119 1120 /** 1121 * For performance reasons, we would like not to acquire a lock in 1122 * hasNext in the common case. To allow for this, we only access 1123 * fields (i.e. nextItem) that are not modified by update operations 1124 * triggered by queue modifications. 1125 */ 1126 public boolean hasNext() { 1127 // assert lock.getHoldCount() == 0; 1128 if (nextItem != null) 1129 return true; 1130 noNext(); 1131 return false; 1132 } 1133 1134 private void noNext() { 1135 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1136 lock.lock(); 1137 try { 1138 // assert cursor == NONE; 1139 // assert nextIndex == NONE; 1140 if (!isDetached()) { 1141 // assert lastRet >= 0; 1142 incorporateDequeues(); // might update lastRet 1143 if (lastRet >= 0) { 1144 lastItem = itemAt(lastRet); 1145 // assert lastItem != null; 1146 detach(); 1147 } 1148 } 1149 // assert isDetached(); 1150 // assert lastRet < 0 ^ lastItem != null; 1151 } finally { 1152 lock.unlock(); 1153 } 1154 } 1155 1156 public E next() { 1157 // assert lock.getHoldCount() == 0; 1158 final E x = nextItem; 1159 if (x == null) 1160 throw new NoSuchElementException(); 1161 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1162 lock.lock(); 1163 try { 1164 if (!isDetached()) 1165 incorporateDequeues(); 1166 // assert nextIndex != NONE; 1167 // assert lastItem == null; 1168 lastRet = nextIndex; 1169 final int cursor = this.cursor; 1170 if (cursor >= 0) { 1171 nextItem = itemAt(nextIndex = cursor); 1172 // assert nextItem != null; 1173 this.cursor = incCursor(cursor); 1174 } else { 1175 nextIndex = NONE; 1176 nextItem = null; 1177 } 1178 } finally { 1179 lock.unlock(); 1180 } 1181 return x; 1182 } 1183 1184 public void remove() { 1185 // assert lock.getHoldCount() == 0; 1186 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1187 lock.lock(); 1188 try { 1189 if (!isDetached()) 1190 incorporateDequeues(); // might update lastRet or detach 1191 final int lastRet = this.lastRet; 1192 this.lastRet = NONE; 1193 if (lastRet >= 0) { 1194 if (!isDetached()) 1195 removeAt(lastRet); 1196 else { 1197 final E lastItem = this.lastItem; 1198 // assert lastItem != null; 1199 this.lastItem = null; 1200 if (itemAt(lastRet) == lastItem) 1201 removeAt(lastRet); 1202 } 1203 } else if (lastRet == NONE) 1204 throw new IllegalStateException(); 1205 // else lastRet == REMOVED and the last returned element was 1206 // previously asynchronously removed via an operation other 1207 // than this.remove(), so nothing to do. 1208 1209 if (cursor < 0 && nextIndex < 0) 1210 detach(); 1211 } finally { 1212 lock.unlock(); 1213 // assert lastRet == NONE; 1214 // assert lastItem == null; 1215 } 1216 } 1217 1218 /** 1219 * Called to notify the iterator that the queue is empty, or that it 1220 * has fallen hopelessly behind, so that it should abandon any 1221 * further iteration, except possibly to return one more element 1222 * from next(), as promised by returning true from hasNext(). 1223 */ 1224 void shutdown() { 1225 // assert lock.getHoldCount() == 1; 1226 cursor = NONE; 1227 if (nextIndex >= 0) 1228 nextIndex = REMOVED; 1229 if (lastRet >= 0) { 1230 lastRet = REMOVED; 1231 lastItem = null; 1232 } 1233 prevTakeIndex = DETACHED; 1234 // Don't set nextItem to null because we must continue to be 1235 // able to return it on next(). 1236 // 1237 // Caller will unlink from itrs when convenient. 1238 } 1239 1240 private int distance(int index, int prevTakeIndex, int length) { 1241 int distance = index - prevTakeIndex; 1242 if (distance < 0) 1243 distance += length; 1244 return distance; 1245 } 1246 1247 /** 1248 * Called whenever an interior remove (not at takeIndex) occurred. 1249 * 1250 * @return true if this iterator should be unlinked from itrs 1251 */ 1252 boolean removedAt(int removedIndex) { 1253 // assert lock.getHoldCount() == 1; 1254 if (isDetached()) 1255 return true; 1256 1257 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1258 final int prevTakeIndex = this.prevTakeIndex; 1259 final int len = items.length; 1260 // distance from prevTakeIndex to removedIndex 1261 final int removedDistance = 1262 len * (itrs.cycles - this.prevCycles 1263 + ((removedIndex < takeIndex) ? 1 : 0)) 1264 + (removedIndex - prevTakeIndex); 1265 // assert itrs.cycles - this.prevCycles >= 0; 1266 // assert itrs.cycles - this.prevCycles <= 1; 1267 // assert removedDistance > 0; 1268 // assert removedIndex != takeIndex; 1269 int cursor = this.cursor; 1270 if (cursor >= 0) { 1271 int x = distance(cursor, prevTakeIndex, len); 1272 if (x == removedDistance) { 1273 if (cursor == putIndex) 1274 this.cursor = cursor = NONE; 1275 } 1276 else if (x > removedDistance) { 1277 // assert cursor != prevTakeIndex; 1278 this.cursor = cursor = dec(cursor); 1279 } 1280 } 1281 int lastRet = this.lastRet; 1282 if (lastRet >= 0) { 1283 int x = distance(lastRet, prevTakeIndex, len); 1284 if (x == removedDistance) 1285 this.lastRet = lastRet = REMOVED; 1286 else if (x > removedDistance) 1287 this.lastRet = lastRet = dec(lastRet); 1288 } 1289 int nextIndex = this.nextIndex; 1290 if (nextIndex >= 0) { 1291 int x = distance(nextIndex, prevTakeIndex, len); 1292 if (x == removedDistance) 1293 this.nextIndex = nextIndex = REMOVED; 1294 else if (x > removedDistance) 1295 this.nextIndex = nextIndex = dec(nextIndex); 1296 } 1297 if (cursor < 0 && nextIndex < 0 && lastRet < 0) { 1298 this.prevTakeIndex = DETACHED; 1299 return true; 1300 } 1301 return false; 1302 } 1303 1304 /** 1305 * Called whenever takeIndex wraps around to zero. 1306 * 1307 * @return true if this iterator should be unlinked from itrs 1308 */ 1309 boolean takeIndexWrapped() { 1310 // assert lock.getHoldCount() == 1; 1311 if (isDetached()) 1312 return true; 1313 if (itrs.cycles - prevCycles > 1) { 1314 // All the elements that existed at the time of the last 1315 // operation are gone, so abandon further iteration. 1316 shutdown(); 1317 return true; 1318 } 1319 return false; 1320 } 1321 1322// /** Uncomment for debugging. */ 1323// public String toString() { 1324// return ("cursor=" + cursor + " " + 1325// "nextIndex=" + nextIndex + " " + 1326// "lastRet=" + lastRet + " " + 1327// "nextItem=" + nextItem + " " + 1328// "lastItem=" + lastItem + " " + 1329// "prevCycles=" + prevCycles + " " + 1330// "prevTakeIndex=" + prevTakeIndex + " " + 1331// "size()=" + size() + " " + 1332// "remainingCapacity()=" + remainingCapacity()); 1333// } 1334 } 1335 1336} 1337