1/* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25/* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36package java.util.concurrent; 37 38import java.util.AbstractQueue; 39import java.util.Collection; 40import java.util.Iterator; 41import java.util.NoSuchElementException; 42import java.util.Spliterator; 43import java.util.Spliterators; 44import java.util.concurrent.atomic.AtomicInteger; 45import java.util.concurrent.locks.Condition; 46import java.util.concurrent.locks.ReentrantLock; 47import java.util.function.Consumer; 48 49// BEGIN android-note 50// removed link to collections framework docs 51// END android-note 52 53/** 54 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on 55 * linked nodes. 56 * This queue orders elements FIFO (first-in-first-out). 57 * The <em>head</em> of the queue is that element that has been on the 58 * queue the longest time. 59 * The <em>tail</em> of the queue is that element that has been on the 60 * queue the shortest time. New elements 61 * are inserted at the tail of the queue, and the queue retrieval 62 * operations obtain elements at the head of the queue. 63 * Linked queues typically have higher throughput than array-based queues but 64 * less predictable performance in most concurrent applications. 65 * 66 * <p>The optional capacity bound constructor argument serves as a 67 * way to prevent excessive queue expansion. The capacity, if unspecified, 68 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are 69 * dynamically created upon each insertion unless this would bring the 70 * queue above capacity. 71 * 72 * <p>This class and its iterator implement all of the 73 * <em>optional</em> methods of the {@link Collection} and {@link 74 * Iterator} interfaces. 75 * 76 * @since 1.5 77 * @author Doug Lea 78 * @param <E> the type of elements held in this queue 79 */ 80public class LinkedBlockingQueue<E> extends AbstractQueue<E> 81 implements BlockingQueue<E>, java.io.Serializable { 82 private static final long serialVersionUID = -6903933977591709194L; 83 84 /* 85 * A variant of the "two lock queue" algorithm. The putLock gates 86 * entry to put (and offer), and has an associated condition for 87 * waiting puts. Similarly for the takeLock. The "count" field 88 * that they both rely on is maintained as an atomic to avoid 89 * needing to get both locks in most cases. Also, to minimize need 90 * for puts to get takeLock and vice-versa, cascading notifies are 91 * used. When a put notices that it has enabled at least one take, 92 * it signals taker. That taker in turn signals others if more 93 * items have been entered since the signal. And symmetrically for 94 * takes signalling puts. Operations such as remove(Object) and 95 * iterators acquire both locks. 96 * 97 * Visibility between writers and readers is provided as follows: 98 * 99 * Whenever an element is enqueued, the putLock is acquired and 100 * count updated. A subsequent reader guarantees visibility to the 101 * enqueued Node by either acquiring the putLock (via fullyLock) 102 * or by acquiring the takeLock, and then reading n = count.get(); 103 * this gives visibility to the first n items. 104 * 105 * To implement weakly consistent iterators, it appears we need to 106 * keep all Nodes GC-reachable from a predecessor dequeued Node. 107 * That would cause two problems: 108 * - allow a rogue Iterator to cause unbounded memory retention 109 * - cause cross-generational linking of old Nodes to new Nodes if 110 * a Node was tenured while live, which generational GCs have a 111 * hard time dealing with, causing repeated major collections. 112 * However, only non-deleted Nodes need to be reachable from 113 * dequeued Nodes, and reachability does not necessarily have to 114 * be of the kind understood by the GC. We use the trick of 115 * linking a Node that has just been dequeued to itself. Such a 116 * self-link implicitly means to advance to head.next. 117 */ 118 119 /** 120 * Linked list node class. 121 */ 122 static class Node<E> { 123 E item; 124 125 /** 126 * One of: 127 * - the real successor Node 128 * - this Node, meaning the successor is head.next 129 * - null, meaning there is no successor (this is the last node) 130 */ 131 Node<E> next; 132 133 Node(E x) { item = x; } 134 } 135 136 /** The capacity bound, or Integer.MAX_VALUE if none */ 137 private final int capacity; 138 139 /** Current number of elements */ 140 private final AtomicInteger count = new AtomicInteger(); 141 142 /** 143 * Head of linked list. 144 * Invariant: head.item == null 145 */ 146 transient Node<E> head; 147 148 /** 149 * Tail of linked list. 150 * Invariant: last.next == null 151 */ 152 private transient Node<E> last; 153 154 /** Lock held by take, poll, etc */ 155 private final ReentrantLock takeLock = new ReentrantLock(); 156 157 /** Wait queue for waiting takes */ 158 private final Condition notEmpty = takeLock.newCondition(); 159 160 /** Lock held by put, offer, etc */ 161 private final ReentrantLock putLock = new ReentrantLock(); 162 163 /** Wait queue for waiting puts */ 164 private final Condition notFull = putLock.newCondition(); 165 166 /** 167 * Signals a waiting take. Called only from put/offer (which do not 168 * otherwise ordinarily lock takeLock.) 169 */ 170 private void signalNotEmpty() { 171 final ReentrantLock takeLock = this.takeLock; 172 takeLock.lock(); 173 try { 174 notEmpty.signal(); 175 } finally { 176 takeLock.unlock(); 177 } 178 } 179 180 /** 181 * Signals a waiting put. Called only from take/poll. 182 */ 183 private void signalNotFull() { 184 final ReentrantLock putLock = this.putLock; 185 putLock.lock(); 186 try { 187 notFull.signal(); 188 } finally { 189 putLock.unlock(); 190 } 191 } 192 193 /** 194 * Links node at end of queue. 195 * 196 * @param node the node 197 */ 198 private void enqueue(Node<E> node) { 199 // assert putLock.isHeldByCurrentThread(); 200 // assert last.next == null; 201 last = last.next = node; 202 } 203 204 /** 205 * Removes a node from head of queue. 206 * 207 * @return the node 208 */ 209 private E dequeue() { 210 // assert takeLock.isHeldByCurrentThread(); 211 // assert head.item == null; 212 Node<E> h = head; 213 Node<E> first = h.next; 214 h.next = h; // help GC 215 head = first; 216 E x = first.item; 217 first.item = null; 218 return x; 219 } 220 221 /** 222 * Locks to prevent both puts and takes. 223 */ 224 void fullyLock() { 225 putLock.lock(); 226 takeLock.lock(); 227 } 228 229 /** 230 * Unlocks to allow both puts and takes. 231 */ 232 void fullyUnlock() { 233 takeLock.unlock(); 234 putLock.unlock(); 235 } 236 237// /** 238// * Tells whether both locks are held by current thread. 239// */ 240// boolean isFullyLocked() { 241// return (putLock.isHeldByCurrentThread() && 242// takeLock.isHeldByCurrentThread()); 243// } 244 245 /** 246 * Creates a {@code LinkedBlockingQueue} with a capacity of 247 * {@link Integer#MAX_VALUE}. 248 */ 249 public LinkedBlockingQueue() { 250 this(Integer.MAX_VALUE); 251 } 252 253 /** 254 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. 255 * 256 * @param capacity the capacity of this queue 257 * @throws IllegalArgumentException if {@code capacity} is not greater 258 * than zero 259 */ 260 public LinkedBlockingQueue(int capacity) { 261 if (capacity <= 0) throw new IllegalArgumentException(); 262 this.capacity = capacity; 263 last = head = new Node<E>(null); 264 } 265 266 /** 267 * Creates a {@code LinkedBlockingQueue} with a capacity of 268 * {@link Integer#MAX_VALUE}, initially containing the elements of the 269 * given collection, 270 * added in traversal order of the collection's iterator. 271 * 272 * @param c the collection of elements to initially contain 273 * @throws NullPointerException if the specified collection or any 274 * of its elements are null 275 */ 276 public LinkedBlockingQueue(Collection<? extends E> c) { 277 this(Integer.MAX_VALUE); 278 final ReentrantLock putLock = this.putLock; 279 putLock.lock(); // Never contended, but necessary for visibility 280 try { 281 int n = 0; 282 for (E e : c) { 283 if (e == null) 284 throw new NullPointerException(); 285 if (n == capacity) 286 throw new IllegalStateException("Queue full"); 287 enqueue(new Node<E>(e)); 288 ++n; 289 } 290 count.set(n); 291 } finally { 292 putLock.unlock(); 293 } 294 } 295 296 // this doc comment is overridden to remove the reference to collections 297 // greater in size than Integer.MAX_VALUE 298 /** 299 * Returns the number of elements in this queue. 300 * 301 * @return the number of elements in this queue 302 */ 303 public int size() { 304 return count.get(); 305 } 306 307 // this doc comment is a modified copy of the inherited doc comment, 308 // without the reference to unlimited queues. 309 /** 310 * Returns the number of additional elements that this queue can ideally 311 * (in the absence of memory or resource constraints) accept without 312 * blocking. This is always equal to the initial capacity of this queue 313 * less the current {@code size} of this queue. 314 * 315 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 316 * an element will succeed by inspecting {@code remainingCapacity} 317 * because it may be the case that another thread is about to 318 * insert or remove an element. 319 */ 320 public int remainingCapacity() { 321 return capacity - count.get(); 322 } 323 324 /** 325 * Inserts the specified element at the tail of this queue, waiting if 326 * necessary for space to become available. 327 * 328 * @throws InterruptedException {@inheritDoc} 329 * @throws NullPointerException {@inheritDoc} 330 */ 331 public void put(E e) throws InterruptedException { 332 if (e == null) throw new NullPointerException(); 333 // Note: convention in all put/take/etc is to preset local var 334 // holding count negative to indicate failure unless set. 335 int c = -1; 336 Node<E> node = new Node<E>(e); 337 final ReentrantLock putLock = this.putLock; 338 final AtomicInteger count = this.count; 339 putLock.lockInterruptibly(); 340 try { 341 /* 342 * Note that count is used in wait guard even though it is 343 * not protected by lock. This works because count can 344 * only decrease at this point (all other puts are shut 345 * out by lock), and we (or some other waiting put) are 346 * signalled if it ever changes from capacity. Similarly 347 * for all other uses of count in other wait guards. 348 */ 349 while (count.get() == capacity) { 350 notFull.await(); 351 } 352 enqueue(node); 353 c = count.getAndIncrement(); 354 if (c + 1 < capacity) 355 notFull.signal(); 356 } finally { 357 putLock.unlock(); 358 } 359 if (c == 0) 360 signalNotEmpty(); 361 } 362 363 /** 364 * Inserts the specified element at the tail of this queue, waiting if 365 * necessary up to the specified wait time for space to become available. 366 * 367 * @return {@code true} if successful, or {@code false} if 368 * the specified waiting time elapses before space is available 369 * @throws InterruptedException {@inheritDoc} 370 * @throws NullPointerException {@inheritDoc} 371 */ 372 public boolean offer(E e, long timeout, TimeUnit unit) 373 throws InterruptedException { 374 375 if (e == null) throw new NullPointerException(); 376 long nanos = unit.toNanos(timeout); 377 int c = -1; 378 final ReentrantLock putLock = this.putLock; 379 final AtomicInteger count = this.count; 380 putLock.lockInterruptibly(); 381 try { 382 while (count.get() == capacity) { 383 if (nanos <= 0L) 384 return false; 385 nanos = notFull.awaitNanos(nanos); 386 } 387 enqueue(new Node<E>(e)); 388 c = count.getAndIncrement(); 389 if (c + 1 < capacity) 390 notFull.signal(); 391 } finally { 392 putLock.unlock(); 393 } 394 if (c == 0) 395 signalNotEmpty(); 396 return true; 397 } 398 399 /** 400 * Inserts the specified element at the tail of this queue if it is 401 * possible to do so immediately without exceeding the queue's capacity, 402 * returning {@code true} upon success and {@code false} if this queue 403 * is full. 404 * When using a capacity-restricted queue, this method is generally 405 * preferable to method {@link BlockingQueue#add add}, which can fail to 406 * insert an element only by throwing an exception. 407 * 408 * @throws NullPointerException if the specified element is null 409 */ 410 public boolean offer(E e) { 411 if (e == null) throw new NullPointerException(); 412 final AtomicInteger count = this.count; 413 if (count.get() == capacity) 414 return false; 415 int c = -1; 416 Node<E> node = new Node<E>(e); 417 final ReentrantLock putLock = this.putLock; 418 putLock.lock(); 419 try { 420 if (count.get() < capacity) { 421 enqueue(node); 422 c = count.getAndIncrement(); 423 if (c + 1 < capacity) 424 notFull.signal(); 425 } 426 } finally { 427 putLock.unlock(); 428 } 429 if (c == 0) 430 signalNotEmpty(); 431 return c >= 0; 432 } 433 434 public E take() throws InterruptedException { 435 E x; 436 int c = -1; 437 final AtomicInteger count = this.count; 438 final ReentrantLock takeLock = this.takeLock; 439 takeLock.lockInterruptibly(); 440 try { 441 while (count.get() == 0) { 442 notEmpty.await(); 443 } 444 x = dequeue(); 445 c = count.getAndDecrement(); 446 if (c > 1) 447 notEmpty.signal(); 448 } finally { 449 takeLock.unlock(); 450 } 451 if (c == capacity) 452 signalNotFull(); 453 return x; 454 } 455 456 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 457 E x = null; 458 int c = -1; 459 long nanos = unit.toNanos(timeout); 460 final AtomicInteger count = this.count; 461 final ReentrantLock takeLock = this.takeLock; 462 takeLock.lockInterruptibly(); 463 try { 464 while (count.get() == 0) { 465 if (nanos <= 0L) 466 return null; 467 nanos = notEmpty.awaitNanos(nanos); 468 } 469 x = dequeue(); 470 c = count.getAndDecrement(); 471 if (c > 1) 472 notEmpty.signal(); 473 } finally { 474 takeLock.unlock(); 475 } 476 if (c == capacity) 477 signalNotFull(); 478 return x; 479 } 480 481 public E poll() { 482 final AtomicInteger count = this.count; 483 if (count.get() == 0) 484 return null; 485 E x = null; 486 int c = -1; 487 final ReentrantLock takeLock = this.takeLock; 488 takeLock.lock(); 489 try { 490 if (count.get() > 0) { 491 x = dequeue(); 492 c = count.getAndDecrement(); 493 if (c > 1) 494 notEmpty.signal(); 495 } 496 } finally { 497 takeLock.unlock(); 498 } 499 if (c == capacity) 500 signalNotFull(); 501 return x; 502 } 503 504 public E peek() { 505 if (count.get() == 0) 506 return null; 507 final ReentrantLock takeLock = this.takeLock; 508 takeLock.lock(); 509 try { 510 return (count.get() > 0) ? head.next.item : null; 511 } finally { 512 takeLock.unlock(); 513 } 514 } 515 516 /** 517 * Unlinks interior Node p with predecessor trail. 518 */ 519 void unlink(Node<E> p, Node<E> trail) { 520 // assert isFullyLocked(); 521 // p.next is not changed, to allow iterators that are 522 // traversing p to maintain their weak-consistency guarantee. 523 p.item = null; 524 trail.next = p.next; 525 if (last == p) 526 last = trail; 527 if (count.getAndDecrement() == capacity) 528 notFull.signal(); 529 } 530 531 /** 532 * Removes a single instance of the specified element from this queue, 533 * if it is present. More formally, removes an element {@code e} such 534 * that {@code o.equals(e)}, if this queue contains one or more such 535 * elements. 536 * Returns {@code true} if this queue contained the specified element 537 * (or equivalently, if this queue changed as a result of the call). 538 * 539 * @param o element to be removed from this queue, if present 540 * @return {@code true} if this queue changed as a result of the call 541 */ 542 public boolean remove(Object o) { 543 if (o == null) return false; 544 fullyLock(); 545 try { 546 for (Node<E> trail = head, p = trail.next; 547 p != null; 548 trail = p, p = p.next) { 549 if (o.equals(p.item)) { 550 unlink(p, trail); 551 return true; 552 } 553 } 554 return false; 555 } finally { 556 fullyUnlock(); 557 } 558 } 559 560 /** 561 * Returns {@code true} if this queue contains the specified element. 562 * More formally, returns {@code true} if and only if this queue contains 563 * at least one element {@code e} such that {@code o.equals(e)}. 564 * 565 * @param o object to be checked for containment in this queue 566 * @return {@code true} if this queue contains the specified element 567 */ 568 public boolean contains(Object o) { 569 if (o == null) return false; 570 fullyLock(); 571 try { 572 for (Node<E> p = head.next; p != null; p = p.next) 573 if (o.equals(p.item)) 574 return true; 575 return false; 576 } finally { 577 fullyUnlock(); 578 } 579 } 580 581 /** 582 * Returns an array containing all of the elements in this queue, in 583 * proper sequence. 584 * 585 * <p>The returned array will be "safe" in that no references to it are 586 * maintained by this queue. (In other words, this method must allocate 587 * a new array). The caller is thus free to modify the returned array. 588 * 589 * <p>This method acts as bridge between array-based and collection-based 590 * APIs. 591 * 592 * @return an array containing all of the elements in this queue 593 */ 594 public Object[] toArray() { 595 fullyLock(); 596 try { 597 int size = count.get(); 598 Object[] a = new Object[size]; 599 int k = 0; 600 for (Node<E> p = head.next; p != null; p = p.next) 601 a[k++] = p.item; 602 return a; 603 } finally { 604 fullyUnlock(); 605 } 606 } 607 608 /** 609 * Returns an array containing all of the elements in this queue, in 610 * proper sequence; the runtime type of the returned array is that of 611 * the specified array. If the queue fits in the specified array, it 612 * is returned therein. Otherwise, a new array is allocated with the 613 * runtime type of the specified array and the size of this queue. 614 * 615 * <p>If this queue fits in the specified array with room to spare 616 * (i.e., the array has more elements than this queue), the element in 617 * the array immediately following the end of the queue is set to 618 * {@code null}. 619 * 620 * <p>Like the {@link #toArray()} method, this method acts as bridge between 621 * array-based and collection-based APIs. Further, this method allows 622 * precise control over the runtime type of the output array, and may, 623 * under certain circumstances, be used to save allocation costs. 624 * 625 * <p>Suppose {@code x} is a queue known to contain only strings. 626 * The following code can be used to dump the queue into a newly 627 * allocated array of {@code String}: 628 * 629 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 630 * 631 * Note that {@code toArray(new Object[0])} is identical in function to 632 * {@code toArray()}. 633 * 634 * @param a the array into which the elements of the queue are to 635 * be stored, if it is big enough; otherwise, a new array of the 636 * same runtime type is allocated for this purpose 637 * @return an array containing all of the elements in this queue 638 * @throws ArrayStoreException if the runtime type of the specified array 639 * is not a supertype of the runtime type of every element in 640 * this queue 641 * @throws NullPointerException if the specified array is null 642 */ 643 @SuppressWarnings("unchecked") 644 public <T> T[] toArray(T[] a) { 645 fullyLock(); 646 try { 647 int size = count.get(); 648 if (a.length < size) 649 a = (T[])java.lang.reflect.Array.newInstance 650 (a.getClass().getComponentType(), size); 651 652 int k = 0; 653 for (Node<E> p = head.next; p != null; p = p.next) 654 a[k++] = (T)p.item; 655 if (a.length > k) 656 a[k] = null; 657 return a; 658 } finally { 659 fullyUnlock(); 660 } 661 } 662 663 public String toString() { 664 return Helpers.collectionToString(this); 665 } 666 667 /** 668 * Atomically removes all of the elements from this queue. 669 * The queue will be empty after this call returns. 670 */ 671 public void clear() { 672 fullyLock(); 673 try { 674 for (Node<E> p, h = head; (p = h.next) != null; h = p) { 675 h.next = h; 676 p.item = null; 677 } 678 head = last; 679 // assert head.item == null && head.next == null; 680 if (count.getAndSet(0) == capacity) 681 notFull.signal(); 682 } finally { 683 fullyUnlock(); 684 } 685 } 686 687 /** 688 * @throws UnsupportedOperationException {@inheritDoc} 689 * @throws ClassCastException {@inheritDoc} 690 * @throws NullPointerException {@inheritDoc} 691 * @throws IllegalArgumentException {@inheritDoc} 692 */ 693 public int drainTo(Collection<? super E> c) { 694 return drainTo(c, Integer.MAX_VALUE); 695 } 696 697 /** 698 * @throws UnsupportedOperationException {@inheritDoc} 699 * @throws ClassCastException {@inheritDoc} 700 * @throws NullPointerException {@inheritDoc} 701 * @throws IllegalArgumentException {@inheritDoc} 702 */ 703 public int drainTo(Collection<? super E> c, int maxElements) { 704 if (c == null) 705 throw new NullPointerException(); 706 if (c == this) 707 throw new IllegalArgumentException(); 708 if (maxElements <= 0) 709 return 0; 710 boolean signalNotFull = false; 711 final ReentrantLock takeLock = this.takeLock; 712 takeLock.lock(); 713 try { 714 int n = Math.min(maxElements, count.get()); 715 // count.get provides visibility to first n Nodes 716 Node<E> h = head; 717 int i = 0; 718 try { 719 while (i < n) { 720 Node<E> p = h.next; 721 c.add(p.item); 722 p.item = null; 723 h.next = h; 724 h = p; 725 ++i; 726 } 727 return n; 728 } finally { 729 // Restore invariants even if c.add() threw 730 if (i > 0) { 731 // assert h.item == null; 732 head = h; 733 signalNotFull = (count.getAndAdd(-i) == capacity); 734 } 735 } 736 } finally { 737 takeLock.unlock(); 738 if (signalNotFull) 739 signalNotFull(); 740 } 741 } 742 743 /** 744 * Returns an iterator over the elements in this queue in proper sequence. 745 * The elements will be returned in order from first (head) to last (tail). 746 * 747 * <p>The returned iterator is 748 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 749 * 750 * @return an iterator over the elements in this queue in proper sequence 751 */ 752 public Iterator<E> iterator() { 753 return new Itr(); 754 } 755 756 private class Itr implements Iterator<E> { 757 /* 758 * Basic weakly-consistent iterator. At all times hold the next 759 * item to hand out so that if hasNext() reports true, we will 760 * still have it to return even if lost race with a take etc. 761 */ 762 763 private Node<E> current; 764 private Node<E> lastRet; 765 private E currentElement; 766 767 Itr() { 768 fullyLock(); 769 try { 770 current = head.next; 771 if (current != null) 772 currentElement = current.item; 773 } finally { 774 fullyUnlock(); 775 } 776 } 777 778 public boolean hasNext() { 779 return current != null; 780 } 781 782 public E next() { 783 fullyLock(); 784 try { 785 if (current == null) 786 throw new NoSuchElementException(); 787 lastRet = current; 788 E item = null; 789 // Unlike other traversal methods, iterators must handle both: 790 // - dequeued nodes (p.next == p) 791 // - (possibly multiple) interior removed nodes (p.item == null) 792 for (Node<E> p = current, q;; p = q) { 793 if ((q = p.next) == p) 794 q = head.next; 795 if (q == null || (item = q.item) != null) { 796 current = q; 797 E x = currentElement; 798 currentElement = item; 799 return x; 800 } 801 } 802 } finally { 803 fullyUnlock(); 804 } 805 } 806 807 public void remove() { 808 if (lastRet == null) 809 throw new IllegalStateException(); 810 fullyLock(); 811 try { 812 Node<E> node = lastRet; 813 lastRet = null; 814 for (Node<E> trail = head, p = trail.next; 815 p != null; 816 trail = p, p = p.next) { 817 if (p == node) { 818 unlink(p, trail); 819 break; 820 } 821 } 822 } finally { 823 fullyUnlock(); 824 } 825 } 826 } 827 828 /** A customized variant of Spliterators.IteratorSpliterator */ 829 static final class LBQSpliterator<E> implements Spliterator<E> { 830 static final int MAX_BATCH = 1 << 25; // max batch array size; 831 final LinkedBlockingQueue<E> queue; 832 Node<E> current; // current node; null until initialized 833 int batch; // batch size for splits 834 boolean exhausted; // true when no more nodes 835 long est; // size estimate 836 LBQSpliterator(LinkedBlockingQueue<E> queue) { 837 this.queue = queue; 838 this.est = queue.size(); 839 } 840 841 public long estimateSize() { return est; } 842 843 public Spliterator<E> trySplit() { 844 Node<E> h; 845 final LinkedBlockingQueue<E> q = this.queue; 846 int b = batch; 847 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; 848 if (!exhausted && 849 ((h = current) != null || (h = q.head.next) != null) && 850 h.next != null) { 851 Object[] a = new Object[n]; 852 int i = 0; 853 Node<E> p = current; 854 q.fullyLock(); 855 try { 856 if (p != null || (p = q.head.next) != null) { 857 do { 858 if ((a[i] = p.item) != null) 859 ++i; 860 } while ((p = p.next) != null && i < n); 861 } 862 } finally { 863 q.fullyUnlock(); 864 } 865 if ((current = p) == null) { 866 est = 0L; 867 exhausted = true; 868 } 869 else if ((est -= i) < 0L) 870 est = 0L; 871 if (i > 0) { 872 batch = i; 873 return Spliterators.spliterator 874 (a, 0, i, (Spliterator.ORDERED | 875 Spliterator.NONNULL | 876 Spliterator.CONCURRENT)); 877 } 878 } 879 return null; 880 } 881 882 public void forEachRemaining(Consumer<? super E> action) { 883 if (action == null) throw new NullPointerException(); 884 final LinkedBlockingQueue<E> q = this.queue; 885 if (!exhausted) { 886 exhausted = true; 887 Node<E> p = current; 888 do { 889 E e = null; 890 q.fullyLock(); 891 try { 892 if (p == null) 893 p = q.head.next; 894 while (p != null) { 895 e = p.item; 896 p = p.next; 897 if (e != null) 898 break; 899 } 900 } finally { 901 q.fullyUnlock(); 902 } 903 if (e != null) 904 action.accept(e); 905 } while (p != null); 906 } 907 } 908 909 public boolean tryAdvance(Consumer<? super E> action) { 910 if (action == null) throw new NullPointerException(); 911 final LinkedBlockingQueue<E> q = this.queue; 912 if (!exhausted) { 913 E e = null; 914 q.fullyLock(); 915 try { 916 if (current == null) 917 current = q.head.next; 918 while (current != null) { 919 e = current.item; 920 current = current.next; 921 if (e != null) 922 break; 923 } 924 } finally { 925 q.fullyUnlock(); 926 } 927 if (current == null) 928 exhausted = true; 929 if (e != null) { 930 action.accept(e); 931 return true; 932 } 933 } 934 return false; 935 } 936 937 public int characteristics() { 938 return Spliterator.ORDERED | Spliterator.NONNULL | 939 Spliterator.CONCURRENT; 940 } 941 } 942 943 /** 944 * Returns a {@link Spliterator} over the elements in this queue. 945 * 946 * <p>The returned spliterator is 947 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 948 * 949 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 950 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 951 * 952 * @implNote 953 * The {@code Spliterator} implements {@code trySplit} to permit limited 954 * parallelism. 955 * 956 * @return a {@code Spliterator} over the elements in this queue 957 * @since 1.8 958 */ 959 public Spliterator<E> spliterator() { 960 return new LBQSpliterator<E>(this); 961 } 962 963 /** 964 * Saves this queue to a stream (that is, serializes it). 965 * 966 * @param s the stream 967 * @throws java.io.IOException if an I/O error occurs 968 * @serialData The capacity is emitted (int), followed by all of 969 * its elements (each an {@code Object}) in the proper order, 970 * followed by a null 971 */ 972 private void writeObject(java.io.ObjectOutputStream s) 973 throws java.io.IOException { 974 975 fullyLock(); 976 try { 977 // Write out any hidden stuff, plus capacity 978 s.defaultWriteObject(); 979 980 // Write out all elements in the proper order. 981 for (Node<E> p = head.next; p != null; p = p.next) 982 s.writeObject(p.item); 983 984 // Use trailing null as sentinel 985 s.writeObject(null); 986 } finally { 987 fullyUnlock(); 988 } 989 } 990 991 /** 992 * Reconstitutes this queue from a stream (that is, deserializes it). 993 * @param s the stream 994 * @throws ClassNotFoundException if the class of a serialized object 995 * could not be found 996 * @throws java.io.IOException if an I/O error occurs 997 */ 998 private void readObject(java.io.ObjectInputStream s) 999 throws java.io.IOException, ClassNotFoundException { 1000 // Read in capacity, and any hidden stuff 1001 s.defaultReadObject(); 1002 1003 count.set(0); 1004 last = head = new Node<E>(null); 1005 1006 // Read in all elements and place in queue 1007 for (;;) { 1008 @SuppressWarnings("unchecked") 1009 E item = (E)s.readObject(); 1010 if (item == null) 1011 break; 1012 add(item); 1013 } 1014 } 1015} 1016