SynchronousQueue.java revision f6c387128427e121477c1b32ad35cdcaa5101ba3
1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/licenses/publicdomain 5 */ 6 7package java.util.concurrent; 8import java.util.concurrent.locks.*; 9import java.util.*; 10 11// BEGIN android-note 12// removed link to collections framework docs 13// END android-note 14 15/** 16 * A {@linkplain BlockingQueue blocking queue} in which each 17 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A 18 * synchronous queue does not have any internal capacity, not even a 19 * capacity of one. You cannot <tt>peek</tt> at a synchronous queue 20 * because an element is only present when you try to take it; you 21 * cannot add an element (using any method) unless another thread is 22 * trying to remove it; you cannot iterate as there is nothing to 23 * iterate. The <em>head</em> of the queue is the element that the 24 * first queued thread is trying to add to the queue; if there are no 25 * queued threads then no element is being added and the head is 26 * <tt>null</tt>. For purposes of other <tt>Collection</tt> methods 27 * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts 28 * as an empty collection. This queue does not permit <tt>null</tt> 29 * elements. 30 * 31 * <p>Synchronous queues are similar to rendezvous channels used in 32 * CSP and Ada. They are well suited for handoff designs, in which an 33 * object running in one thread must sync up with an object running 34 * in another thread in order to hand it some information, event, or 35 * task. 36 * 37 * <p> This class supports an optional fairness policy for ordering 38 * waiting producer and consumer threads. By default, this ordering 39 * is not guaranteed. However, a queue constructed with fairness set 40 * to <tt>true</tt> grants threads access in FIFO order. Fairness 41 * generally decreases throughput but reduces variability and avoids 42 * starvation. 43 * 44 * <p>This class implements all of the <em>optional</em> methods 45 * of the {@link Collection} and {@link Iterator} interfaces. 46 * 47 * @since 1.5 48 * @author Doug Lea 49 * @param <E> the type of elements held in this collection 50 */ 51public class SynchronousQueue<E> extends AbstractQueue<E> 52 implements BlockingQueue<E>, java.io.Serializable { 53 private static final long serialVersionUID = -3223113410248163686L; 54 55 /* 56 This implementation divides actions into two cases for puts: 57 58 * An arriving producer that does not already have a waiting consumer 59 creates a node holding item, and then waits for a consumer to take it. 60 * An arriving producer that does already have a waiting consumer fills 61 the slot node created by the consumer, and notifies it to continue. 62 63 And symmetrically, two for takes: 64 65 * An arriving consumer that does not already have a waiting producer 66 creates an empty slot node, and then waits for a producer to fill it. 67 * An arriving consumer that does already have a waiting producer takes 68 item from the node created by the producer, and notifies it to continue. 69 70 When a put or take waiting for the actions of its counterpart 71 aborts due to interruption or timeout, it marks the node 72 it created as "CANCELLED", which causes its counterpart to retry 73 the entire put or take sequence. 74 75 This requires keeping two simple queues, waitingProducers and 76 waitingConsumers. Each of these can be FIFO (preserves fairness) 77 or LIFO (improves throughput). 78 */ 79 80 /** Lock protecting both wait queues */ 81 private final ReentrantLock qlock; 82 /** Queue holding waiting puts */ 83 private final WaitQueue waitingProducers; 84 /** Queue holding waiting takes */ 85 private final WaitQueue waitingConsumers; 86 87 /** 88 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy. 89 */ 90 public SynchronousQueue() { 91 this(false); 92 } 93 94 /** 95 * Creates a <tt>SynchronousQueue</tt> with specified fairness policy. 96 * @param fair if true, threads contend in FIFO order for access; 97 * otherwise the order is unspecified. 98 */ 99 public SynchronousQueue(boolean fair) { 100 if (fair) { 101 qlock = new ReentrantLock(true); 102 waitingProducers = new FifoWaitQueue(); 103 waitingConsumers = new FifoWaitQueue(); 104 } 105 else { 106 qlock = new ReentrantLock(); 107 waitingProducers = new LifoWaitQueue(); 108 waitingConsumers = new LifoWaitQueue(); 109 } 110 } 111 112 /** 113 * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below. 114 * These queues have all transient fields, but are serializable 115 * in order to recover fairness settings when deserialized. 116 */ 117 static abstract class WaitQueue implements java.io.Serializable { 118 /** Create, add, and return node for x */ 119 abstract Node enq(Object x); 120 /** Remove and return node, or null if empty */ 121 abstract Node deq(); 122 } 123 124 /** 125 * FIFO queue to hold waiting puts/takes. 126 */ 127 static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable { 128 private static final long serialVersionUID = -3623113410248163686L; 129 private transient Node head; 130 private transient Node last; 131 132 Node enq(Object x) { 133 Node p = new Node(x); 134 if (last == null) 135 last = head = p; 136 else 137 last = last.next = p; 138 return p; 139 } 140 141 Node deq() { 142 Node p = head; 143 if (p != null) { 144 if ((head = p.next) == null) 145 last = null; 146 p.next = null; 147 } 148 return p; 149 } 150 } 151 152 /** 153 * LIFO queue to hold waiting puts/takes. 154 */ 155 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable { 156 private static final long serialVersionUID = -3633113410248163686L; 157 private transient Node head; 158 159 Node enq(Object x) { 160 return head = new Node(x, head); 161 } 162 163 Node deq() { 164 Node p = head; 165 if (p != null) { 166 head = p.next; 167 p.next = null; 168 } 169 return p; 170 } 171 } 172 173 /** 174 * Nodes each maintain an item and handle waits and signals for 175 * getting and setting it. The class extends 176 * AbstractQueuedSynchronizer to manage blocking, using AQS state 177 * 0 for waiting, 1 for ack, -1 for cancelled. 178 */ 179 static final class Node extends AbstractQueuedSynchronizer { 180 /** Synchronization state value representing that node acked */ 181 private static final int ACK = 1; 182 /** Synchronization state value representing that node cancelled */ 183 private static final int CANCEL = -1; 184 185 /** The item being transferred */ 186 Object item; 187 /** Next node in wait queue */ 188 Node next; 189 190 /** Creates a node with initial item */ 191 Node(Object x) { item = x; } 192 193 /** Creates a node with initial item and next */ 194 Node(Object x, Node n) { item = x; next = n; } 195 196 /** 197 * Implements AQS base acquire to succeed if not in WAITING state 198 */ 199 protected boolean tryAcquire(int ignore) { 200 return getState() != 0; 201 } 202 203 /** 204 * Implements AQS base release to signal if state changed 205 */ 206 protected boolean tryRelease(int newState) { 207 return compareAndSetState(0, newState); 208 } 209 210 /** 211 * Takes item and nulls out field (for sake of GC) 212 */ 213 private Object extract() { 214 Object x = item; 215 item = null; 216 return x; 217 } 218 219 /** 220 * Tries to cancel on interrupt; if so rethrowing, 221 * else setting interrupt state 222 */ 223 private void checkCancellationOnInterrupt(InterruptedException ie) 224 throws InterruptedException { 225 if (release(CANCEL)) 226 throw ie; 227 Thread.currentThread().interrupt(); 228 } 229 230 /** 231 * Fills in the slot created by the consumer and signal consumer to 232 * continue. 233 */ 234 boolean setItem(Object x) { 235 item = x; // can place in slot even if cancelled 236 return release(ACK); 237 } 238 239 /** 240 * Removes item from slot created by producer and signal producer 241 * to continue. 242 */ 243 Object getItem() { 244 return (release(ACK))? extract() : null; 245 } 246 247 /** 248 * Waits for a consumer to take item placed by producer. 249 */ 250 void waitForTake() throws InterruptedException { 251 try { 252 acquireInterruptibly(0); 253 } catch (InterruptedException ie) { 254 checkCancellationOnInterrupt(ie); 255 } 256 } 257 258 /** 259 * Waits for a producer to put item placed by consumer. 260 */ 261 Object waitForPut() throws InterruptedException { 262 try { 263 acquireInterruptibly(0); 264 } catch (InterruptedException ie) { 265 checkCancellationOnInterrupt(ie); 266 } 267 return extract(); 268 } 269 270 /** 271 * Waits for a consumer to take item placed by producer or time out. 272 */ 273 boolean waitForTake(long nanos) throws InterruptedException { 274 try { 275 if (!tryAcquireNanos(0, nanos) && 276 release(CANCEL)) 277 return false; 278 } catch (InterruptedException ie) { 279 checkCancellationOnInterrupt(ie); 280 } 281 return true; 282 } 283 284 /** 285 * Waits for a producer to put item placed by consumer, or time out. 286 */ 287 Object waitForPut(long nanos) throws InterruptedException { 288 try { 289 if (!tryAcquireNanos(0, nanos) && 290 release(CANCEL)) 291 return null; 292 } catch (InterruptedException ie) { 293 checkCancellationOnInterrupt(ie); 294 } 295 return extract(); 296 } 297 } 298 299 /** 300 * Adds the specified element to this queue, waiting if necessary for 301 * another thread to receive it. 302 * @param o the element to add 303 * @throws InterruptedException if interrupted while waiting. 304 * @throws NullPointerException if the specified element is <tt>null</tt>. 305 */ 306 public void put(E o) throws InterruptedException { 307 if (o == null) throw new NullPointerException(); 308 final ReentrantLock qlock = this.qlock; 309 310 for (;;) { 311 Node node; 312 boolean mustWait; 313 if (Thread.interrupted()) throw new InterruptedException(); 314 qlock.lock(); 315 try { 316 node = waitingConsumers.deq(); 317 if ( (mustWait = (node == null)) ) 318 node = waitingProducers.enq(o); 319 } finally { 320 qlock.unlock(); 321 } 322 323 if (mustWait) { 324 node.waitForTake(); 325 return; 326 } 327 328 else if (node.setItem(o)) 329 return; 330 331 // else consumer cancelled, so retry 332 } 333 } 334 335 /** 336 * Inserts the specified element into this queue, waiting if necessary 337 * up to the specified wait time for another thread to receive it. 338 * @param o the element to add 339 * @param timeout how long to wait before giving up, in units of 340 * <tt>unit</tt> 341 * @param unit a <tt>TimeUnit</tt> determining how to interpret the 342 * <tt>timeout</tt> parameter 343 * @return <tt>true</tt> if successful, or <tt>false</tt> if 344 * the specified waiting time elapses before a consumer appears. 345 * @throws InterruptedException if interrupted while waiting. 346 * @throws NullPointerException if the specified element is <tt>null</tt>. 347 */ 348 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { 349 if (o == null) throw new NullPointerException(); 350 long nanos = unit.toNanos(timeout); 351 final ReentrantLock qlock = this.qlock; 352 for (;;) { 353 Node node; 354 boolean mustWait; 355 if (Thread.interrupted()) throw new InterruptedException(); 356 qlock.lock(); 357 try { 358 node = waitingConsumers.deq(); 359 if ( (mustWait = (node == null)) ) 360 node = waitingProducers.enq(o); 361 } finally { 362 qlock.unlock(); 363 } 364 365 if (mustWait) 366 return node.waitForTake(nanos); 367 368 else if (node.setItem(o)) 369 return true; 370 371 // else consumer cancelled, so retry 372 } 373 } 374 375 /** 376 * Retrieves and removes the head of this queue, waiting if necessary 377 * for another thread to insert it. 378 * @throws InterruptedException if interrupted while waiting. 379 * @return the head of this queue 380 */ 381 public E take() throws InterruptedException { 382 final ReentrantLock qlock = this.qlock; 383 for (;;) { 384 Node node; 385 boolean mustWait; 386 387 if (Thread.interrupted()) throw new InterruptedException(); 388 qlock.lock(); 389 try { 390 node = waitingProducers.deq(); 391 if ( (mustWait = (node == null)) ) 392 node = waitingConsumers.enq(null); 393 } finally { 394 qlock.unlock(); 395 } 396 397 if (mustWait) { 398 Object x = node.waitForPut(); 399 return (E)x; 400 } 401 else { 402 Object x = node.getItem(); 403 if (x != null) 404 return (E)x; 405 // else cancelled, so retry 406 } 407 } 408 } 409 410 /** 411 * Retrieves and removes the head of this queue, waiting 412 * if necessary up to the specified wait time, for another thread 413 * to insert it. 414 * @param timeout how long to wait before giving up, in units of 415 * <tt>unit</tt> 416 * @param unit a <tt>TimeUnit</tt> determining how to interpret the 417 * <tt>timeout</tt> parameter 418 * @return the head of this queue, or <tt>null</tt> if the 419 * specified waiting time elapses before an element is present. 420 * @throws InterruptedException if interrupted while waiting. 421 */ 422 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 423 long nanos = unit.toNanos(timeout); 424 final ReentrantLock qlock = this.qlock; 425 426 for (;;) { 427 Node node; 428 boolean mustWait; 429 430 if (Thread.interrupted()) throw new InterruptedException(); 431 qlock.lock(); 432 try { 433 node = waitingProducers.deq(); 434 if ( (mustWait = (node == null)) ) 435 node = waitingConsumers.enq(null); 436 } finally { 437 qlock.unlock(); 438 } 439 440 if (mustWait) { 441 Object x = node.waitForPut(nanos); 442 return (E)x; 443 } 444 else { 445 Object x = node.getItem(); 446 if (x != null) 447 return (E)x; 448 // else cancelled, so retry 449 } 450 } 451 } 452 453 // Untimed nonblocking versions 454 455 /** 456 * Inserts the specified element into this queue, if another thread is 457 * waiting to receive it. 458 * 459 * @param o the element to add. 460 * @return <tt>true</tt> if it was possible to add the element to 461 * this queue, else <tt>false</tt> 462 * @throws NullPointerException if the specified element is <tt>null</tt> 463 */ 464 public boolean offer(E o) { 465 if (o == null) throw new NullPointerException(); 466 final ReentrantLock qlock = this.qlock; 467 468 for (;;) { 469 Node node; 470 qlock.lock(); 471 try { 472 node = waitingConsumers.deq(); 473 } finally { 474 qlock.unlock(); 475 } 476 if (node == null) 477 return false; 478 479 else if (node.setItem(o)) 480 return true; 481 // else retry 482 } 483 } 484 485 /** 486 * Retrieves and removes the head of this queue, if another thread 487 * is currently making an element available. 488 * 489 * @return the head of this queue, or <tt>null</tt> if no 490 * element is available. 491 */ 492 public E poll() { 493 final ReentrantLock qlock = this.qlock; 494 for (;;) { 495 Node node; 496 qlock.lock(); 497 try { 498 node = waitingProducers.deq(); 499 } finally { 500 qlock.unlock(); 501 } 502 if (node == null) 503 return null; 504 505 else { 506 Object x = node.getItem(); 507 if (x != null) 508 return (E)x; 509 // else retry 510 } 511 } 512 } 513 514 /** 515 * Always returns <tt>true</tt>. 516 * A <tt>SynchronousQueue</tt> has no internal capacity. 517 * @return <tt>true</tt> 518 */ 519 public boolean isEmpty() { 520 return true; 521 } 522 523 /** 524 * Always returns zero. 525 * A <tt>SynchronousQueue</tt> has no internal capacity. 526 * @return zero. 527 */ 528 public int size() { 529 return 0; 530 } 531 532 /** 533 * Always returns zero. 534 * A <tt>SynchronousQueue</tt> has no internal capacity. 535 * @return zero. 536 */ 537 public int remainingCapacity() { 538 return 0; 539 } 540 541 /** 542 * Does nothing. 543 * A <tt>SynchronousQueue</tt> has no internal capacity. 544 */ 545 public void clear() {} 546 547 /** 548 * Always returns <tt>false</tt>. 549 * A <tt>SynchronousQueue</tt> has no internal capacity. 550 * @param o the element 551 * @return <tt>false</tt> 552 */ 553 public boolean contains(Object o) { 554 return false; 555 } 556 557 /** 558 * Always returns <tt>false</tt>. 559 * A <tt>SynchronousQueue</tt> has no internal capacity. 560 * 561 * @param o the element to remove 562 * @return <tt>false</tt> 563 */ 564 public boolean remove(Object o) { 565 return false; 566 } 567 568 /** 569 * Returns <tt>false</tt> unless given collection is empty. 570 * A <tt>SynchronousQueue</tt> has no internal capacity. 571 * @param c the collection 572 * @return <tt>false</tt> unless given collection is empty 573 */ 574 public boolean containsAll(Collection<?> c) { 575 return c.isEmpty(); 576 } 577 578 /** 579 * Always returns <tt>false</tt>. 580 * A <tt>SynchronousQueue</tt> has no internal capacity. 581 * @param c the collection 582 * @return <tt>false</tt> 583 */ 584 public boolean removeAll(Collection<?> c) { 585 return false; 586 } 587 588 /** 589 * Always returns <tt>false</tt>. 590 * A <tt>SynchronousQueue</tt> has no internal capacity. 591 * @param c the collection 592 * @return <tt>false</tt> 593 */ 594 public boolean retainAll(Collection<?> c) { 595 return false; 596 } 597 598 /** 599 * Always returns <tt>null</tt>. 600 * A <tt>SynchronousQueue</tt> does not return elements 601 * unless actively waited on. 602 * @return <tt>null</tt> 603 */ 604 public E peek() { 605 return null; 606 } 607 608 609 static class EmptyIterator<E> implements Iterator<E> { 610 public boolean hasNext() { 611 return false; 612 } 613 public E next() { 614 throw new NoSuchElementException(); 615 } 616 public void remove() { 617 throw new IllegalStateException(); 618 } 619 } 620 621 /** 622 * Returns an empty iterator in which <tt>hasNext</tt> always returns 623 * <tt>false</tt>. 624 * 625 * @return an empty iterator 626 */ 627 public Iterator<E> iterator() { 628 return new EmptyIterator<E>(); 629 } 630 631 632 /** 633 * Returns a zero-length array. 634 * @return a zero-length array 635 */ 636 public Object[] toArray() { 637 return new Object[0]; 638 } 639 640 /** 641 * Sets the zeroeth element of the specified array to <tt>null</tt> 642 * (if the array has non-zero length) and returns it. 643 * @param a the array 644 * @return the specified array 645 */ 646 public <T> T[] toArray(T[] a) { 647 if (a.length > 0) 648 a[0] = null; 649 return a; 650 } 651 652 653 public int drainTo(Collection<? super E> c) { 654 if (c == null) 655 throw new NullPointerException(); 656 if (c == this) 657 throw new IllegalArgumentException(); 658 int n = 0; 659 E e; 660 while ( (e = poll()) != null) { 661 c.add(e); 662 ++n; 663 } 664 return n; 665 } 666 667 public int drainTo(Collection<? super E> c, int maxElements) { 668 if (c == null) 669 throw new NullPointerException(); 670 if (c == this) 671 throw new IllegalArgumentException(); 672 int n = 0; 673 E e; 674 while (n < maxElements && (e = poll()) != null) { 675 c.add(e); 676 ++n; 677 } 678 return n; 679 } 680} 681 682 683 684 685 686