1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7package java.util.concurrent.locks; 8import java.util.*; 9import java.util.concurrent.*; 10import sun.misc.Unsafe; 11 12/** 13 * A version of {@link AbstractQueuedSynchronizer} in 14 * which synchronization state is maintained as a <tt>long</tt>. 15 * This class has exactly the same structure, properties, and methods 16 * as <tt>AbstractQueuedSynchronizer</tt> with the exception 17 * that all state-related parameters and results are defined 18 * as <tt>long</tt> rather than <tt>int</tt>. This class 19 * may be useful when creating synchronizers such as 20 * multilevel locks and barriers that require 21 * 64 bits of state. 22 * 23 * <p>See {@link AbstractQueuedSynchronizer} for usage 24 * notes and examples. 25 * 26 * @since 1.6 27 * @author Doug Lea 28 */ 29public abstract class AbstractQueuedLongSynchronizer 30 extends AbstractOwnableSynchronizer 31 implements java.io.Serializable { 32 33 private static final long serialVersionUID = 7373984972572414692L; 34 35 /* 36 To keep sources in sync, the remainder of this source file is 37 exactly cloned from AbstractQueuedSynchronizer, replacing class 38 name and changing ints related with sync state to longs. Please 39 keep it that way. 40 */ 41 42 /** 43 * Creates a new <tt>AbstractQueuedLongSynchronizer</tt> instance 44 * with initial synchronization state of zero. 45 */ 46 protected AbstractQueuedLongSynchronizer() { } 47 48 /** 49 * Wait queue node class. 50 * 51 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and 52 * Hagersten) lock queue. CLH locks are normally used for 53 * spinlocks. We instead use them for blocking synchronizers, but 54 * use the same basic tactic of holding some of the control 55 * information about a thread in the predecessor of its node. A 56 * "status" field in each node keeps track of whether a thread 57 * should block. A node is signalled when its predecessor 58 * releases. Each node of the queue otherwise serves as a 59 * specific-notification-style monitor holding a single waiting 60 * thread. The status field does NOT control whether threads are 61 * granted locks etc though. A thread may try to acquire if it is 62 * first in the queue. But being first does not guarantee success; 63 * it only gives the right to contend. So the currently released 64 * contender thread may need to rewait. 65 * 66 * <p>To enqueue into a CLH lock, you atomically splice it in as new 67 * tail. To dequeue, you just set the head field. 68 * <pre> 69 * +------+ prev +-----+ +-----+ 70 * head | | <---- | | <---- | | tail 71 * +------+ +-----+ +-----+ 72 * </pre> 73 * 74 * <p>Insertion into a CLH queue requires only a single atomic 75 * operation on "tail", so there is a simple atomic point of 76 * demarcation from unqueued to queued. Similarly, dequeing 77 * involves only updating the "head". However, it takes a bit 78 * more work for nodes to determine who their successors are, 79 * in part to deal with possible cancellation due to timeouts 80 * and interrupts. 81 * 82 * <p>The "prev" links (not used in original CLH locks), are mainly 83 * needed to handle cancellation. If a node is cancelled, its 84 * successor is (normally) relinked to a non-cancelled 85 * predecessor. For explanation of similar mechanics in the case 86 * of spin locks, see the papers by Scott and Scherer at 87 * http://www.cs.rochester.edu/u/scott/synchronization/ 88 * 89 * <p>We also use "next" links to implement blocking mechanics. 90 * The thread id for each node is kept in its own node, so a 91 * predecessor signals the next node to wake up by traversing 92 * next link to determine which thread it is. Determination of 93 * successor must avoid races with newly queued nodes to set 94 * the "next" fields of their predecessors. This is solved 95 * when necessary by checking backwards from the atomically 96 * updated "tail" when a node's successor appears to be null. 97 * (Or, said differently, the next-links are an optimization 98 * so that we don't usually need a backward scan.) 99 * 100 * <p>Cancellation introduces some conservatism to the basic 101 * algorithms. Since we must poll for cancellation of other 102 * nodes, we can miss noticing whether a cancelled node is 103 * ahead or behind us. This is dealt with by always unparking 104 * successors upon cancellation, allowing them to stabilize on 105 * a new predecessor, unless we can identify an uncancelled 106 * predecessor who will carry this responsibility. 107 * 108 * <p>CLH queues need a dummy header node to get started. But 109 * we don't create them on construction, because it would be wasted 110 * effort if there is never contention. Instead, the node 111 * is constructed and head and tail pointers are set upon first 112 * contention. 113 * 114 * <p>Threads waiting on Conditions use the same nodes, but 115 * use an additional link. Conditions only need to link nodes 116 * in simple (non-concurrent) linked queues because they are 117 * only accessed when exclusively held. Upon await, a node is 118 * inserted into a condition queue. Upon signal, the node is 119 * transferred to the main queue. A special value of status 120 * field is used to mark which queue a node is on. 121 * 122 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill 123 * Scherer and Michael Scott, along with members of JSR-166 124 * expert group, for helpful ideas, discussions, and critiques 125 * on the design of this class. 126 */ 127 static final class Node { 128 /** Marker to indicate a node is waiting in shared mode */ 129 static final Node SHARED = new Node(); 130 /** Marker to indicate a node is waiting in exclusive mode */ 131 static final Node EXCLUSIVE = null; 132 133 /** waitStatus value to indicate thread has cancelled */ 134 static final int CANCELLED = 1; 135 /** waitStatus value to indicate successor's thread needs unparking */ 136 static final int SIGNAL = -1; 137 /** waitStatus value to indicate thread is waiting on condition */ 138 static final int CONDITION = -2; 139 /** 140 * waitStatus value to indicate the next acquireShared should 141 * unconditionally propagate 142 */ 143 static final int PROPAGATE = -3; 144 145 /** 146 * Status field, taking on only the values: 147 * SIGNAL: The successor of this node is (or will soon be) 148 * blocked (via park), so the current node must 149 * unpark its successor when it releases or 150 * cancels. To avoid races, acquire methods must 151 * first indicate they need a signal, 152 * then retry the atomic acquire, and then, 153 * on failure, block. 154 * CANCELLED: This node is cancelled due to timeout or interrupt. 155 * Nodes never leave this state. In particular, 156 * a thread with cancelled node never again blocks. 157 * CONDITION: This node is currently on a condition queue. 158 * It will not be used as a sync queue node 159 * until transferred, at which time the status 160 * will be set to 0. (Use of this value here has 161 * nothing to do with the other uses of the 162 * field, but simplifies mechanics.) 163 * PROPAGATE: A releaseShared should be propagated to other 164 * nodes. This is set (for head node only) in 165 * doReleaseShared to ensure propagation 166 * continues, even if other operations have 167 * since intervened. 168 * 0: None of the above 169 * 170 * The values are arranged numerically to simplify use. 171 * Non-negative values mean that a node doesn't need to 172 * signal. So, most code doesn't need to check for particular 173 * values, just for sign. 174 * 175 * The field is initialized to 0 for normal sync nodes, and 176 * CONDITION for condition nodes. It is modified using CAS 177 * (or when possible, unconditional volatile writes). 178 */ 179 volatile int waitStatus; 180 181 /** 182 * Link to predecessor node that current node/thread relies on 183 * for checking waitStatus. Assigned during enqueing, and nulled 184 * out (for sake of GC) only upon dequeuing. Also, upon 185 * cancellation of a predecessor, we short-circuit while 186 * finding a non-cancelled one, which will always exist 187 * because the head node is never cancelled: A node becomes 188 * head only as a result of successful acquire. A 189 * cancelled thread never succeeds in acquiring, and a thread only 190 * cancels itself, not any other node. 191 */ 192 volatile Node prev; 193 194 /** 195 * Link to the successor node that the current node/thread 196 * unparks upon release. Assigned during enqueuing, adjusted 197 * when bypassing cancelled predecessors, and nulled out (for 198 * sake of GC) when dequeued. The enq operation does not 199 * assign next field of a predecessor until after attachment, 200 * so seeing a null next field does not necessarily mean that 201 * node is at end of queue. However, if a next field appears 202 * to be null, we can scan prev's from the tail to 203 * double-check. The next field of cancelled nodes is set to 204 * point to the node itself instead of null, to make life 205 * easier for isOnSyncQueue. 206 */ 207 volatile Node next; 208 209 /** 210 * The thread that enqueued this node. Initialized on 211 * construction and nulled out after use. 212 */ 213 volatile Thread thread; 214 215 /** 216 * Link to next node waiting on condition, or the special 217 * value SHARED. Because condition queues are accessed only 218 * when holding in exclusive mode, we just need a simple 219 * linked queue to hold nodes while they are waiting on 220 * conditions. They are then transferred to the queue to 221 * re-acquire. And because conditions can only be exclusive, 222 * we save a field by using special value to indicate shared 223 * mode. 224 */ 225 Node nextWaiter; 226 227 /** 228 * Returns true if node is waiting in shared mode 229 */ 230 final boolean isShared() { 231 return nextWaiter == SHARED; 232 } 233 234 /** 235 * Returns previous node, or throws NullPointerException if null. 236 * Use when predecessor cannot be null. The null check could 237 * be elided, but is present to help the VM. 238 * 239 * @return the predecessor of this node 240 */ 241 final Node predecessor() throws NullPointerException { 242 Node p = prev; 243 if (p == null) 244 throw new NullPointerException(); 245 else 246 return p; 247 } 248 249 Node() { // Used to establish initial head or SHARED marker 250 } 251 252 Node(Thread thread, Node mode) { // Used by addWaiter 253 this.nextWaiter = mode; 254 this.thread = thread; 255 } 256 257 Node(Thread thread, int waitStatus) { // Used by Condition 258 this.waitStatus = waitStatus; 259 this.thread = thread; 260 } 261 } 262 263 /** 264 * Head of the wait queue, lazily initialized. Except for 265 * initialization, it is modified only via method setHead. Note: 266 * If head exists, its waitStatus is guaranteed not to be 267 * CANCELLED. 268 */ 269 private transient volatile Node head; 270 271 /** 272 * Tail of the wait queue, lazily initialized. Modified only via 273 * method enq to add new wait node. 274 */ 275 private transient volatile Node tail; 276 277 /** 278 * The synchronization state. 279 */ 280 private volatile long state; 281 282 /** 283 * Returns the current value of synchronization state. 284 * This operation has memory semantics of a <tt>volatile</tt> read. 285 * @return current state value 286 */ 287 protected final long getState() { 288 return state; 289 } 290 291 /** 292 * Sets the value of synchronization state. 293 * This operation has memory semantics of a <tt>volatile</tt> write. 294 * @param newState the new state value 295 */ 296 protected final void setState(long newState) { 297 state = newState; 298 } 299 300 /** 301 * Atomically sets synchronization state to the given updated 302 * value if the current state value equals the expected value. 303 * This operation has memory semantics of a <tt>volatile</tt> read 304 * and write. 305 * 306 * @param expect the expected value 307 * @param update the new value 308 * @return true if successful. False return indicates that the actual 309 * value was not equal to the expected value. 310 */ 311 protected final boolean compareAndSetState(long expect, long update) { 312 // See below for intrinsics setup to support this 313 return unsafe.compareAndSwapLong(this, stateOffset, expect, update); 314 } 315 316 // Queuing utilities 317 318 /** 319 * The number of nanoseconds for which it is faster to spin 320 * rather than to use timed park. A rough estimate suffices 321 * to improve responsiveness with very short timeouts. 322 */ 323 static final long spinForTimeoutThreshold = 1000L; 324 325 /** 326 * Inserts node into queue, initializing if necessary. See picture above. 327 * @param node the node to insert 328 * @return node's predecessor 329 */ 330 private Node enq(final Node node) { 331 for (;;) { 332 Node t = tail; 333 if (t == null) { // Must initialize 334 if (compareAndSetHead(new Node())) 335 tail = head; 336 } else { 337 node.prev = t; 338 if (compareAndSetTail(t, node)) { 339 t.next = node; 340 return t; 341 } 342 } 343 } 344 } 345 346 /** 347 * Creates and enqueues node for current thread and given mode. 348 * 349 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 350 * @return the new node 351 */ 352 private Node addWaiter(Node mode) { 353 Node node = new Node(Thread.currentThread(), mode); 354 // Try the fast path of enq; backup to full enq on failure 355 Node pred = tail; 356 if (pred != null) { 357 node.prev = pred; 358 if (compareAndSetTail(pred, node)) { 359 pred.next = node; 360 return node; 361 } 362 } 363 enq(node); 364 return node; 365 } 366 367 /** 368 * Sets head of queue to be node, thus dequeuing. Called only by 369 * acquire methods. Also nulls out unused fields for sake of GC 370 * and to suppress unnecessary signals and traversals. 371 * 372 * @param node the node 373 */ 374 private void setHead(Node node) { 375 head = node; 376 node.thread = null; 377 node.prev = null; 378 } 379 380 /** 381 * Wakes up node's successor, if one exists. 382 * 383 * @param node the node 384 */ 385 private void unparkSuccessor(Node node) { 386 /* 387 * If status is negative (i.e., possibly needing signal) try 388 * to clear in anticipation of signalling. It is OK if this 389 * fails or if status is changed by waiting thread. 390 */ 391 int ws = node.waitStatus; 392 if (ws < 0) 393 compareAndSetWaitStatus(node, ws, 0); 394 395 /* 396 * Thread to unpark is held in successor, which is normally 397 * just the next node. But if cancelled or apparently null, 398 * traverse backwards from tail to find the actual 399 * non-cancelled successor. 400 */ 401 Node s = node.next; 402 if (s == null || s.waitStatus > 0) { 403 s = null; 404 for (Node t = tail; t != null && t != node; t = t.prev) 405 if (t.waitStatus <= 0) 406 s = t; 407 } 408 if (s != null) 409 LockSupport.unpark(s.thread); 410 } 411 412 /** 413 * Release action for shared mode -- signal successor and ensure 414 * propagation. (Note: For exclusive mode, release just amounts 415 * to calling unparkSuccessor of head if it needs signal.) 416 */ 417 private void doReleaseShared() { 418 /* 419 * Ensure that a release propagates, even if there are other 420 * in-progress acquires/releases. This proceeds in the usual 421 * way of trying to unparkSuccessor of head if it needs 422 * signal. But if it does not, status is set to PROPAGATE to 423 * ensure that upon release, propagation continues. 424 * Additionally, we must loop in case a new node is added 425 * while we are doing this. Also, unlike other uses of 426 * unparkSuccessor, we need to know if CAS to reset status 427 * fails, if so rechecking. 428 */ 429 for (;;) { 430 Node h = head; 431 if (h != null && h != tail) { 432 int ws = h.waitStatus; 433 if (ws == Node.SIGNAL) { 434 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 435 continue; // loop to recheck cases 436 unparkSuccessor(h); 437 } 438 else if (ws == 0 && 439 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 440 continue; // loop on failed CAS 441 } 442 if (h == head) // loop if head changed 443 break; 444 } 445 } 446 447 /** 448 * Sets head of queue, and checks if successor may be waiting 449 * in shared mode, if so propagating if either propagate > 0 or 450 * PROPAGATE status was set. 451 * 452 * @param node the node 453 * @param propagate the return value from a tryAcquireShared 454 */ 455 private void setHeadAndPropagate(Node node, long propagate) { 456 Node h = head; // Record old head for check below 457 setHead(node); 458 /* 459 * Try to signal next queued node if: 460 * Propagation was indicated by caller, 461 * or was recorded (as h.waitStatus) by a previous operation 462 * (note: this uses sign-check of waitStatus because 463 * PROPAGATE status may transition to SIGNAL.) 464 * and 465 * The next node is waiting in shared mode, 466 * or we don't know, because it appears null 467 * 468 * The conservatism in both of these checks may cause 469 * unnecessary wake-ups, but only when there are multiple 470 * racing acquires/releases, so most need signals now or soon 471 * anyway. 472 */ 473 if (propagate > 0 || h == null || h.waitStatus < 0) { 474 Node s = node.next; 475 if (s == null || s.isShared()) 476 doReleaseShared(); 477 } 478 } 479 480 // Utilities for various versions of acquire 481 482 /** 483 * Cancels an ongoing attempt to acquire. 484 * 485 * @param node the node 486 */ 487 private void cancelAcquire(Node node) { 488 // Ignore if node doesn't exist 489 if (node == null) 490 return; 491 492 node.thread = null; 493 494 // Skip cancelled predecessors 495 Node pred = node.prev; 496 while (pred.waitStatus > 0) 497 node.prev = pred = pred.prev; 498 499 // predNext is the apparent node to unsplice. CASes below will 500 // fail if not, in which case, we lost race vs another cancel 501 // or signal, so no further action is necessary. 502 Node predNext = pred.next; 503 504 // Can use unconditional write instead of CAS here. 505 // After this atomic step, other Nodes can skip past us. 506 // Before, we are free of interference from other threads. 507 node.waitStatus = Node.CANCELLED; 508 509 // If we are the tail, remove ourselves. 510 if (node == tail && compareAndSetTail(node, pred)) { 511 compareAndSetNext(pred, predNext, null); 512 } else { 513 // If successor needs signal, try to set pred's next-link 514 // so it will get one. Otherwise wake it up to propagate. 515 int ws; 516 if (pred != head && 517 ((ws = pred.waitStatus) == Node.SIGNAL || 518 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 519 pred.thread != null) { 520 Node next = node.next; 521 if (next != null && next.waitStatus <= 0) 522 compareAndSetNext(pred, predNext, next); 523 } else { 524 unparkSuccessor(node); 525 } 526 527 node.next = node; // help GC 528 } 529 } 530 531 /** 532 * Checks and updates status for a node that failed to acquire. 533 * Returns true if thread should block. This is the main signal 534 * control in all acquire loops. Requires that pred == node.prev 535 * 536 * @param pred node's predecessor holding status 537 * @param node the node 538 * @return {@code true} if thread should block 539 */ 540 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 541 int ws = pred.waitStatus; 542 if (ws == Node.SIGNAL) 543 /* 544 * This node has already set status asking a release 545 * to signal it, so it can safely park. 546 */ 547 return true; 548 if (ws > 0) { 549 /* 550 * Predecessor was cancelled. Skip over predecessors and 551 * indicate retry. 552 */ 553 do { 554 node.prev = pred = pred.prev; 555 } while (pred.waitStatus > 0); 556 pred.next = node; 557 } else { 558 /* 559 * waitStatus must be 0 or PROPAGATE. Indicate that we 560 * need a signal, but don't park yet. Caller will need to 561 * retry to make sure it cannot acquire before parking. 562 */ 563 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 564 } 565 return false; 566 } 567 568 /** 569 * Convenience method to interrupt current thread. 570 */ 571 static void selfInterrupt() { 572 Thread.currentThread().interrupt(); 573 } 574 575 /** 576 * Convenience method to park and then check if interrupted 577 * 578 * @return {@code true} if interrupted 579 */ 580 private final boolean parkAndCheckInterrupt() { 581 LockSupport.park(this); 582 return Thread.interrupted(); 583 } 584 585 /* 586 * Various flavors of acquire, varying in exclusive/shared and 587 * control modes. Each is mostly the same, but annoyingly 588 * different. Only a little bit of factoring is possible due to 589 * interactions of exception mechanics (including ensuring that we 590 * cancel if tryAcquire throws exception) and other control, at 591 * least not without hurting performance too much. 592 */ 593 594 /** 595 * Acquires in exclusive uninterruptible mode for thread already in 596 * queue. Used by condition wait methods as well as acquire. 597 * 598 * @param node the node 599 * @param arg the acquire argument 600 * @return {@code true} if interrupted while waiting 601 */ 602 final boolean acquireQueued(final Node node, long arg) { 603 boolean failed = true; 604 try { 605 boolean interrupted = false; 606 for (;;) { 607 final Node p = node.predecessor(); 608 if (p == head && tryAcquire(arg)) { 609 setHead(node); 610 p.next = null; // help GC 611 failed = false; 612 return interrupted; 613 } 614 if (shouldParkAfterFailedAcquire(p, node) && 615 parkAndCheckInterrupt()) 616 interrupted = true; 617 } 618 } finally { 619 if (failed) 620 cancelAcquire(node); 621 } 622 } 623 624 /** 625 * Acquires in exclusive interruptible mode. 626 * @param arg the acquire argument 627 */ 628 private void doAcquireInterruptibly(long arg) 629 throws InterruptedException { 630 final Node node = addWaiter(Node.EXCLUSIVE); 631 boolean failed = true; 632 try { 633 for (;;) { 634 final Node p = node.predecessor(); 635 if (p == head && tryAcquire(arg)) { 636 setHead(node); 637 p.next = null; // help GC 638 failed = false; 639 return; 640 } 641 if (shouldParkAfterFailedAcquire(p, node) && 642 parkAndCheckInterrupt()) 643 throw new InterruptedException(); 644 } 645 } finally { 646 if (failed) 647 cancelAcquire(node); 648 } 649 } 650 651 /** 652 * Acquires in exclusive timed mode. 653 * 654 * @param arg the acquire argument 655 * @param nanosTimeout max wait time 656 * @return {@code true} if acquired 657 */ 658 private boolean doAcquireNanos(long arg, long nanosTimeout) 659 throws InterruptedException { 660 long lastTime = System.nanoTime(); 661 final Node node = addWaiter(Node.EXCLUSIVE); 662 boolean failed = true; 663 try { 664 for (;;) { 665 final Node p = node.predecessor(); 666 if (p == head && tryAcquire(arg)) { 667 setHead(node); 668 p.next = null; // help GC 669 failed = false; 670 return true; 671 } 672 if (nanosTimeout <= 0) 673 return false; 674 if (shouldParkAfterFailedAcquire(p, node) && 675 nanosTimeout > spinForTimeoutThreshold) 676 LockSupport.parkNanos(this, nanosTimeout); 677 long now = System.nanoTime(); 678 nanosTimeout -= now - lastTime; 679 lastTime = now; 680 if (Thread.interrupted()) 681 throw new InterruptedException(); 682 } 683 } finally { 684 if (failed) 685 cancelAcquire(node); 686 } 687 } 688 689 /** 690 * Acquires in shared uninterruptible mode. 691 * @param arg the acquire argument 692 */ 693 private void doAcquireShared(long arg) { 694 final Node node = addWaiter(Node.SHARED); 695 boolean failed = true; 696 try { 697 boolean interrupted = false; 698 for (;;) { 699 final Node p = node.predecessor(); 700 if (p == head) { 701 long r = tryAcquireShared(arg); 702 if (r >= 0) { 703 setHeadAndPropagate(node, r); 704 p.next = null; // help GC 705 if (interrupted) 706 selfInterrupt(); 707 failed = false; 708 return; 709 } 710 } 711 if (shouldParkAfterFailedAcquire(p, node) && 712 parkAndCheckInterrupt()) 713 interrupted = true; 714 } 715 } finally { 716 if (failed) 717 cancelAcquire(node); 718 } 719 } 720 721 /** 722 * Acquires in shared interruptible mode. 723 * @param arg the acquire argument 724 */ 725 private void doAcquireSharedInterruptibly(long arg) 726 throws InterruptedException { 727 final Node node = addWaiter(Node.SHARED); 728 boolean failed = true; 729 try { 730 for (;;) { 731 final Node p = node.predecessor(); 732 if (p == head) { 733 long r = tryAcquireShared(arg); 734 if (r >= 0) { 735 setHeadAndPropagate(node, r); 736 p.next = null; // help GC 737 failed = false; 738 return; 739 } 740 } 741 if (shouldParkAfterFailedAcquire(p, node) && 742 parkAndCheckInterrupt()) 743 throw new InterruptedException(); 744 } 745 } finally { 746 if (failed) 747 cancelAcquire(node); 748 } 749 } 750 751 /** 752 * Acquires in shared timed mode. 753 * 754 * @param arg the acquire argument 755 * @param nanosTimeout max wait time 756 * @return {@code true} if acquired 757 */ 758 private boolean doAcquireSharedNanos(long arg, long nanosTimeout) 759 throws InterruptedException { 760 761 long lastTime = System.nanoTime(); 762 final Node node = addWaiter(Node.SHARED); 763 boolean failed = true; 764 try { 765 for (;;) { 766 final Node p = node.predecessor(); 767 if (p == head) { 768 long r = tryAcquireShared(arg); 769 if (r >= 0) { 770 setHeadAndPropagate(node, r); 771 p.next = null; // help GC 772 failed = false; 773 return true; 774 } 775 } 776 if (nanosTimeout <= 0) 777 return false; 778 if (shouldParkAfterFailedAcquire(p, node) && 779 nanosTimeout > spinForTimeoutThreshold) 780 LockSupport.parkNanos(this, nanosTimeout); 781 long now = System.nanoTime(); 782 nanosTimeout -= now - lastTime; 783 lastTime = now; 784 if (Thread.interrupted()) 785 throw new InterruptedException(); 786 } 787 } finally { 788 if (failed) 789 cancelAcquire(node); 790 } 791 } 792 793 // Main exported methods 794 795 /** 796 * Attempts to acquire in exclusive mode. This method should query 797 * if the state of the object permits it to be acquired in the 798 * exclusive mode, and if so to acquire it. 799 * 800 * <p>This method is always invoked by the thread performing 801 * acquire. If this method reports failure, the acquire method 802 * may queue the thread, if it is not already queued, until it is 803 * signalled by a release from some other thread. This can be used 804 * to implement method {@link Lock#tryLock()}. 805 * 806 * <p>The default 807 * implementation throws {@link UnsupportedOperationException}. 808 * 809 * @param arg the acquire argument. This value is always the one 810 * passed to an acquire method, or is the value saved on entry 811 * to a condition wait. The value is otherwise uninterpreted 812 * and can represent anything you like. 813 * @return {@code true} if successful. Upon success, this object has 814 * been acquired. 815 * @throws IllegalMonitorStateException if acquiring would place this 816 * synchronizer in an illegal state. This exception must be 817 * thrown in a consistent fashion for synchronization to work 818 * correctly. 819 * @throws UnsupportedOperationException if exclusive mode is not supported 820 */ 821 protected boolean tryAcquire(long arg) { 822 throw new UnsupportedOperationException(); 823 } 824 825 /** 826 * Attempts to set the state to reflect a release in exclusive 827 * mode. 828 * 829 * <p>This method is always invoked by the thread performing release. 830 * 831 * <p>The default implementation throws 832 * {@link UnsupportedOperationException}. 833 * 834 * @param arg the release argument. This value is always the one 835 * passed to a release method, or the current state value upon 836 * entry to a condition wait. The value is otherwise 837 * uninterpreted and can represent anything you like. 838 * @return {@code true} if this object is now in a fully released 839 * state, so that any waiting threads may attempt to acquire; 840 * and {@code false} otherwise. 841 * @throws IllegalMonitorStateException if releasing would place this 842 * synchronizer in an illegal state. This exception must be 843 * thrown in a consistent fashion for synchronization to work 844 * correctly. 845 * @throws UnsupportedOperationException if exclusive mode is not supported 846 */ 847 protected boolean tryRelease(long arg) { 848 throw new UnsupportedOperationException(); 849 } 850 851 /** 852 * Attempts to acquire in shared mode. This method should query if 853 * the state of the object permits it to be acquired in the shared 854 * mode, and if so to acquire it. 855 * 856 * <p>This method is always invoked by the thread performing 857 * acquire. If this method reports failure, the acquire method 858 * may queue the thread, if it is not already queued, until it is 859 * signalled by a release from some other thread. 860 * 861 * <p>The default implementation throws {@link 862 * UnsupportedOperationException}. 863 * 864 * @param arg the acquire argument. This value is always the one 865 * passed to an acquire method, or is the value saved on entry 866 * to a condition wait. The value is otherwise uninterpreted 867 * and can represent anything you like. 868 * @return a negative value on failure; zero if acquisition in shared 869 * mode succeeded but no subsequent shared-mode acquire can 870 * succeed; and a positive value if acquisition in shared 871 * mode succeeded and subsequent shared-mode acquires might 872 * also succeed, in which case a subsequent waiting thread 873 * must check availability. (Support for three different 874 * return values enables this method to be used in contexts 875 * where acquires only sometimes act exclusively.) Upon 876 * success, this object has been acquired. 877 * @throws IllegalMonitorStateException if acquiring would place this 878 * synchronizer in an illegal state. This exception must be 879 * thrown in a consistent fashion for synchronization to work 880 * correctly. 881 * @throws UnsupportedOperationException if shared mode is not supported 882 */ 883 protected long tryAcquireShared(long arg) { 884 throw new UnsupportedOperationException(); 885 } 886 887 /** 888 * Attempts to set the state to reflect a release in shared mode. 889 * 890 * <p>This method is always invoked by the thread performing release. 891 * 892 * <p>The default implementation throws 893 * {@link UnsupportedOperationException}. 894 * 895 * @param arg the release argument. This value is always the one 896 * passed to a release method, or the current state value upon 897 * entry to a condition wait. The value is otherwise 898 * uninterpreted and can represent anything you like. 899 * @return {@code true} if this release of shared mode may permit a 900 * waiting acquire (shared or exclusive) to succeed; and 901 * {@code false} otherwise 902 * @throws IllegalMonitorStateException if releasing would place this 903 * synchronizer in an illegal state. This exception must be 904 * thrown in a consistent fashion for synchronization to work 905 * correctly. 906 * @throws UnsupportedOperationException if shared mode is not supported 907 */ 908 protected boolean tryReleaseShared(long arg) { 909 throw new UnsupportedOperationException(); 910 } 911 912 /** 913 * Returns {@code true} if synchronization is held exclusively with 914 * respect to the current (calling) thread. This method is invoked 915 * upon each call to a non-waiting {@link ConditionObject} method. 916 * (Waiting methods instead invoke {@link #release}.) 917 * 918 * <p>The default implementation throws {@link 919 * UnsupportedOperationException}. This method is invoked 920 * internally only within {@link ConditionObject} methods, so need 921 * not be defined if conditions are not used. 922 * 923 * @return {@code true} if synchronization is held exclusively; 924 * {@code false} otherwise 925 * @throws UnsupportedOperationException if conditions are not supported 926 */ 927 protected boolean isHeldExclusively() { 928 throw new UnsupportedOperationException(); 929 } 930 931 /** 932 * Acquires in exclusive mode, ignoring interrupts. Implemented 933 * by invoking at least once {@link #tryAcquire}, 934 * returning on success. Otherwise the thread is queued, possibly 935 * repeatedly blocking and unblocking, invoking {@link 936 * #tryAcquire} until success. This method can be used 937 * to implement method {@link Lock#lock}. 938 * 939 * @param arg the acquire argument. This value is conveyed to 940 * {@link #tryAcquire} but is otherwise uninterpreted and 941 * can represent anything you like. 942 */ 943 public final void acquire(long arg) { 944 if (!tryAcquire(arg) && 945 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 946 selfInterrupt(); 947 } 948 949 /** 950 * Acquires in exclusive mode, aborting if interrupted. 951 * Implemented by first checking interrupt status, then invoking 952 * at least once {@link #tryAcquire}, returning on 953 * success. Otherwise the thread is queued, possibly repeatedly 954 * blocking and unblocking, invoking {@link #tryAcquire} 955 * until success or the thread is interrupted. This method can be 956 * used to implement method {@link Lock#lockInterruptibly}. 957 * 958 * @param arg the acquire argument. This value is conveyed to 959 * {@link #tryAcquire} but is otherwise uninterpreted and 960 * can represent anything you like. 961 * @throws InterruptedException if the current thread is interrupted 962 */ 963 public final void acquireInterruptibly(long arg) 964 throws InterruptedException { 965 if (Thread.interrupted()) 966 throw new InterruptedException(); 967 if (!tryAcquire(arg)) 968 doAcquireInterruptibly(arg); 969 } 970 971 /** 972 * Attempts to acquire in exclusive mode, aborting if interrupted, 973 * and failing if the given timeout elapses. Implemented by first 974 * checking interrupt status, then invoking at least once {@link 975 * #tryAcquire}, returning on success. Otherwise, the thread is 976 * queued, possibly repeatedly blocking and unblocking, invoking 977 * {@link #tryAcquire} until success or the thread is interrupted 978 * or the timeout elapses. This method can be used to implement 979 * method {@link Lock#tryLock(long, TimeUnit)}. 980 * 981 * @param arg the acquire argument. This value is conveyed to 982 * {@link #tryAcquire} but is otherwise uninterpreted and 983 * can represent anything you like. 984 * @param nanosTimeout the maximum number of nanoseconds to wait 985 * @return {@code true} if acquired; {@code false} if timed out 986 * @throws InterruptedException if the current thread is interrupted 987 */ 988 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 989 throws InterruptedException { 990 if (Thread.interrupted()) 991 throw new InterruptedException(); 992 return tryAcquire(arg) || 993 doAcquireNanos(arg, nanosTimeout); 994 } 995 996 /** 997 * Releases in exclusive mode. Implemented by unblocking one or 998 * more threads if {@link #tryRelease} returns true. 999 * This method can be used to implement method {@link Lock#unlock}. 1000 * 1001 * @param arg the release argument. This value is conveyed to 1002 * {@link #tryRelease} but is otherwise uninterpreted and 1003 * can represent anything you like. 1004 * @return the value returned from {@link #tryRelease} 1005 */ 1006 public final boolean release(long arg) { 1007 if (tryRelease(arg)) { 1008 Node h = head; 1009 if (h != null && h.waitStatus != 0) 1010 unparkSuccessor(h); 1011 return true; 1012 } 1013 return false; 1014 } 1015 1016 /** 1017 * Acquires in shared mode, ignoring interrupts. Implemented by 1018 * first invoking at least once {@link #tryAcquireShared}, 1019 * returning on success. Otherwise the thread is queued, possibly 1020 * repeatedly blocking and unblocking, invoking {@link 1021 * #tryAcquireShared} until success. 1022 * 1023 * @param arg the acquire argument. This value is conveyed to 1024 * {@link #tryAcquireShared} but is otherwise uninterpreted 1025 * and can represent anything you like. 1026 */ 1027 public final void acquireShared(long arg) { 1028 if (tryAcquireShared(arg) < 0) 1029 doAcquireShared(arg); 1030 } 1031 1032 /** 1033 * Acquires in shared mode, aborting if interrupted. Implemented 1034 * by first checking interrupt status, then invoking at least once 1035 * {@link #tryAcquireShared}, returning on success. Otherwise the 1036 * thread is queued, possibly repeatedly blocking and unblocking, 1037 * invoking {@link #tryAcquireShared} until success or the thread 1038 * is interrupted. 1039 * @param arg the acquire argument 1040 * This value is conveyed to {@link #tryAcquireShared} but is 1041 * otherwise uninterpreted and can represent anything 1042 * you like. 1043 * @throws InterruptedException if the current thread is interrupted 1044 */ 1045 public final void acquireSharedInterruptibly(long arg) 1046 throws InterruptedException { 1047 if (Thread.interrupted()) 1048 throw new InterruptedException(); 1049 if (tryAcquireShared(arg) < 0) 1050 doAcquireSharedInterruptibly(arg); 1051 } 1052 1053 /** 1054 * Attempts to acquire in shared mode, aborting if interrupted, and 1055 * failing if the given timeout elapses. Implemented by first 1056 * checking interrupt status, then invoking at least once {@link 1057 * #tryAcquireShared}, returning on success. Otherwise, the 1058 * thread is queued, possibly repeatedly blocking and unblocking, 1059 * invoking {@link #tryAcquireShared} until success or the thread 1060 * is interrupted or the timeout elapses. 1061 * 1062 * @param arg the acquire argument. This value is conveyed to 1063 * {@link #tryAcquireShared} but is otherwise uninterpreted 1064 * and can represent anything you like. 1065 * @param nanosTimeout the maximum number of nanoseconds to wait 1066 * @return {@code true} if acquired; {@code false} if timed out 1067 * @throws InterruptedException if the current thread is interrupted 1068 */ 1069 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 1070 throws InterruptedException { 1071 if (Thread.interrupted()) 1072 throw new InterruptedException(); 1073 return tryAcquireShared(arg) >= 0 || 1074 doAcquireSharedNanos(arg, nanosTimeout); 1075 } 1076 1077 /** 1078 * Releases in shared mode. Implemented by unblocking one or more 1079 * threads if {@link #tryReleaseShared} returns true. 1080 * 1081 * @param arg the release argument. This value is conveyed to 1082 * {@link #tryReleaseShared} but is otherwise uninterpreted 1083 * and can represent anything you like. 1084 * @return the value returned from {@link #tryReleaseShared} 1085 */ 1086 public final boolean releaseShared(long arg) { 1087 if (tryReleaseShared(arg)) { 1088 doReleaseShared(); 1089 return true; 1090 } 1091 return false; 1092 } 1093 1094 // Queue inspection methods 1095 1096 /** 1097 * Queries whether any threads are waiting to acquire. Note that 1098 * because cancellations due to interrupts and timeouts may occur 1099 * at any time, a {@code true} return does not guarantee that any 1100 * other thread will ever acquire. 1101 * 1102 * <p>In this implementation, this operation returns in 1103 * constant time. 1104 * 1105 * @return {@code true} if there may be other threads waiting to acquire 1106 */ 1107 public final boolean hasQueuedThreads() { 1108 return head != tail; 1109 } 1110 1111 /** 1112 * Queries whether any threads have ever contended to acquire this 1113 * synchronizer; that is if an acquire method has ever blocked. 1114 * 1115 * <p>In this implementation, this operation returns in 1116 * constant time. 1117 * 1118 * @return {@code true} if there has ever been contention 1119 */ 1120 public final boolean hasContended() { 1121 return head != null; 1122 } 1123 1124 /** 1125 * Returns the first (longest-waiting) thread in the queue, or 1126 * {@code null} if no threads are currently queued. 1127 * 1128 * <p>In this implementation, this operation normally returns in 1129 * constant time, but may iterate upon contention if other threads are 1130 * concurrently modifying the queue. 1131 * 1132 * @return the first (longest-waiting) thread in the queue, or 1133 * {@code null} if no threads are currently queued 1134 */ 1135 public final Thread getFirstQueuedThread() { 1136 // handle only fast path, else relay 1137 return (head == tail) ? null : fullGetFirstQueuedThread(); 1138 } 1139 1140 /** 1141 * Version of getFirstQueuedThread called when fastpath fails 1142 */ 1143 private Thread fullGetFirstQueuedThread() { 1144 /* 1145 * The first node is normally head.next. Try to get its 1146 * thread field, ensuring consistent reads: If thread 1147 * field is nulled out or s.prev is no longer head, then 1148 * some other thread(s) concurrently performed setHead in 1149 * between some of our reads. We try this twice before 1150 * resorting to traversal. 1151 */ 1152 Node h, s; 1153 Thread st; 1154 if (((h = head) != null && (s = h.next) != null && 1155 s.prev == head && (st = s.thread) != null) || 1156 ((h = head) != null && (s = h.next) != null && 1157 s.prev == head && (st = s.thread) != null)) 1158 return st; 1159 1160 /* 1161 * Head's next field might not have been set yet, or may have 1162 * been unset after setHead. So we must check to see if tail 1163 * is actually first node. If not, we continue on, safely 1164 * traversing from tail back to head to find first, 1165 * guaranteeing termination. 1166 */ 1167 1168 Node t = tail; 1169 Thread firstThread = null; 1170 while (t != null && t != head) { 1171 Thread tt = t.thread; 1172 if (tt != null) 1173 firstThread = tt; 1174 t = t.prev; 1175 } 1176 return firstThread; 1177 } 1178 1179 /** 1180 * Returns true if the given thread is currently queued. 1181 * 1182 * <p>This implementation traverses the queue to determine 1183 * presence of the given thread. 1184 * 1185 * @param thread the thread 1186 * @return {@code true} if the given thread is on the queue 1187 * @throws NullPointerException if the thread is null 1188 */ 1189 public final boolean isQueued(Thread thread) { 1190 if (thread == null) 1191 throw new NullPointerException(); 1192 for (Node p = tail; p != null; p = p.prev) 1193 if (p.thread == thread) 1194 return true; 1195 return false; 1196 } 1197 1198 /** 1199 * Returns {@code true} if the apparent first queued thread, if one 1200 * exists, is waiting in exclusive mode. If this method returns 1201 * {@code true}, and the current thread is attempting to acquire in 1202 * shared mode (that is, this method is invoked from {@link 1203 * #tryAcquireShared}) then it is guaranteed that the current thread 1204 * is not the first queued thread. Used only as a heuristic in 1205 * ReentrantReadWriteLock. 1206 */ 1207 final boolean apparentlyFirstQueuedIsExclusive() { 1208 Node h, s; 1209 return (h = head) != null && 1210 (s = h.next) != null && 1211 !s.isShared() && 1212 s.thread != null; 1213 } 1214 1215 /** 1216 * Queries whether any threads have been waiting to acquire longer 1217 * than the current thread. 1218 * 1219 * <p>An invocation of this method is equivalent to (but may be 1220 * more efficient than): 1221 * <pre> {@code 1222 * getFirstQueuedThread() != Thread.currentThread() && 1223 * hasQueuedThreads()}</pre> 1224 * 1225 * <p>Note that because cancellations due to interrupts and 1226 * timeouts may occur at any time, a {@code true} return does not 1227 * guarantee that some other thread will acquire before the current 1228 * thread. Likewise, it is possible for another thread to win a 1229 * race to enqueue after this method has returned {@code false}, 1230 * due to the queue being empty. 1231 * 1232 * <p>This method is designed to be used by a fair synchronizer to 1233 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 1234 * Such a synchronizer's {@link #tryAcquire} method should return 1235 * {@code false}, and its {@link #tryAcquireShared} method should 1236 * return a negative value, if this method returns {@code true} 1237 * (unless this is a reentrant acquire). For example, the {@code 1238 * tryAcquire} method for a fair, reentrant, exclusive mode 1239 * synchronizer might look like this: 1240 * 1241 * <pre> {@code 1242 * protected boolean tryAcquire(int arg) { 1243 * if (isHeldExclusively()) { 1244 * // A reentrant acquire; increment hold count 1245 * return true; 1246 * } else if (hasQueuedPredecessors()) { 1247 * return false; 1248 * } else { 1249 * // try to acquire normally 1250 * } 1251 * }}</pre> 1252 * 1253 * @return {@code true} if there is a queued thread preceding the 1254 * current thread, and {@code false} if the current thread 1255 * is at the head of the queue or the queue is empty 1256 * @since 1.7 1257 */ 1258 /*public*/ final boolean hasQueuedPredecessors() { // android-changed 1259 // The correctness of this depends on head being initialized 1260 // before tail and on head.next being accurate if the current 1261 // thread is first in queue. 1262 Node t = tail; // Read fields in reverse initialization order 1263 Node h = head; 1264 Node s; 1265 return h != t && 1266 ((s = h.next) == null || s.thread != Thread.currentThread()); 1267 } 1268 1269 1270 // Instrumentation and monitoring methods 1271 1272 /** 1273 * Returns an estimate of the number of threads waiting to 1274 * acquire. The value is only an estimate because the number of 1275 * threads may change dynamically while this method traverses 1276 * internal data structures. This method is designed for use in 1277 * monitoring system state, not for synchronization 1278 * control. 1279 * 1280 * @return the estimated number of threads waiting to acquire 1281 */ 1282 public final int getQueueLength() { 1283 int n = 0; 1284 for (Node p = tail; p != null; p = p.prev) { 1285 if (p.thread != null) 1286 ++n; 1287 } 1288 return n; 1289 } 1290 1291 /** 1292 * Returns a collection containing threads that may be waiting to 1293 * acquire. Because the actual set of threads may change 1294 * dynamically while constructing this result, the returned 1295 * collection is only a best-effort estimate. The elements of the 1296 * returned collection are in no particular order. This method is 1297 * designed to facilitate construction of subclasses that provide 1298 * more extensive monitoring facilities. 1299 * 1300 * @return the collection of threads 1301 */ 1302 public final Collection<Thread> getQueuedThreads() { 1303 ArrayList<Thread> list = new ArrayList<Thread>(); 1304 for (Node p = tail; p != null; p = p.prev) { 1305 Thread t = p.thread; 1306 if (t != null) 1307 list.add(t); 1308 } 1309 return list; 1310 } 1311 1312 /** 1313 * Returns a collection containing threads that may be waiting to 1314 * acquire in exclusive mode. This has the same properties 1315 * as {@link #getQueuedThreads} except that it only returns 1316 * those threads waiting due to an exclusive acquire. 1317 * 1318 * @return the collection of threads 1319 */ 1320 public final Collection<Thread> getExclusiveQueuedThreads() { 1321 ArrayList<Thread> list = new ArrayList<Thread>(); 1322 for (Node p = tail; p != null; p = p.prev) { 1323 if (!p.isShared()) { 1324 Thread t = p.thread; 1325 if (t != null) 1326 list.add(t); 1327 } 1328 } 1329 return list; 1330 } 1331 1332 /** 1333 * Returns a collection containing threads that may be waiting to 1334 * acquire in shared mode. This has the same properties 1335 * as {@link #getQueuedThreads} except that it only returns 1336 * those threads waiting due to a shared acquire. 1337 * 1338 * @return the collection of threads 1339 */ 1340 public final Collection<Thread> getSharedQueuedThreads() { 1341 ArrayList<Thread> list = new ArrayList<Thread>(); 1342 for (Node p = tail; p != null; p = p.prev) { 1343 if (p.isShared()) { 1344 Thread t = p.thread; 1345 if (t != null) 1346 list.add(t); 1347 } 1348 } 1349 return list; 1350 } 1351 1352 /** 1353 * Returns a string identifying this synchronizer, as well as its state. 1354 * The state, in brackets, includes the String {@code "State ="} 1355 * followed by the current value of {@link #getState}, and either 1356 * {@code "nonempty"} or {@code "empty"} depending on whether the 1357 * queue is empty. 1358 * 1359 * @return a string identifying this synchronizer, as well as its state 1360 */ 1361 public String toString() { 1362 long s = getState(); 1363 String q = hasQueuedThreads() ? "non" : ""; 1364 return super.toString() + 1365 "[State = " + s + ", " + q + "empty queue]"; 1366 } 1367 1368 1369 // Internal support methods for Conditions 1370 1371 /** 1372 * Returns true if a node, always one that was initially placed on 1373 * a condition queue, is now waiting to reacquire on sync queue. 1374 * @param node the node 1375 * @return true if is reacquiring 1376 */ 1377 final boolean isOnSyncQueue(Node node) { 1378 if (node.waitStatus == Node.CONDITION || node.prev == null) 1379 return false; 1380 if (node.next != null) // If has successor, it must be on queue 1381 return true; 1382 /* 1383 * node.prev can be non-null, but not yet on queue because 1384 * the CAS to place it on queue can fail. So we have to 1385 * traverse from tail to make sure it actually made it. It 1386 * will always be near the tail in calls to this method, and 1387 * unless the CAS failed (which is unlikely), it will be 1388 * there, so we hardly ever traverse much. 1389 */ 1390 return findNodeFromTail(node); 1391 } 1392 1393 /** 1394 * Returns true if node is on sync queue by searching backwards from tail. 1395 * Called only when needed by isOnSyncQueue. 1396 * @return true if present 1397 */ 1398 private boolean findNodeFromTail(Node node) { 1399 Node t = tail; 1400 for (;;) { 1401 if (t == node) 1402 return true; 1403 if (t == null) 1404 return false; 1405 t = t.prev; 1406 } 1407 } 1408 1409 /** 1410 * Transfers a node from a condition queue onto sync queue. 1411 * Returns true if successful. 1412 * @param node the node 1413 * @return true if successfully transferred (else the node was 1414 * cancelled before signal). 1415 */ 1416 final boolean transferForSignal(Node node) { 1417 /* 1418 * If cannot change waitStatus, the node has been cancelled. 1419 */ 1420 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 1421 return false; 1422 1423 /* 1424 * Splice onto queue and try to set waitStatus of predecessor to 1425 * indicate that thread is (probably) waiting. If cancelled or 1426 * attempt to set waitStatus fails, wake up to resync (in which 1427 * case the waitStatus can be transiently and harmlessly wrong). 1428 */ 1429 Node p = enq(node); 1430 int ws = p.waitStatus; 1431 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) 1432 LockSupport.unpark(node.thread); 1433 return true; 1434 } 1435 1436 /** 1437 * Transfers node, if necessary, to sync queue after a cancelled 1438 * wait. Returns true if thread was cancelled before being 1439 * signalled. 1440 * @param node its node 1441 * @return true if cancelled before the node was signalled 1442 */ 1443 final boolean transferAfterCancelledWait(Node node) { 1444 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 1445 enq(node); 1446 return true; 1447 } 1448 /* 1449 * If we lost out to a signal(), then we can't proceed 1450 * until it finishes its enq(). Cancelling during an 1451 * incomplete transfer is both rare and transient, so just 1452 * spin. 1453 */ 1454 while (!isOnSyncQueue(node)) 1455 Thread.yield(); 1456 return false; 1457 } 1458 1459 /** 1460 * Invokes release with current state value; returns saved state. 1461 * Cancels node and throws exception on failure. 1462 * @param node the condition node for this wait 1463 * @return previous sync state 1464 */ 1465 final long fullyRelease(Node node) { 1466 boolean failed = true; 1467 try { 1468 long savedState = getState(); 1469 if (release(savedState)) { 1470 failed = false; 1471 return savedState; 1472 } else { 1473 throw new IllegalMonitorStateException(); 1474 } 1475 } finally { 1476 if (failed) 1477 node.waitStatus = Node.CANCELLED; 1478 } 1479 } 1480 1481 // Instrumentation methods for conditions 1482 1483 /** 1484 * Queries whether the given ConditionObject 1485 * uses this synchronizer as its lock. 1486 * 1487 * @param condition the condition 1488 * @return <tt>true</tt> if owned 1489 * @throws NullPointerException if the condition is null 1490 */ 1491 public final boolean owns(ConditionObject condition) { 1492 if (condition == null) 1493 throw new NullPointerException(); 1494 return condition.isOwnedBy(this); 1495 } 1496 1497 /** 1498 * Queries whether any threads are waiting on the given condition 1499 * associated with this synchronizer. Note that because timeouts 1500 * and interrupts may occur at any time, a <tt>true</tt> return 1501 * does not guarantee that a future <tt>signal</tt> will awaken 1502 * any threads. This method is designed primarily for use in 1503 * monitoring of the system state. 1504 * 1505 * @param condition the condition 1506 * @return <tt>true</tt> if there are any waiting threads 1507 * @throws IllegalMonitorStateException if exclusive synchronization 1508 * is not held 1509 * @throws IllegalArgumentException if the given condition is 1510 * not associated with this synchronizer 1511 * @throws NullPointerException if the condition is null 1512 */ 1513 public final boolean hasWaiters(ConditionObject condition) { 1514 if (!owns(condition)) 1515 throw new IllegalArgumentException("Not owner"); 1516 return condition.hasWaiters(); 1517 } 1518 1519 /** 1520 * Returns an estimate of the number of threads waiting on the 1521 * given condition associated with this synchronizer. Note that 1522 * because timeouts and interrupts may occur at any time, the 1523 * estimate serves only as an upper bound on the actual number of 1524 * waiters. This method is designed for use in monitoring of the 1525 * system state, not for synchronization control. 1526 * 1527 * @param condition the condition 1528 * @return the estimated number of waiting threads 1529 * @throws IllegalMonitorStateException if exclusive synchronization 1530 * is not held 1531 * @throws IllegalArgumentException if the given condition is 1532 * not associated with this synchronizer 1533 * @throws NullPointerException if the condition is null 1534 */ 1535 public final int getWaitQueueLength(ConditionObject condition) { 1536 if (!owns(condition)) 1537 throw new IllegalArgumentException("Not owner"); 1538 return condition.getWaitQueueLength(); 1539 } 1540 1541 /** 1542 * Returns a collection containing those threads that may be 1543 * waiting on the given condition associated with this 1544 * synchronizer. Because the actual set of threads may change 1545 * dynamically while constructing this result, the returned 1546 * collection is only a best-effort estimate. The elements of the 1547 * returned collection are in no particular order. 1548 * 1549 * @param condition the condition 1550 * @return the collection of threads 1551 * @throws IllegalMonitorStateException if exclusive synchronization 1552 * is not held 1553 * @throws IllegalArgumentException if the given condition is 1554 * not associated with this synchronizer 1555 * @throws NullPointerException if the condition is null 1556 */ 1557 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1558 if (!owns(condition)) 1559 throw new IllegalArgumentException("Not owner"); 1560 return condition.getWaitingThreads(); 1561 } 1562 1563 /** 1564 * Condition implementation for a {@link 1565 * AbstractQueuedLongSynchronizer} serving as the basis of a {@link 1566 * Lock} implementation. 1567 * 1568 * <p>Method documentation for this class describes mechanics, 1569 * not behavioral specifications from the point of view of Lock 1570 * and Condition users. Exported versions of this class will in 1571 * general need to be accompanied by documentation describing 1572 * condition semantics that rely on those of the associated 1573 * <tt>AbstractQueuedLongSynchronizer</tt>. 1574 * 1575 * <p>This class is Serializable, but all fields are transient, 1576 * so deserialized conditions have no waiters. 1577 * 1578 * @since 1.6 1579 */ 1580 public class ConditionObject implements Condition, java.io.Serializable { 1581 private static final long serialVersionUID = 1173984872572414699L; 1582 /** First node of condition queue. */ 1583 private transient Node firstWaiter; 1584 /** Last node of condition queue. */ 1585 private transient Node lastWaiter; 1586 1587 /** 1588 * Creates a new <tt>ConditionObject</tt> instance. 1589 */ 1590 public ConditionObject() { } 1591 1592 // Internal methods 1593 1594 /** 1595 * Adds a new waiter to wait queue. 1596 * @return its new wait node 1597 */ 1598 private Node addConditionWaiter() { 1599 Node t = lastWaiter; 1600 // If lastWaiter is cancelled, clean out. 1601 if (t != null && t.waitStatus != Node.CONDITION) { 1602 unlinkCancelledWaiters(); 1603 t = lastWaiter; 1604 } 1605 Node node = new Node(Thread.currentThread(), Node.CONDITION); 1606 if (t == null) 1607 firstWaiter = node; 1608 else 1609 t.nextWaiter = node; 1610 lastWaiter = node; 1611 return node; 1612 } 1613 1614 /** 1615 * Removes and transfers nodes until hit non-cancelled one or 1616 * null. Split out from signal in part to encourage compilers 1617 * to inline the case of no waiters. 1618 * @param first (non-null) the first node on condition queue 1619 */ 1620 private void doSignal(Node first) { 1621 do { 1622 if ( (firstWaiter = first.nextWaiter) == null) 1623 lastWaiter = null; 1624 first.nextWaiter = null; 1625 } while (!transferForSignal(first) && 1626 (first = firstWaiter) != null); 1627 } 1628 1629 /** 1630 * Removes and transfers all nodes. 1631 * @param first (non-null) the first node on condition queue 1632 */ 1633 private void doSignalAll(Node first) { 1634 lastWaiter = firstWaiter = null; 1635 do { 1636 Node next = first.nextWaiter; 1637 first.nextWaiter = null; 1638 transferForSignal(first); 1639 first = next; 1640 } while (first != null); 1641 } 1642 1643 /** 1644 * Unlinks cancelled waiter nodes from condition queue. 1645 * Called only while holding lock. This is called when 1646 * cancellation occurred during condition wait, and upon 1647 * insertion of a new waiter when lastWaiter is seen to have 1648 * been cancelled. This method is needed to avoid garbage 1649 * retention in the absence of signals. So even though it may 1650 * require a full traversal, it comes into play only when 1651 * timeouts or cancellations occur in the absence of 1652 * signals. It traverses all nodes rather than stopping at a 1653 * particular target to unlink all pointers to garbage nodes 1654 * without requiring many re-traversals during cancellation 1655 * storms. 1656 */ 1657 private void unlinkCancelledWaiters() { 1658 Node t = firstWaiter; 1659 Node trail = null; 1660 while (t != null) { 1661 Node next = t.nextWaiter; 1662 if (t.waitStatus != Node.CONDITION) { 1663 t.nextWaiter = null; 1664 if (trail == null) 1665 firstWaiter = next; 1666 else 1667 trail.nextWaiter = next; 1668 if (next == null) 1669 lastWaiter = trail; 1670 } 1671 else 1672 trail = t; 1673 t = next; 1674 } 1675 } 1676 1677 // public methods 1678 1679 /** 1680 * Moves the longest-waiting thread, if one exists, from the 1681 * wait queue for this condition to the wait queue for the 1682 * owning lock. 1683 * 1684 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1685 * returns {@code false} 1686 */ 1687 public final void signal() { 1688 if (!isHeldExclusively()) 1689 throw new IllegalMonitorStateException(); 1690 Node first = firstWaiter; 1691 if (first != null) 1692 doSignal(first); 1693 } 1694 1695 /** 1696 * Moves all threads from the wait queue for this condition to 1697 * the wait queue for the owning lock. 1698 * 1699 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1700 * returns {@code false} 1701 */ 1702 public final void signalAll() { 1703 if (!isHeldExclusively()) 1704 throw new IllegalMonitorStateException(); 1705 Node first = firstWaiter; 1706 if (first != null) 1707 doSignalAll(first); 1708 } 1709 1710 /** 1711 * Implements uninterruptible condition wait. 1712 * <ol> 1713 * <li> Save lock state returned by {@link #getState}. 1714 * <li> Invoke {@link #release} with 1715 * saved state as argument, throwing 1716 * IllegalMonitorStateException if it fails. 1717 * <li> Block until signalled. 1718 * <li> Reacquire by invoking specialized version of 1719 * {@link #acquire} with saved state as argument. 1720 * </ol> 1721 */ 1722 public final void awaitUninterruptibly() { 1723 Node node = addConditionWaiter(); 1724 long savedState = fullyRelease(node); 1725 boolean interrupted = false; 1726 while (!isOnSyncQueue(node)) { 1727 LockSupport.park(this); 1728 if (Thread.interrupted()) 1729 interrupted = true; 1730 } 1731 if (acquireQueued(node, savedState) || interrupted) 1732 selfInterrupt(); 1733 } 1734 1735 /* 1736 * For interruptible waits, we need to track whether to throw 1737 * InterruptedException, if interrupted while blocked on 1738 * condition, versus reinterrupt current thread, if 1739 * interrupted while blocked waiting to re-acquire. 1740 */ 1741 1742 /** Mode meaning to reinterrupt on exit from wait */ 1743 private static final int REINTERRUPT = 1; 1744 /** Mode meaning to throw InterruptedException on exit from wait */ 1745 private static final int THROW_IE = -1; 1746 1747 /** 1748 * Checks for interrupt, returning THROW_IE if interrupted 1749 * before signalled, REINTERRUPT if after signalled, or 1750 * 0 if not interrupted. 1751 */ 1752 private int checkInterruptWhileWaiting(Node node) { 1753 return Thread.interrupted() ? 1754 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 1755 0; 1756 } 1757 1758 /** 1759 * Throws InterruptedException, reinterrupts current thread, or 1760 * does nothing, depending on mode. 1761 */ 1762 private void reportInterruptAfterWait(int interruptMode) 1763 throws InterruptedException { 1764 if (interruptMode == THROW_IE) 1765 throw new InterruptedException(); 1766 else if (interruptMode == REINTERRUPT) 1767 selfInterrupt(); 1768 } 1769 1770 /** 1771 * Implements interruptible condition wait. 1772 * <ol> 1773 * <li> If current thread is interrupted, throw InterruptedException. 1774 * <li> Save lock state returned by {@link #getState}. 1775 * <li> Invoke {@link #release} with 1776 * saved state as argument, throwing 1777 * IllegalMonitorStateException if it fails. 1778 * <li> Block until signalled or interrupted. 1779 * <li> Reacquire by invoking specialized version of 1780 * {@link #acquire} with saved state as argument. 1781 * <li> If interrupted while blocked in step 4, throw InterruptedException. 1782 * </ol> 1783 */ 1784 public final void await() throws InterruptedException { 1785 if (Thread.interrupted()) 1786 throw new InterruptedException(); 1787 Node node = addConditionWaiter(); 1788 long savedState = fullyRelease(node); 1789 int interruptMode = 0; 1790 while (!isOnSyncQueue(node)) { 1791 LockSupport.park(this); 1792 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1793 break; 1794 } 1795 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1796 interruptMode = REINTERRUPT; 1797 if (node.nextWaiter != null) // clean up if cancelled 1798 unlinkCancelledWaiters(); 1799 if (interruptMode != 0) 1800 reportInterruptAfterWait(interruptMode); 1801 } 1802 1803 /** 1804 * Implements timed condition wait. 1805 * <ol> 1806 * <li> If current thread is interrupted, throw InterruptedException. 1807 * <li> Save lock state returned by {@link #getState}. 1808 * <li> Invoke {@link #release} with 1809 * saved state as argument, throwing 1810 * IllegalMonitorStateException if it fails. 1811 * <li> Block until signalled, interrupted, or timed out. 1812 * <li> Reacquire by invoking specialized version of 1813 * {@link #acquire} with saved state as argument. 1814 * <li> If interrupted while blocked in step 4, throw InterruptedException. 1815 * </ol> 1816 */ 1817 public final long awaitNanos(long nanosTimeout) 1818 throws InterruptedException { 1819 if (Thread.interrupted()) 1820 throw new InterruptedException(); 1821 Node node = addConditionWaiter(); 1822 long savedState = fullyRelease(node); 1823 long lastTime = System.nanoTime(); 1824 int interruptMode = 0; 1825 while (!isOnSyncQueue(node)) { 1826 if (nanosTimeout <= 0L) { 1827 transferAfterCancelledWait(node); 1828 break; 1829 } 1830 LockSupport.parkNanos(this, nanosTimeout); 1831 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1832 break; 1833 1834 long now = System.nanoTime(); 1835 nanosTimeout -= now - lastTime; 1836 lastTime = now; 1837 } 1838 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1839 interruptMode = REINTERRUPT; 1840 if (node.nextWaiter != null) 1841 unlinkCancelledWaiters(); 1842 if (interruptMode != 0) 1843 reportInterruptAfterWait(interruptMode); 1844 return nanosTimeout - (System.nanoTime() - lastTime); 1845 } 1846 1847 /** 1848 * Implements absolute timed condition wait. 1849 * <ol> 1850 * <li> If current thread is interrupted, throw InterruptedException. 1851 * <li> Save lock state returned by {@link #getState}. 1852 * <li> Invoke {@link #release} with 1853 * saved state as argument, throwing 1854 * IllegalMonitorStateException if it fails. 1855 * <li> Block until signalled, interrupted, or timed out. 1856 * <li> Reacquire by invoking specialized version of 1857 * {@link #acquire} with saved state as argument. 1858 * <li> If interrupted while blocked in step 4, throw InterruptedException. 1859 * <li> If timed out while blocked in step 4, return false, else true. 1860 * </ol> 1861 */ 1862 public final boolean awaitUntil(Date deadline) 1863 throws InterruptedException { 1864 if (deadline == null) 1865 throw new NullPointerException(); 1866 long abstime = deadline.getTime(); 1867 if (Thread.interrupted()) 1868 throw new InterruptedException(); 1869 Node node = addConditionWaiter(); 1870 long savedState = fullyRelease(node); 1871 boolean timedout = false; 1872 int interruptMode = 0; 1873 while (!isOnSyncQueue(node)) { 1874 if (System.currentTimeMillis() > abstime) { 1875 timedout = transferAfterCancelledWait(node); 1876 break; 1877 } 1878 LockSupport.parkUntil(this, abstime); 1879 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1880 break; 1881 } 1882 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1883 interruptMode = REINTERRUPT; 1884 if (node.nextWaiter != null) 1885 unlinkCancelledWaiters(); 1886 if (interruptMode != 0) 1887 reportInterruptAfterWait(interruptMode); 1888 return !timedout; 1889 } 1890 1891 /** 1892 * Implements timed condition wait. 1893 * <ol> 1894 * <li> If current thread is interrupted, throw InterruptedException. 1895 * <li> Save lock state returned by {@link #getState}. 1896 * <li> Invoke {@link #release} with 1897 * saved state as argument, throwing 1898 * IllegalMonitorStateException if it fails. 1899 * <li> Block until signalled, interrupted, or timed out. 1900 * <li> Reacquire by invoking specialized version of 1901 * {@link #acquire} with saved state as argument. 1902 * <li> If interrupted while blocked in step 4, throw InterruptedException. 1903 * <li> If timed out while blocked in step 4, return false, else true. 1904 * </ol> 1905 */ 1906 public final boolean await(long time, TimeUnit unit) 1907 throws InterruptedException { 1908 if (unit == null) 1909 throw new NullPointerException(); 1910 long nanosTimeout = unit.toNanos(time); 1911 if (Thread.interrupted()) 1912 throw new InterruptedException(); 1913 Node node = addConditionWaiter(); 1914 long savedState = fullyRelease(node); 1915 long lastTime = System.nanoTime(); 1916 boolean timedout = false; 1917 int interruptMode = 0; 1918 while (!isOnSyncQueue(node)) { 1919 if (nanosTimeout <= 0L) { 1920 timedout = transferAfterCancelledWait(node); 1921 break; 1922 } 1923 if (nanosTimeout >= spinForTimeoutThreshold) 1924 LockSupport.parkNanos(this, nanosTimeout); 1925 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1926 break; 1927 long now = System.nanoTime(); 1928 nanosTimeout -= now - lastTime; 1929 lastTime = now; 1930 } 1931 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1932 interruptMode = REINTERRUPT; 1933 if (node.nextWaiter != null) 1934 unlinkCancelledWaiters(); 1935 if (interruptMode != 0) 1936 reportInterruptAfterWait(interruptMode); 1937 return !timedout; 1938 } 1939 1940 // support for instrumentation 1941 1942 /** 1943 * Returns true if this condition was created by the given 1944 * synchronization object. 1945 * 1946 * @return {@code true} if owned 1947 */ 1948 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { 1949 return sync == AbstractQueuedLongSynchronizer.this; 1950 } 1951 1952 /** 1953 * Queries whether any threads are waiting on this condition. 1954 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters}. 1955 * 1956 * @return {@code true} if there are any waiting threads 1957 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1958 * returns {@code false} 1959 */ 1960 protected final boolean hasWaiters() { 1961 if (!isHeldExclusively()) 1962 throw new IllegalMonitorStateException(); 1963 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1964 if (w.waitStatus == Node.CONDITION) 1965 return true; 1966 } 1967 return false; 1968 } 1969 1970 /** 1971 * Returns an estimate of the number of threads waiting on 1972 * this condition. 1973 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength}. 1974 * 1975 * @return the estimated number of waiting threads 1976 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1977 * returns {@code false} 1978 */ 1979 protected final int getWaitQueueLength() { 1980 if (!isHeldExclusively()) 1981 throw new IllegalMonitorStateException(); 1982 int n = 0; 1983 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1984 if (w.waitStatus == Node.CONDITION) 1985 ++n; 1986 } 1987 return n; 1988 } 1989 1990 /** 1991 * Returns a collection containing those threads that may be 1992 * waiting on this Condition. 1993 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads}. 1994 * 1995 * @return the collection of threads 1996 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1997 * returns {@code false} 1998 */ 1999 protected final Collection<Thread> getWaitingThreads() { 2000 if (!isHeldExclusively()) 2001 throw new IllegalMonitorStateException(); 2002 ArrayList<Thread> list = new ArrayList<Thread>(); 2003 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 2004 if (w.waitStatus == Node.CONDITION) { 2005 Thread t = w.thread; 2006 if (t != null) 2007 list.add(t); 2008 } 2009 } 2010 return list; 2011 } 2012 } 2013 2014 /** 2015 * Setup to support compareAndSet. We need to natively implement 2016 * this here: For the sake of permitting future enhancements, we 2017 * cannot explicitly subclass AtomicLong, which would be 2018 * efficient and useful otherwise. So, as the lesser of evils, we 2019 * natively implement using hotspot intrinsics API. And while we 2020 * are at it, we do the same for other CASable fields (which could 2021 * otherwise be done with atomic field updaters). 2022 */ 2023 private static final Unsafe unsafe = Unsafe.getUnsafe(); 2024 private static final long stateOffset; 2025 private static final long headOffset; 2026 private static final long tailOffset; 2027 private static final long waitStatusOffset; 2028 private static final long nextOffset; 2029 2030 static { 2031 try { 2032 stateOffset = unsafe.objectFieldOffset 2033 (AbstractQueuedLongSynchronizer.class.getDeclaredField("state")); 2034 headOffset = unsafe.objectFieldOffset 2035 (AbstractQueuedLongSynchronizer.class.getDeclaredField("head")); 2036 tailOffset = unsafe.objectFieldOffset 2037 (AbstractQueuedLongSynchronizer.class.getDeclaredField("tail")); 2038 waitStatusOffset = unsafe.objectFieldOffset 2039 (Node.class.getDeclaredField("waitStatus")); 2040 nextOffset = unsafe.objectFieldOffset 2041 (Node.class.getDeclaredField("next")); 2042 2043 } catch (Exception ex) { throw new Error(ex); } 2044 } 2045 2046 /** 2047 * CAS head field. Used only by enq. 2048 */ 2049 private final boolean compareAndSetHead(Node update) { 2050 return unsafe.compareAndSwapObject(this, headOffset, null, update); 2051 } 2052 2053 /** 2054 * CAS tail field. Used only by enq. 2055 */ 2056 private final boolean compareAndSetTail(Node expect, Node update) { 2057 return unsafe.compareAndSwapObject(this, tailOffset, expect, update); 2058 } 2059 2060 /** 2061 * CAS waitStatus field of a node. 2062 */ 2063 private static final boolean compareAndSetWaitStatus(Node node, 2064 int expect, 2065 int update) { 2066 return unsafe.compareAndSwapInt(node, waitStatusOffset, 2067 expect, update); 2068 } 2069 2070 /** 2071 * CAS next field of a node. 2072 */ 2073 private static final boolean compareAndSetNext(Node node, 2074 Node expect, 2075 Node update) { 2076 return unsafe.compareAndSwapObject(node, nextOffset, expect, update); 2077 } 2078} 2079