AbstractQueuedLongSynchronizer.java revision e8a958066d95a4e15a9834e8b9067d106efd9b53
1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7package java.util.concurrent.locks; 8import java.util.concurrent.TimeUnit; 9import java.util.ArrayList; 10import java.util.Collection; 11import java.util.Date; 12import sun.misc.Unsafe; 13 14/** 15 * A version of {@link AbstractQueuedSynchronizer} in 16 * which synchronization state is maintained as a {@code long}. 17 * This class has exactly the same structure, properties, and methods 18 * as {@code AbstractQueuedSynchronizer} with the exception 19 * that all state-related parameters and results are defined 20 * as {@code long} rather than {@code int}. This class 21 * may be useful when creating synchronizers such as 22 * multilevel locks and barriers that require 23 * 64 bits of state. 24 * 25 * <p>See {@link AbstractQueuedSynchronizer} for usage 26 * notes and examples. 27 * 28 * @since 1.6 29 * @author Doug Lea 30 */ 31public abstract class AbstractQueuedLongSynchronizer 32 extends AbstractOwnableSynchronizer 33 implements java.io.Serializable { 34 35 private static final long serialVersionUID = 7373984972572414692L; 36 37 /* 38 To keep sources in sync, the remainder of this source file is 39 exactly cloned from AbstractQueuedSynchronizer, replacing class 40 name and changing ints related with sync state to longs. Please 41 keep it that way. 42 */ 43 44 /** 45 * Creates a new {@code AbstractQueuedLongSynchronizer} instance 46 * with initial synchronization state of zero. 47 */ 48 protected AbstractQueuedLongSynchronizer() { } 49 50 /** 51 * Wait queue node class. 52 * 53 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and 54 * Hagersten) lock queue. CLH locks are normally used for 55 * spinlocks. 
* We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node. A
     * "status" field in each node keeps track of whether a thread
     * should block. A node is signalled when its predecessor
     * releases. Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though. A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend. So the currently released
     * contender thread may need to rewait.
     *
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     * <p>Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.
     *
     * <p>The "prev" links (not used in original CLH locks) are mainly
     * needed to handle cancellation. If a node is cancelled, its
     * successor is (normally) relinked to a non-cancelled
     * predecessor. For explanation of similar mechanics in the case
     * of spin locks, see the papers by Scott and Scherer at
     * http://www.cs.rochester.edu/u/scott/synchronization/
     *
     * <p>We also use "next" links to implement blocking mechanics.
* The thread id for each node is kept in its own node, so a
     * predecessor signals the next node to wake up by traversing
     * next link to determine which thread it is. Determination of
     * successor must avoid races with newly queued nodes to set
     * the "next" fields of their predecessors. This is solved
     * when necessary by checking backwards from the atomically
     * updated "tail" when a node's successor appears to be null.
     * (Or, said differently, the next-links are an optimization
     * so that we don't usually need a backward scan.)
     *
     * <p>Cancellation introduces some conservatism to the basic
     * algorithms. Since we must poll for cancellation of other
     * nodes, we can miss noticing whether a cancelled node is
     * ahead or behind us. This is dealt with by always unparking
     * successors upon cancellation, allowing them to stabilize on
     * a new predecessor, unless we can identify an uncancelled
     * predecessor who will carry this responsibility.
     *
     * <p>CLH queues need a dummy header node to get started. But
     * we don't create them on construction, because it would be wasted
     * effort if there is never contention. Instead, the node
     * is constructed and head and tail pointers are set upon first
     * contention.
     *
     * <p>Threads waiting on Conditions use the same nodes, but
     * use an additional link. Conditions only need to link nodes
     * in simple (non-concurrent) linked queues because they are
     * only accessed when exclusively held. Upon await, a node is
     * inserted into a condition queue. Upon signal, the node is
     * transferred to the main queue. A special value of status
     * field is used to mark which queue a node is on.
     *
     * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
     * Scherer and Michael Scott, along with members of JSR-166
     * expert group, for helpful ideas, discussions, and critiques
     * on the design of this class.
128 */ 129 static final class Node { 130 /** Marker to indicate a node is waiting in shared mode */ 131 static final Node SHARED = new Node(); 132 /** Marker to indicate a node is waiting in exclusive mode */ 133 static final Node EXCLUSIVE = null; 134 135 /** waitStatus value to indicate thread has cancelled */ 136 static final int CANCELLED = 1; 137 /** waitStatus value to indicate successor's thread needs unparking */ 138 static final int SIGNAL = -1; 139 /** waitStatus value to indicate thread is waiting on condition */ 140 static final int CONDITION = -2; 141 /** 142 * waitStatus value to indicate the next acquireShared should 143 * unconditionally propagate 144 */ 145 static final int PROPAGATE = -3; 146 147 /** 148 * Status field, taking on only the values: 149 * SIGNAL: The successor of this node is (or will soon be) 150 * blocked (via park), so the current node must 151 * unpark its successor when it releases or 152 * cancels. To avoid races, acquire methods must 153 * first indicate they need a signal, 154 * then retry the atomic acquire, and then, 155 * on failure, block. 156 * CANCELLED: This node is cancelled due to timeout or interrupt. 157 * Nodes never leave this state. In particular, 158 * a thread with cancelled node never again blocks. 159 * CONDITION: This node is currently on a condition queue. 160 * It will not be used as a sync queue node 161 * until transferred, at which time the status 162 * will be set to 0. (Use of this value here has 163 * nothing to do with the other uses of the 164 * field, but simplifies mechanics.) 165 * PROPAGATE: A releaseShared should be propagated to other 166 * nodes. This is set (for head node only) in 167 * doReleaseShared to ensure propagation 168 * continues, even if other operations have 169 * since intervened. 170 * 0: None of the above 171 * 172 * The values are arranged numerically to simplify use. 173 * Non-negative values mean that a node doesn't need to 174 * signal. 
So, most code doesn't need to check for particular 175 * values, just for sign. 176 * 177 * The field is initialized to 0 for normal sync nodes, and 178 * CONDITION for condition nodes. It is modified using CAS 179 * (or when possible, unconditional volatile writes). 180 */ 181 volatile int waitStatus; 182 183 /** 184 * Link to predecessor node that current node/thread relies on 185 * for checking waitStatus. Assigned during enqueuing, and nulled 186 * out (for sake of GC) only upon dequeuing. Also, upon 187 * cancellation of a predecessor, we short-circuit while 188 * finding a non-cancelled one, which will always exist 189 * because the head node is never cancelled: A node becomes 190 * head only as a result of successful acquire. A 191 * cancelled thread never succeeds in acquiring, and a thread only 192 * cancels itself, not any other node. 193 */ 194 volatile Node prev; 195 196 /** 197 * Link to the successor node that the current node/thread 198 * unparks upon release. Assigned during enqueuing, adjusted 199 * when bypassing cancelled predecessors, and nulled out (for 200 * sake of GC) when dequeued. The enq operation does not 201 * assign next field of a predecessor until after attachment, 202 * so seeing a null next field does not necessarily mean that 203 * node is at end of queue. However, if a next field appears 204 * to be null, we can scan prev's from the tail to 205 * double-check. The next field of cancelled nodes is set to 206 * point to the node itself instead of null, to make life 207 * easier for isOnSyncQueue. 208 */ 209 volatile Node next; 210 211 /** 212 * The thread that enqueued this node. Initialized on 213 * construction and nulled out after use. 214 */ 215 volatile Thread thread; 216 217 /** 218 * Link to next node waiting on condition, or the special 219 * value SHARED. 
Because condition queues are accessed only 220 * when holding in exclusive mode, we just need a simple 221 * linked queue to hold nodes while they are waiting on 222 * conditions. They are then transferred to the queue to 223 * re-acquire. And because conditions can only be exclusive, 224 * we save a field by using special value to indicate shared 225 * mode. 226 */ 227 Node nextWaiter; 228 229 /** 230 * @return true if node is waiting in shared mode 231 */ 232 final boolean isShared() { 233 return nextWaiter == SHARED; 234 } 235 236 /** 237 * Returns previous node, or throws NullPointerException if null. 238 * Use when predecessor cannot be null. The null check could 239 * be elided, but is present to help the VM. 240 * 241 * @return the predecessor of this node 242 */ 243 final Node predecessor() throws NullPointerException { 244 Node p = prev; 245 if (p == null) 246 throw new NullPointerException(); 247 else 248 return p; 249 } 250 251 Node() { // Used to establish initial head or SHARED marker 252 } 253 254 Node(Thread thread, Node mode) { // Used by addWaiter 255 this.nextWaiter = mode; 256 this.thread = thread; 257 } 258 259 Node(Thread thread, int waitStatus) { // Used by Condition 260 this.waitStatus = waitStatus; 261 this.thread = thread; 262 } 263 } 264 265 /** 266 * Head of the wait queue, lazily initialized. Except for 267 * initialization, it is modified only via method setHead. Note: 268 * If head exists, its waitStatus is guaranteed not to be 269 * CANCELLED. 270 */ 271 private transient volatile Node head; 272 273 /** 274 * Tail of the wait queue, lazily initialized. Modified only via 275 * method enq to add new wait node. 276 */ 277 private transient volatile Node tail; 278 279 /** 280 * The synchronization state. 281 */ 282 private volatile long state; 283 284 /** 285 * Returns the current value of synchronization state. 286 * This operation has memory semantics of a {@code volatile} read. 
287 * @return current state value 288 */ 289 protected final long getState() { 290 return state; 291 } 292 293 /** 294 * Sets the value of synchronization state. 295 * This operation has memory semantics of a {@code volatile} write. 296 * @param newState the new state value 297 */ 298 protected final void setState(long newState) { 299 state = newState; 300 } 301 302 /** 303 * Atomically sets synchronization state to the given updated 304 * value if the current state value equals the expected value. 305 * This operation has memory semantics of a {@code volatile} read 306 * and write. 307 * 308 * @param expect the expected value 309 * @param update the new value 310 * @return true if successful. False return indicates that the actual 311 * value was not equal to the expected value. 312 */ 313 protected final boolean compareAndSetState(long expect, long update) { 314 // See below for intrinsics setup to support this 315 return unsafe.compareAndSwapLong(this, stateOffset, expect, update); 316 } 317 318 // Queuing utilities 319 320 /** 321 * The number of nanoseconds for which it is faster to spin 322 * rather than to use timed park. A rough estimate suffices 323 * to improve responsiveness with very short timeouts. 324 */ 325 static final long spinForTimeoutThreshold = 1000L; 326 327 /** 328 * Inserts node into queue, initializing if necessary. See picture above. 329 * @param node the node to insert 330 * @return node's predecessor 331 */ 332 private Node enq(final Node node) { 333 for (;;) { 334 Node t = tail; 335 if (t == null) { // Must initialize 336 if (compareAndSetHead(new Node())) 337 tail = head; 338 } else { 339 node.prev = t; 340 if (compareAndSetTail(t, node)) { 341 t.next = node; 342 return t; 343 } 344 } 345 } 346 } 347 348 /** 349 * Creates and enqueues node for current thread and given mode. 
350 * 351 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 352 * @return the new node 353 */ 354 private Node addWaiter(Node mode) { 355 Node node = new Node(Thread.currentThread(), mode); 356 // Try the fast path of enq; backup to full enq on failure 357 Node pred = tail; 358 if (pred != null) { 359 node.prev = pred; 360 if (compareAndSetTail(pred, node)) { 361 pred.next = node; 362 return node; 363 } 364 } 365 enq(node); 366 return node; 367 } 368 369 /** 370 * Sets head of queue to be node, thus dequeuing. Called only by 371 * acquire methods. Also nulls out unused fields for sake of GC 372 * and to suppress unnecessary signals and traversals. 373 * 374 * @param node the node 375 */ 376 private void setHead(Node node) { 377 head = node; 378 node.thread = null; 379 node.prev = null; 380 } 381 382 /** 383 * Wakes up node's successor, if one exists. 384 * 385 * @param node the node 386 */ 387 private void unparkSuccessor(Node node) { 388 /* 389 * If status is negative (i.e., possibly needing signal) try 390 * to clear in anticipation of signalling. It is OK if this 391 * fails or if status is changed by waiting thread. 392 */ 393 int ws = node.waitStatus; 394 if (ws < 0) 395 compareAndSetWaitStatus(node, ws, 0); 396 397 /* 398 * Thread to unpark is held in successor, which is normally 399 * just the next node. But if cancelled or apparently null, 400 * traverse backwards from tail to find the actual 401 * non-cancelled successor. 402 */ 403 Node s = node.next; 404 if (s == null || s.waitStatus > 0) { 405 s = null; 406 for (Node t = tail; t != null && t != node; t = t.prev) 407 if (t.waitStatus <= 0) 408 s = t; 409 } 410 if (s != null) 411 LockSupport.unpark(s.thread); 412 } 413 414 /** 415 * Release action for shared mode -- signals successor and ensures 416 * propagation. (Note: For exclusive mode, release just amounts 417 * to calling unparkSuccessor of head if it needs signal.) 
418 */ 419 private void doReleaseShared() { 420 /* 421 * Ensure that a release propagates, even if there are other 422 * in-progress acquires/releases. This proceeds in the usual 423 * way of trying to unparkSuccessor of head if it needs 424 * signal. But if it does not, status is set to PROPAGATE to 425 * ensure that upon release, propagation continues. 426 * Additionally, we must loop in case a new node is added 427 * while we are doing this. Also, unlike other uses of 428 * unparkSuccessor, we need to know if CAS to reset status 429 * fails, if so rechecking. 430 */ 431 for (;;) { 432 Node h = head; 433 if (h != null && h != tail) { 434 int ws = h.waitStatus; 435 if (ws == Node.SIGNAL) { 436 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 437 continue; // loop to recheck cases 438 unparkSuccessor(h); 439 } 440 else if (ws == 0 && 441 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 442 continue; // loop on failed CAS 443 } 444 if (h == head) // loop if head changed 445 break; 446 } 447 } 448 449 /** 450 * Sets head of queue, and checks if successor may be waiting 451 * in shared mode, if so propagating if either propagate > 0 or 452 * PROPAGATE status was set. 453 * 454 * @param node the node 455 * @param propagate the return value from a tryAcquireShared 456 */ 457 private void setHeadAndPropagate(Node node, long propagate) { 458 Node h = head; // Record old head for check below 459 setHead(node); 460 /* 461 * Try to signal next queued node if: 462 * Propagation was indicated by caller, 463 * or was recorded (as h.waitStatus) by a previous operation 464 * (note: this uses sign-check of waitStatus because 465 * PROPAGATE status may transition to SIGNAL.) 466 * and 467 * The next node is waiting in shared mode, 468 * or we don't know, because it appears null 469 * 470 * The conservatism in both of these checks may cause 471 * unnecessary wake-ups, but only when there are multiple 472 * racing acquires/releases, so most need signals now or soon 473 * anyway. 
474 */ 475 if (propagate > 0 || h == null || h.waitStatus < 0) { 476 Node s = node.next; 477 if (s == null || s.isShared()) 478 doReleaseShared(); 479 } 480 } 481 482 // Utilities for various versions of acquire 483 484 /** 485 * Cancels an ongoing attempt to acquire. 486 * 487 * @param node the node 488 */ 489 private void cancelAcquire(Node node) { 490 // Ignore if node doesn't exist 491 if (node == null) 492 return; 493 494 node.thread = null; 495 496 // Skip cancelled predecessors 497 Node pred = node.prev; 498 while (pred.waitStatus > 0) 499 node.prev = pred = pred.prev; 500 501 // predNext is the apparent node to unsplice. CASes below will 502 // fail if not, in which case, we lost race vs another cancel 503 // or signal, so no further action is necessary. 504 Node predNext = pred.next; 505 506 // Can use unconditional write instead of CAS here. 507 // After this atomic step, other Nodes can skip past us. 508 // Before, we are free of interference from other threads. 509 node.waitStatus = Node.CANCELLED; 510 511 // If we are the tail, remove ourselves. 512 if (node == tail && compareAndSetTail(node, pred)) { 513 compareAndSetNext(pred, predNext, null); 514 } else { 515 // If successor needs signal, try to set pred's next-link 516 // so it will get one. Otherwise wake it up to propagate. 517 int ws; 518 if (pred != head && 519 ((ws = pred.waitStatus) == Node.SIGNAL || 520 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 521 pred.thread != null) { 522 Node next = node.next; 523 if (next != null && next.waitStatus <= 0) 524 compareAndSetNext(pred, predNext, next); 525 } else { 526 unparkSuccessor(node); 527 } 528 529 node.next = node; // help GC 530 } 531 } 532 533 /** 534 * Checks and updates status for a node that failed to acquire. 535 * Returns true if thread should block. This is the main signal 536 * control in all acquire loops. Requires that pred == node.prev. 
537 * 538 * @param pred node's predecessor holding status 539 * @param node the node 540 * @return {@code true} if thread should block 541 */ 542 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 543 int ws = pred.waitStatus; 544 if (ws == Node.SIGNAL) 545 /* 546 * This node has already set status asking a release 547 * to signal it, so it can safely park. 548 */ 549 return true; 550 if (ws > 0) { 551 /* 552 * Predecessor was cancelled. Skip over predecessors and 553 * indicate retry. 554 */ 555 do { 556 node.prev = pred = pred.prev; 557 } while (pred.waitStatus > 0); 558 pred.next = node; 559 } else { 560 /* 561 * waitStatus must be 0 or PROPAGATE. Indicate that we 562 * need a signal, but don't park yet. Caller will need to 563 * retry to make sure it cannot acquire before parking. 564 */ 565 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 566 } 567 return false; 568 } 569 570 /** 571 * Convenience method to interrupt current thread. 572 */ 573 static void selfInterrupt() { 574 Thread.currentThread().interrupt(); 575 } 576 577 /** 578 * Convenience method to park and then check if interrupted 579 * 580 * @return {@code true} if interrupted 581 */ 582 private final boolean parkAndCheckInterrupt() { 583 LockSupport.park(this); 584 return Thread.interrupted(); 585 } 586 587 /* 588 * Various flavors of acquire, varying in exclusive/shared and 589 * control modes. Each is mostly the same, but annoyingly 590 * different. Only a little bit of factoring is possible due to 591 * interactions of exception mechanics (including ensuring that we 592 * cancel if tryAcquire throws exception) and other control, at 593 * least not without hurting performance too much. 594 */ 595 596 /** 597 * Acquires in exclusive uninterruptible mode for thread already in 598 * queue. Used by condition wait methods as well as acquire. 
599 * 600 * @param node the node 601 * @param arg the acquire argument 602 * @return {@code true} if interrupted while waiting 603 */ 604 final boolean acquireQueued(final Node node, long arg) { 605 boolean failed = true; 606 try { 607 boolean interrupted = false; 608 for (;;) { 609 final Node p = node.predecessor(); 610 if (p == head && tryAcquire(arg)) { 611 setHead(node); 612 p.next = null; // help GC 613 failed = false; 614 return interrupted; 615 } 616 if (shouldParkAfterFailedAcquire(p, node) && 617 parkAndCheckInterrupt()) 618 interrupted = true; 619 } 620 } finally { 621 if (failed) 622 cancelAcquire(node); 623 } 624 } 625 626 /** 627 * Acquires in exclusive interruptible mode. 628 * @param arg the acquire argument 629 */ 630 private void doAcquireInterruptibly(long arg) 631 throws InterruptedException { 632 final Node node = addWaiter(Node.EXCLUSIVE); 633 boolean failed = true; 634 try { 635 for (;;) { 636 final Node p = node.predecessor(); 637 if (p == head && tryAcquire(arg)) { 638 setHead(node); 639 p.next = null; // help GC 640 failed = false; 641 return; 642 } 643 if (shouldParkAfterFailedAcquire(p, node) && 644 parkAndCheckInterrupt()) 645 throw new InterruptedException(); 646 } 647 } finally { 648 if (failed) 649 cancelAcquire(node); 650 } 651 } 652 653 /** 654 * Acquires in exclusive timed mode. 
655 * 656 * @param arg the acquire argument 657 * @param nanosTimeout max wait time 658 * @return {@code true} if acquired 659 */ 660 private boolean doAcquireNanos(long arg, long nanosTimeout) 661 throws InterruptedException { 662 if (nanosTimeout <= 0L) 663 return false; 664 final long deadline = System.nanoTime() + nanosTimeout; 665 final Node node = addWaiter(Node.EXCLUSIVE); 666 boolean failed = true; 667 try { 668 for (;;) { 669 final Node p = node.predecessor(); 670 if (p == head && tryAcquire(arg)) { 671 setHead(node); 672 p.next = null; // help GC 673 failed = false; 674 return true; 675 } 676 nanosTimeout = deadline - System.nanoTime(); 677 if (nanosTimeout <= 0L) 678 return false; 679 if (shouldParkAfterFailedAcquire(p, node) && 680 nanosTimeout > spinForTimeoutThreshold) 681 LockSupport.parkNanos(this, nanosTimeout); 682 if (Thread.interrupted()) 683 throw new InterruptedException(); 684 } 685 } finally { 686 if (failed) 687 cancelAcquire(node); 688 } 689 } 690 691 /** 692 * Acquires in shared uninterruptible mode. 693 * @param arg the acquire argument 694 */ 695 private void doAcquireShared(long arg) { 696 final Node node = addWaiter(Node.SHARED); 697 boolean failed = true; 698 try { 699 boolean interrupted = false; 700 for (;;) { 701 final Node p = node.predecessor(); 702 if (p == head) { 703 long r = tryAcquireShared(arg); 704 if (r >= 0) { 705 setHeadAndPropagate(node, r); 706 p.next = null; // help GC 707 if (interrupted) 708 selfInterrupt(); 709 failed = false; 710 return; 711 } 712 } 713 if (shouldParkAfterFailedAcquire(p, node) && 714 parkAndCheckInterrupt()) 715 interrupted = true; 716 } 717 } finally { 718 if (failed) 719 cancelAcquire(node); 720 } 721 } 722 723 /** 724 * Acquires in shared interruptible mode. 
725 * @param arg the acquire argument 726 */ 727 private void doAcquireSharedInterruptibly(long arg) 728 throws InterruptedException { 729 final Node node = addWaiter(Node.SHARED); 730 boolean failed = true; 731 try { 732 for (;;) { 733 final Node p = node.predecessor(); 734 if (p == head) { 735 long r = tryAcquireShared(arg); 736 if (r >= 0) { 737 setHeadAndPropagate(node, r); 738 p.next = null; // help GC 739 failed = false; 740 return; 741 } 742 } 743 if (shouldParkAfterFailedAcquire(p, node) && 744 parkAndCheckInterrupt()) 745 throw new InterruptedException(); 746 } 747 } finally { 748 if (failed) 749 cancelAcquire(node); 750 } 751 } 752 753 /** 754 * Acquires in shared timed mode. 755 * 756 * @param arg the acquire argument 757 * @param nanosTimeout max wait time 758 * @return {@code true} if acquired 759 */ 760 private boolean doAcquireSharedNanos(long arg, long nanosTimeout) 761 throws InterruptedException { 762 if (nanosTimeout <= 0L) 763 return false; 764 final long deadline = System.nanoTime() + nanosTimeout; 765 final Node node = addWaiter(Node.SHARED); 766 boolean failed = true; 767 try { 768 for (;;) { 769 final Node p = node.predecessor(); 770 if (p == head) { 771 long r = tryAcquireShared(arg); 772 if (r >= 0) { 773 setHeadAndPropagate(node, r); 774 p.next = null; // help GC 775 failed = false; 776 return true; 777 } 778 } 779 nanosTimeout = deadline - System.nanoTime(); 780 if (nanosTimeout <= 0L) 781 return false; 782 if (shouldParkAfterFailedAcquire(p, node) && 783 nanosTimeout > spinForTimeoutThreshold) 784 LockSupport.parkNanos(this, nanosTimeout); 785 if (Thread.interrupted()) 786 throw new InterruptedException(); 787 } 788 } finally { 789 if (failed) 790 cancelAcquire(node); 791 } 792 } 793 794 // Main exported methods 795 796 /** 797 * Attempts to acquire in exclusive mode. This method should query 798 * if the state of the object permits it to be acquired in the 799 * exclusive mode, and if so to acquire it. 
800 * 801 * <p>This method is always invoked by the thread performing 802 * acquire. If this method reports failure, the acquire method 803 * may queue the thread, if it is not already queued, until it is 804 * signalled by a release from some other thread. This can be used 805 * to implement method {@link Lock#tryLock()}. 806 * 807 * <p>The default 808 * implementation throws {@link UnsupportedOperationException}. 809 * 810 * @param arg the acquire argument. This value is always the one 811 * passed to an acquire method, or is the value saved on entry 812 * to a condition wait. The value is otherwise uninterpreted 813 * and can represent anything you like. 814 * @return {@code true} if successful. Upon success, this object has 815 * been acquired. 816 * @throws IllegalMonitorStateException if acquiring would place this 817 * synchronizer in an illegal state. This exception must be 818 * thrown in a consistent fashion for synchronization to work 819 * correctly. 820 * @throws UnsupportedOperationException if exclusive mode is not supported 821 */ 822 protected boolean tryAcquire(long arg) { 823 throw new UnsupportedOperationException(); 824 } 825 826 /** 827 * Attempts to set the state to reflect a release in exclusive 828 * mode. 829 * 830 * <p>This method is always invoked by the thread performing release. 831 * 832 * <p>The default implementation throws 833 * {@link UnsupportedOperationException}. 834 * 835 * @param arg the release argument. This value is always the one 836 * passed to a release method, or the current state value upon 837 * entry to a condition wait. The value is otherwise 838 * uninterpreted and can represent anything you like. 839 * @return {@code true} if this object is now in a fully released 840 * state, so that any waiting threads may attempt to acquire; 841 * and {@code false} otherwise. 842 * @throws IllegalMonitorStateException if releasing would place this 843 * synchronizer in an illegal state. 
* This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(long arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.) Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
882 * @throws UnsupportedOperationException if shared mode is not supported 883 */ 884 protected long tryAcquireShared(long arg) { 885 throw new UnsupportedOperationException(); 886 } 887 888 /** 889 * Attempts to set the state to reflect a release in shared mode. 890 * 891 * <p>This method is always invoked by the thread performing release. 892 * 893 * <p>The default implementation throws 894 * {@link UnsupportedOperationException}. 895 * 896 * @param arg the release argument. This value is always the one 897 * passed to a release method, or the current state value upon 898 * entry to a condition wait. The value is otherwise 899 * uninterpreted and can represent anything you like. 900 * @return {@code true} if this release of shared mode may permit a 901 * waiting acquire (shared or exclusive) to succeed; and 902 * {@code false} otherwise 903 * @throws IllegalMonitorStateException if releasing would place this 904 * synchronizer in an illegal state. This exception must be 905 * thrown in a consistent fashion for synchronization to work 906 * correctly. 907 * @throws UnsupportedOperationException if shared mode is not supported 908 */ 909 protected boolean tryReleaseShared(long arg) { 910 throw new UnsupportedOperationException(); 911 } 912 913 /** 914 * Returns {@code true} if synchronization is held exclusively with 915 * respect to the current (calling) thread. This method is invoked 916 * upon each call to a non-waiting {@link ConditionObject} method. 917 * (Waiting methods instead invoke {@link #release}.) 918 * 919 * <p>The default implementation throws {@link 920 * UnsupportedOperationException}. This method is invoked 921 * internally only within {@link ConditionObject} methods, so need 922 * not be defined if conditions are not used. 
923 * 924 * @return {@code true} if synchronization is held exclusively; 925 * {@code false} otherwise 926 * @throws UnsupportedOperationException if conditions are not supported 927 */ 928 protected boolean isHeldExclusively() { 929 throw new UnsupportedOperationException(); 930 } 931 932 /** 933 * Acquires in exclusive mode, ignoring interrupts. Implemented 934 * by invoking at least once {@link #tryAcquire}, 935 * returning on success. Otherwise the thread is queued, possibly 936 * repeatedly blocking and unblocking, invoking {@link 937 * #tryAcquire} until success. This method can be used 938 * to implement method {@link Lock#lock}. 939 * 940 * @param arg the acquire argument. This value is conveyed to 941 * {@link #tryAcquire} but is otherwise uninterpreted and 942 * can represent anything you like. 943 */ 944 public final void acquire(long arg) { 945 if (!tryAcquire(arg) && 946 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 947 selfInterrupt(); 948 } 949 950 /** 951 * Acquires in exclusive mode, aborting if interrupted. 952 * Implemented by first checking interrupt status, then invoking 953 * at least once {@link #tryAcquire}, returning on 954 * success. Otherwise the thread is queued, possibly repeatedly 955 * blocking and unblocking, invoking {@link #tryAcquire} 956 * until success or the thread is interrupted. This method can be 957 * used to implement method {@link Lock#lockInterruptibly}. 958 * 959 * @param arg the acquire argument. This value is conveyed to 960 * {@link #tryAcquire} but is otherwise uninterpreted and 961 * can represent anything you like. 
962 * @throws InterruptedException if the current thread is interrupted 963 */ 964 public final void acquireInterruptibly(long arg) 965 throws InterruptedException { 966 if (Thread.interrupted()) 967 throw new InterruptedException(); 968 if (!tryAcquire(arg)) 969 doAcquireInterruptibly(arg); 970 } 971 972 /** 973 * Attempts to acquire in exclusive mode, aborting if interrupted, 974 * and failing if the given timeout elapses. Implemented by first 975 * checking interrupt status, then invoking at least once {@link 976 * #tryAcquire}, returning on success. Otherwise, the thread is 977 * queued, possibly repeatedly blocking and unblocking, invoking 978 * {@link #tryAcquire} until success or the thread is interrupted 979 * or the timeout elapses. This method can be used to implement 980 * method {@link Lock#tryLock(long, TimeUnit)}. 981 * 982 * @param arg the acquire argument. This value is conveyed to 983 * {@link #tryAcquire} but is otherwise uninterpreted and 984 * can represent anything you like. 985 * @param nanosTimeout the maximum number of nanoseconds to wait 986 * @return {@code true} if acquired; {@code false} if timed out 987 * @throws InterruptedException if the current thread is interrupted 988 */ 989 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 990 throws InterruptedException { 991 if (Thread.interrupted()) 992 throw new InterruptedException(); 993 return tryAcquire(arg) || 994 doAcquireNanos(arg, nanosTimeout); 995 } 996 997 /** 998 * Releases in exclusive mode. Implemented by unblocking one or 999 * more threads if {@link #tryRelease} returns true. 1000 * This method can be used to implement method {@link Lock#unlock}. 1001 * 1002 * @param arg the release argument. This value is conveyed to 1003 * {@link #tryRelease} but is otherwise uninterpreted and 1004 * can represent anything you like. 
1005 * @return the value returned from {@link #tryRelease} 1006 */ 1007 public final boolean release(long arg) { 1008 if (tryRelease(arg)) { 1009 Node h = head; 1010 if (h != null && h.waitStatus != 0) 1011 unparkSuccessor(h); 1012 return true; 1013 } 1014 return false; 1015 } 1016 1017 /** 1018 * Acquires in shared mode, ignoring interrupts. Implemented by 1019 * first invoking at least once {@link #tryAcquireShared}, 1020 * returning on success. Otherwise the thread is queued, possibly 1021 * repeatedly blocking and unblocking, invoking {@link 1022 * #tryAcquireShared} until success. 1023 * 1024 * @param arg the acquire argument. This value is conveyed to 1025 * {@link #tryAcquireShared} but is otherwise uninterpreted 1026 * and can represent anything you like. 1027 */ 1028 public final void acquireShared(long arg) { 1029 if (tryAcquireShared(arg) < 0) 1030 doAcquireShared(arg); 1031 } 1032 1033 /** 1034 * Acquires in shared mode, aborting if interrupted. Implemented 1035 * by first checking interrupt status, then invoking at least once 1036 * {@link #tryAcquireShared}, returning on success. Otherwise the 1037 * thread is queued, possibly repeatedly blocking and unblocking, 1038 * invoking {@link #tryAcquireShared} until success or the thread 1039 * is interrupted. 1040 * @param arg the acquire argument. 1041 * This value is conveyed to {@link #tryAcquireShared} but is 1042 * otherwise uninterpreted and can represent anything 1043 * you like. 1044 * @throws InterruptedException if the current thread is interrupted 1045 */ 1046 public final void acquireSharedInterruptibly(long arg) 1047 throws InterruptedException { 1048 if (Thread.interrupted()) 1049 throw new InterruptedException(); 1050 if (tryAcquireShared(arg) < 0) 1051 doAcquireSharedInterruptibly(arg); 1052 } 1053 1054 /** 1055 * Attempts to acquire in shared mode, aborting if interrupted, and 1056 * failing if the given timeout elapses. 
Implemented by first 1057 * checking interrupt status, then invoking at least once {@link 1058 * #tryAcquireShared}, returning on success. Otherwise, the 1059 * thread is queued, possibly repeatedly blocking and unblocking, 1060 * invoking {@link #tryAcquireShared} until success or the thread 1061 * is interrupted or the timeout elapses. 1062 * 1063 * @param arg the acquire argument. This value is conveyed to 1064 * {@link #tryAcquireShared} but is otherwise uninterpreted 1065 * and can represent anything you like. 1066 * @param nanosTimeout the maximum number of nanoseconds to wait 1067 * @return {@code true} if acquired; {@code false} if timed out 1068 * @throws InterruptedException if the current thread is interrupted 1069 */ 1070 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 1071 throws InterruptedException { 1072 if (Thread.interrupted()) 1073 throw new InterruptedException(); 1074 return tryAcquireShared(arg) >= 0 || 1075 doAcquireSharedNanos(arg, nanosTimeout); 1076 } 1077 1078 /** 1079 * Releases in shared mode. Implemented by unblocking one or more 1080 * threads if {@link #tryReleaseShared} returns true. 1081 * 1082 * @param arg the release argument. This value is conveyed to 1083 * {@link #tryReleaseShared} but is otherwise uninterpreted 1084 * and can represent anything you like. 1085 * @return the value returned from {@link #tryReleaseShared} 1086 */ 1087 public final boolean releaseShared(long arg) { 1088 if (tryReleaseShared(arg)) { 1089 doReleaseShared(); 1090 return true; 1091 } 1092 return false; 1093 } 1094 1095 // Queue inspection methods 1096 1097 /** 1098 * Queries whether any threads are waiting to acquire. Note that 1099 * because cancellations due to interrupts and timeouts may occur 1100 * at any time, a {@code true} return does not guarantee that any 1101 * other thread will ever acquire. 1102 * 1103 * <p>In this implementation, this operation returns in 1104 * constant time. 
1105 * 1106 * @return {@code true} if there may be other threads waiting to acquire 1107 */ 1108 public final boolean hasQueuedThreads() { 1109 return head != tail; 1110 } 1111 1112 /** 1113 * Queries whether any threads have ever contended to acquire this 1114 * synchronizer; that is if an acquire method has ever blocked. 1115 * 1116 * <p>In this implementation, this operation returns in 1117 * constant time. 1118 * 1119 * @return {@code true} if there has ever been contention 1120 */ 1121 public final boolean hasContended() { 1122 return head != null; 1123 } 1124 1125 /** 1126 * Returns the first (longest-waiting) thread in the queue, or 1127 * {@code null} if no threads are currently queued. 1128 * 1129 * <p>In this implementation, this operation normally returns in 1130 * constant time, but may iterate upon contention if other threads are 1131 * concurrently modifying the queue. 1132 * 1133 * @return the first (longest-waiting) thread in the queue, or 1134 * {@code null} if no threads are currently queued 1135 */ 1136 public final Thread getFirstQueuedThread() { 1137 // handle only fast path, else relay 1138 return (head == tail) ? null : fullGetFirstQueuedThread(); 1139 } 1140 1141 /** 1142 * Version of getFirstQueuedThread called when fastpath fails 1143 */ 1144 private Thread fullGetFirstQueuedThread() { 1145 /* 1146 * The first node is normally head.next. Try to get its 1147 * thread field, ensuring consistent reads: If thread 1148 * field is nulled out or s.prev is no longer head, then 1149 * some other thread(s) concurrently performed setHead in 1150 * between some of our reads. We try this twice before 1151 * resorting to traversal. 
1152 */ 1153 Node h, s; 1154 Thread st; 1155 if (((h = head) != null && (s = h.next) != null && 1156 s.prev == head && (st = s.thread) != null) || 1157 ((h = head) != null && (s = h.next) != null && 1158 s.prev == head && (st = s.thread) != null)) 1159 return st; 1160 1161 /* 1162 * Head's next field might not have been set yet, or may have 1163 * been unset after setHead. So we must check to see if tail 1164 * is actually first node. If not, we continue on, safely 1165 * traversing from tail back to head to find first, 1166 * guaranteeing termination. 1167 */ 1168 1169 Node t = tail; 1170 Thread firstThread = null; 1171 while (t != null && t != head) { 1172 Thread tt = t.thread; 1173 if (tt != null) 1174 firstThread = tt; 1175 t = t.prev; 1176 } 1177 return firstThread; 1178 } 1179 1180 /** 1181 * Returns true if the given thread is currently queued. 1182 * 1183 * <p>This implementation traverses the queue to determine 1184 * presence of the given thread. 1185 * 1186 * @param thread the thread 1187 * @return {@code true} if the given thread is on the queue 1188 * @throws NullPointerException if the thread is null 1189 */ 1190 public final boolean isQueued(Thread thread) { 1191 if (thread == null) 1192 throw new NullPointerException(); 1193 for (Node p = tail; p != null; p = p.prev) 1194 if (p.thread == thread) 1195 return true; 1196 return false; 1197 } 1198 1199 /** 1200 * Returns {@code true} if the apparent first queued thread, if one 1201 * exists, is waiting in exclusive mode. If this method returns 1202 * {@code true}, and the current thread is attempting to acquire in 1203 * shared mode (that is, this method is invoked from {@link 1204 * #tryAcquireShared}) then it is guaranteed that the current thread 1205 * is not the first queued thread. Used only as a heuristic in 1206 * ReentrantReadWriteLock. 
1207 */ 1208 final boolean apparentlyFirstQueuedIsExclusive() { 1209 Node h, s; 1210 return (h = head) != null && 1211 (s = h.next) != null && 1212 !s.isShared() && 1213 s.thread != null; 1214 } 1215 1216 /** 1217 * Queries whether any threads have been waiting to acquire longer 1218 * than the current thread. 1219 * 1220 * <p>An invocation of this method is equivalent to (but may be 1221 * more efficient than): 1222 * <pre> {@code 1223 * getFirstQueuedThread() != Thread.currentThread() && 1224 * hasQueuedThreads()}</pre> 1225 * 1226 * <p>Note that because cancellations due to interrupts and 1227 * timeouts may occur at any time, a {@code true} return does not 1228 * guarantee that some other thread will acquire before the current 1229 * thread. Likewise, it is possible for another thread to win a 1230 * race to enqueue after this method has returned {@code false}, 1231 * due to the queue being empty. 1232 * 1233 * <p>This method is designed to be used by a fair synchronizer to 1234 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 1235 * Such a synchronizer's {@link #tryAcquire} method should return 1236 * {@code false}, and its {@link #tryAcquireShared} method should 1237 * return a negative value, if this method returns {@code true} 1238 * (unless this is a reentrant acquire). 
For example, the {@code 1239 * tryAcquire} method for a fair, reentrant, exclusive mode 1240 * synchronizer might look like this: 1241 * 1242 * <pre> {@code 1243 * protected boolean tryAcquire(int arg) { 1244 * if (isHeldExclusively()) { 1245 * // A reentrant acquire; increment hold count 1246 * return true; 1247 * } else if (hasQueuedPredecessors()) { 1248 * return false; 1249 * } else { 1250 * // try to acquire normally 1251 * } 1252 * }}</pre> 1253 * 1254 * @return {@code true} if there is a queued thread preceding the 1255 * current thread, and {@code false} if the current thread 1256 * is at the head of the queue or the queue is empty 1257 * @since 1.7 1258 * @hide 1.7 1259 */ 1260 public final boolean hasQueuedPredecessors() { 1261 // The correctness of this depends on head being initialized 1262 // before tail and on head.next being accurate if the current 1263 // thread is first in queue. 1264 Node t = tail; // Read fields in reverse initialization order 1265 Node h = head; 1266 Node s; 1267 return h != t && 1268 ((s = h.next) == null || s.thread != Thread.currentThread()); 1269 } 1270 1271 1272 // Instrumentation and monitoring methods 1273 1274 /** 1275 * Returns an estimate of the number of threads waiting to 1276 * acquire. The value is only an estimate because the number of 1277 * threads may change dynamically while this method traverses 1278 * internal data structures. This method is designed for use in 1279 * monitoring system state, not for synchronization 1280 * control. 1281 * 1282 * @return the estimated number of threads waiting to acquire 1283 */ 1284 public final int getQueueLength() { 1285 int n = 0; 1286 for (Node p = tail; p != null; p = p.prev) { 1287 if (p.thread != null) 1288 ++n; 1289 } 1290 return n; 1291 } 1292 1293 /** 1294 * Returns a collection containing threads that may be waiting to 1295 * acquire. 
Because the actual set of threads may change 1296 * dynamically while constructing this result, the returned 1297 * collection is only a best-effort estimate. The elements of the 1298 * returned collection are in no particular order. This method is 1299 * designed to facilitate construction of subclasses that provide 1300 * more extensive monitoring facilities. 1301 * 1302 * @return the collection of threads 1303 */ 1304 public final Collection<Thread> getQueuedThreads() { 1305 ArrayList<Thread> list = new ArrayList<Thread>(); 1306 for (Node p = tail; p != null; p = p.prev) { 1307 Thread t = p.thread; 1308 if (t != null) 1309 list.add(t); 1310 } 1311 return list; 1312 } 1313 1314 /** 1315 * Returns a collection containing threads that may be waiting to 1316 * acquire in exclusive mode. This has the same properties 1317 * as {@link #getQueuedThreads} except that it only returns 1318 * those threads waiting due to an exclusive acquire. 1319 * 1320 * @return the collection of threads 1321 */ 1322 public final Collection<Thread> getExclusiveQueuedThreads() { 1323 ArrayList<Thread> list = new ArrayList<Thread>(); 1324 for (Node p = tail; p != null; p = p.prev) { 1325 if (!p.isShared()) { 1326 Thread t = p.thread; 1327 if (t != null) 1328 list.add(t); 1329 } 1330 } 1331 return list; 1332 } 1333 1334 /** 1335 * Returns a collection containing threads that may be waiting to 1336 * acquire in shared mode. This has the same properties 1337 * as {@link #getQueuedThreads} except that it only returns 1338 * those threads waiting due to a shared acquire. 
1339 * 1340 * @return the collection of threads 1341 */ 1342 public final Collection<Thread> getSharedQueuedThreads() { 1343 ArrayList<Thread> list = new ArrayList<Thread>(); 1344 for (Node p = tail; p != null; p = p.prev) { 1345 if (p.isShared()) { 1346 Thread t = p.thread; 1347 if (t != null) 1348 list.add(t); 1349 } 1350 } 1351 return list; 1352 } 1353 1354 /** 1355 * Returns a string identifying this synchronizer, as well as its state. 1356 * The state, in brackets, includes the String {@code "State ="} 1357 * followed by the current value of {@link #getState}, and either 1358 * {@code "nonempty"} or {@code "empty"} depending on whether the 1359 * queue is empty. 1360 * 1361 * @return a string identifying this synchronizer, as well as its state 1362 */ 1363 public String toString() { 1364 long s = getState(); 1365 String q = hasQueuedThreads() ? "non" : ""; 1366 return super.toString() + 1367 "[State = " + s + ", " + q + "empty queue]"; 1368 } 1369 1370 1371 // Internal support methods for Conditions 1372 1373 /** 1374 * Returns true if a node, always one that was initially placed on 1375 * a condition queue, is now waiting to reacquire on sync queue. 1376 * @param node the node 1377 * @return true if is reacquiring 1378 */ 1379 final boolean isOnSyncQueue(Node node) { 1380 if (node.waitStatus == Node.CONDITION || node.prev == null) 1381 return false; 1382 if (node.next != null) // If has successor, it must be on queue 1383 return true; 1384 /* 1385 * node.prev can be non-null, but not yet on queue because 1386 * the CAS to place it on queue can fail. So we have to 1387 * traverse from tail to make sure it actually made it. It 1388 * will always be near the tail in calls to this method, and 1389 * unless the CAS failed (which is unlikely), it will be 1390 * there, so we hardly ever traverse much. 1391 */ 1392 return findNodeFromTail(node); 1393 } 1394 1395 /** 1396 * Returns true if node is on sync queue by searching backwards from tail. 
1397 * Called only when needed by isOnSyncQueue. 1398 * @return true if present 1399 */ 1400 private boolean findNodeFromTail(Node node) { 1401 Node t = tail; 1402 for (;;) { 1403 if (t == node) 1404 return true; 1405 if (t == null) 1406 return false; 1407 t = t.prev; 1408 } 1409 } 1410 1411 /** 1412 * Transfers a node from a condition queue onto sync queue. 1413 * Returns true if successful. 1414 * @param node the node 1415 * @return true if successfully transferred (else the node was 1416 * cancelled before signal) 1417 */ 1418 final boolean transferForSignal(Node node) { 1419 /* 1420 * If cannot change waitStatus, the node has been cancelled. 1421 */ 1422 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 1423 return false; 1424 1425 /* 1426 * Splice onto queue and try to set waitStatus of predecessor to 1427 * indicate that thread is (probably) waiting. If cancelled or 1428 * attempt to set waitStatus fails, wake up to resync (in which 1429 * case the waitStatus can be transiently and harmlessly wrong). 1430 */ 1431 Node p = enq(node); 1432 int ws = p.waitStatus; 1433 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) 1434 LockSupport.unpark(node.thread); 1435 return true; 1436 } 1437 1438 /** 1439 * Transfers node, if necessary, to sync queue after a cancelled wait. 1440 * Returns true if thread was cancelled before being signalled. 1441 * 1442 * @param node the node 1443 * @return true if cancelled before the node was signalled 1444 */ 1445 final boolean transferAfterCancelledWait(Node node) { 1446 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 1447 enq(node); 1448 return true; 1449 } 1450 /* 1451 * If we lost out to a signal(), then we can't proceed 1452 * until it finishes its enq(). Cancelling during an 1453 * incomplete transfer is both rare and transient, so just 1454 * spin. 
1455 */ 1456 while (!isOnSyncQueue(node)) 1457 Thread.yield(); 1458 return false; 1459 } 1460 1461 /** 1462 * Invokes release with current state value; returns saved state. 1463 * Cancels node and throws exception on failure. 1464 * @param node the condition node for this wait 1465 * @return previous sync state 1466 */ 1467 final long fullyRelease(Node node) { 1468 boolean failed = true; 1469 try { 1470 long savedState = getState(); 1471 if (release(savedState)) { 1472 failed = false; 1473 return savedState; 1474 } else { 1475 throw new IllegalMonitorStateException(); 1476 } 1477 } finally { 1478 if (failed) 1479 node.waitStatus = Node.CANCELLED; 1480 } 1481 } 1482 1483 // Instrumentation methods for conditions 1484 1485 /** 1486 * Queries whether the given ConditionObject 1487 * uses this synchronizer as its lock. 1488 * 1489 * @param condition the condition 1490 * @return {@code true} if owned 1491 * @throws NullPointerException if the condition is null 1492 */ 1493 public final boolean owns(ConditionObject condition) { 1494 return condition.isOwnedBy(this); 1495 } 1496 1497 /** 1498 * Queries whether any threads are waiting on the given condition 1499 * associated with this synchronizer. Note that because timeouts 1500 * and interrupts may occur at any time, a {@code true} return 1501 * does not guarantee that a future {@code signal} will awaken 1502 * any threads. This method is designed primarily for use in 1503 * monitoring of the system state. 
1504 * 1505 * @param condition the condition 1506 * @return {@code true} if there are any waiting threads 1507 * @throws IllegalMonitorStateException if exclusive synchronization 1508 * is not held 1509 * @throws IllegalArgumentException if the given condition is 1510 * not associated with this synchronizer 1511 * @throws NullPointerException if the condition is null 1512 */ 1513 public final boolean hasWaiters(ConditionObject condition) { 1514 if (!owns(condition)) 1515 throw new IllegalArgumentException("Not owner"); 1516 return condition.hasWaiters(); 1517 } 1518 1519 /** 1520 * Returns an estimate of the number of threads waiting on the 1521 * given condition associated with this synchronizer. Note that 1522 * because timeouts and interrupts may occur at any time, the 1523 * estimate serves only as an upper bound on the actual number of 1524 * waiters. This method is designed for use in monitoring of the 1525 * system state, not for synchronization control. 1526 * 1527 * @param condition the condition 1528 * @return the estimated number of waiting threads 1529 * @throws IllegalMonitorStateException if exclusive synchronization 1530 * is not held 1531 * @throws IllegalArgumentException if the given condition is 1532 * not associated with this synchronizer 1533 * @throws NullPointerException if the condition is null 1534 */ 1535 public final int getWaitQueueLength(ConditionObject condition) { 1536 if (!owns(condition)) 1537 throw new IllegalArgumentException("Not owner"); 1538 return condition.getWaitQueueLength(); 1539 } 1540 1541 /** 1542 * Returns a collection containing those threads that may be 1543 * waiting on the given condition associated with this 1544 * synchronizer. Because the actual set of threads may change 1545 * dynamically while constructing this result, the returned 1546 * collection is only a best-effort estimate. The elements of the 1547 * returned collection are in no particular order. 
1548 * 1549 * @param condition the condition 1550 * @return the collection of threads 1551 * @throws IllegalMonitorStateException if exclusive synchronization 1552 * is not held 1553 * @throws IllegalArgumentException if the given condition is 1554 * not associated with this synchronizer 1555 * @throws NullPointerException if the condition is null 1556 */ 1557 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1558 if (!owns(condition)) 1559 throw new IllegalArgumentException("Not owner"); 1560 return condition.getWaitingThreads(); 1561 } 1562 1563 /** 1564 * Condition implementation for a {@link 1565 * AbstractQueuedLongSynchronizer} serving as the basis of a {@link 1566 * Lock} implementation. 1567 * 1568 * <p>Method documentation for this class describes mechanics, 1569 * not behavioral specifications from the point of view of Lock 1570 * and Condition users. Exported versions of this class will in 1571 * general need to be accompanied by documentation describing 1572 * condition semantics that rely on those of the associated 1573 * {@code AbstractQueuedLongSynchronizer}. 1574 * 1575 * <p>This class is Serializable, but all fields are transient, 1576 * so deserialized conditions have no waiters. 1577 * 1578 * @since 1.6 1579 */ 1580 public class ConditionObject implements Condition, java.io.Serializable { 1581 private static final long serialVersionUID = 1173984872572414699L; 1582 /** First node of condition queue. */ 1583 private transient Node firstWaiter; 1584 /** Last node of condition queue. */ 1585 private transient Node lastWaiter; 1586 1587 /** 1588 * Creates a new {@code ConditionObject} instance. 1589 */ 1590 public ConditionObject() { } 1591 1592 // Internal methods 1593 1594 /** 1595 * Adds a new waiter to wait queue. 1596 * @return its new wait node 1597 */ 1598 private Node addConditionWaiter() { 1599 Node t = lastWaiter; 1600 // If lastWaiter is cancelled, clean out. 
1601 if (t != null && t.waitStatus != Node.CONDITION) { 1602 unlinkCancelledWaiters(); 1603 t = lastWaiter; 1604 } 1605 Node node = new Node(Thread.currentThread(), Node.CONDITION); 1606 if (t == null) 1607 firstWaiter = node; 1608 else 1609 t.nextWaiter = node; 1610 lastWaiter = node; 1611 return node; 1612 } 1613 1614 /** 1615 * Removes and transfers nodes until hit non-cancelled one or 1616 * null. Split out from signal in part to encourage compilers 1617 * to inline the case of no waiters. 1618 * @param first (non-null) the first node on condition queue 1619 */ 1620 private void doSignal(Node first) { 1621 do { 1622 if ( (firstWaiter = first.nextWaiter) == null) 1623 lastWaiter = null; 1624 first.nextWaiter = null; 1625 } while (!transferForSignal(first) && 1626 (first = firstWaiter) != null); 1627 } 1628 1629 /** 1630 * Removes and transfers all nodes. 1631 * @param first (non-null) the first node on condition queue 1632 */ 1633 private void doSignalAll(Node first) { 1634 lastWaiter = firstWaiter = null; 1635 do { 1636 Node next = first.nextWaiter; 1637 first.nextWaiter = null; 1638 transferForSignal(first); 1639 first = next; 1640 } while (first != null); 1641 } 1642 1643 /** 1644 * Unlinks cancelled waiter nodes from condition queue. 1645 * Called only while holding lock. This is called when 1646 * cancellation occurred during condition wait, and upon 1647 * insertion of a new waiter when lastWaiter is seen to have 1648 * been cancelled. This method is needed to avoid garbage 1649 * retention in the absence of signals. So even though it may 1650 * require a full traversal, it comes into play only when 1651 * timeouts or cancellations occur in the absence of 1652 * signals. It traverses all nodes rather than stopping at a 1653 * particular target to unlink all pointers to garbage nodes 1654 * without requiring many re-traversals during cancellation 1655 * storms. 
1656 */ 1657 private void unlinkCancelledWaiters() { 1658 Node t = firstWaiter; 1659 Node trail = null; 1660 while (t != null) { 1661 Node next = t.nextWaiter; 1662 if (t.waitStatus != Node.CONDITION) { 1663 t.nextWaiter = null; 1664 if (trail == null) 1665 firstWaiter = next; 1666 else 1667 trail.nextWaiter = next; 1668 if (next == null) 1669 lastWaiter = trail; 1670 } 1671 else 1672 trail = t; 1673 t = next; 1674 } 1675 } 1676 1677 // public methods 1678 1679 /** 1680 * Moves the longest-waiting thread, if one exists, from the 1681 * wait queue for this condition to the wait queue for the 1682 * owning lock. 1683 * 1684 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1685 * returns {@code false} 1686 */ 1687 public final void signal() { 1688 if (!isHeldExclusively()) 1689 throw new IllegalMonitorStateException(); 1690 Node first = firstWaiter; 1691 if (first != null) 1692 doSignal(first); 1693 } 1694 1695 /** 1696 * Moves all threads from the wait queue for this condition to 1697 * the wait queue for the owning lock. 1698 * 1699 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1700 * returns {@code false} 1701 */ 1702 public final void signalAll() { 1703 if (!isHeldExclusively()) 1704 throw new IllegalMonitorStateException(); 1705 Node first = firstWaiter; 1706 if (first != null) 1707 doSignalAll(first); 1708 } 1709 1710 /** 1711 * Implements uninterruptible condition wait. 1712 * <ol> 1713 * <li> Save lock state returned by {@link #getState}. 1714 * <li> Invoke {@link #release} with saved state as argument, 1715 * throwing IllegalMonitorStateException if it fails. 1716 * <li> Block until signalled. 1717 * <li> Reacquire by invoking specialized version of 1718 * {@link #acquire} with saved state as argument. 
1719 * </ol> 1720 */ 1721 public final void awaitUninterruptibly() { 1722 Node node = addConditionWaiter(); 1723 long savedState = fullyRelease(node); 1724 boolean interrupted = false; 1725 while (!isOnSyncQueue(node)) { 1726 LockSupport.park(this); 1727 if (Thread.interrupted()) 1728 interrupted = true; 1729 } 1730 if (acquireQueued(node, savedState) || interrupted) 1731 selfInterrupt(); 1732 } 1733 1734 /* 1735 * For interruptible waits, we need to track whether to throw 1736 * InterruptedException, if interrupted while blocked on 1737 * condition, versus reinterrupt current thread, if 1738 * interrupted while blocked waiting to re-acquire. 1739 */ 1740 1741 /** Mode meaning to reinterrupt on exit from wait */ 1742 private static final int REINTERRUPT = 1; 1743 /** Mode meaning to throw InterruptedException on exit from wait */ 1744 private static final int THROW_IE = -1; 1745 1746 /** 1747 * Checks for interrupt, returning THROW_IE if interrupted 1748 * before signalled, REINTERRUPT if after signalled, or 1749 * 0 if not interrupted. 1750 */ 1751 private int checkInterruptWhileWaiting(Node node) { 1752 return Thread.interrupted() ? 1753 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 1754 0; 1755 } 1756 1757 /** 1758 * Throws InterruptedException, reinterrupts current thread, or 1759 * does nothing, depending on mode. 1760 */ 1761 private void reportInterruptAfterWait(int interruptMode) 1762 throws InterruptedException { 1763 if (interruptMode == THROW_IE) 1764 throw new InterruptedException(); 1765 else if (interruptMode == REINTERRUPT) 1766 selfInterrupt(); 1767 } 1768 1769 /** 1770 * Implements interruptible condition wait. 1771 * <ol> 1772 * <li> If current thread is interrupted, throw InterruptedException. 1773 * <li> Save lock state returned by {@link #getState}. 1774 * <li> Invoke {@link #release} with saved state as argument, 1775 * throwing IllegalMonitorStateException if it fails. 1776 * <li> Block until signalled or interrupted. 
1777 * <li> Reacquire by invoking specialized version of 1778 * {@link #acquire} with saved state as argument. 1779 * <li> If interrupted while blocked in step 4, throw InterruptedException. 1780 * </ol> 1781 */ 1782 public final void await() throws InterruptedException { 1783 if (Thread.interrupted()) 1784 throw new InterruptedException(); 1785 Node node = addConditionWaiter(); 1786 long savedState = fullyRelease(node); 1787 int interruptMode = 0; 1788 while (!isOnSyncQueue(node)) { 1789 LockSupport.park(this); 1790 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1791 break; 1792 } 1793 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1794 interruptMode = REINTERRUPT; 1795 if (node.nextWaiter != null) // clean up if cancelled 1796 unlinkCancelledWaiters(); 1797 if (interruptMode != 0) 1798 reportInterruptAfterWait(interruptMode); 1799 } 1800 1801 /** 1802 * Implements timed condition wait. 1803 * <ol> 1804 * <li> If current thread is interrupted, throw InterruptedException. 1805 * <li> Save lock state returned by {@link #getState}. 1806 * <li> Invoke {@link #release} with saved state as argument, 1807 * throwing IllegalMonitorStateException if it fails. 1808 * <li> Block until signalled, interrupted, or timed out. 1809 * <li> Reacquire by invoking specialized version of 1810 * {@link #acquire} with saved state as argument. 1811 * <li> If interrupted while blocked in step 4, throw InterruptedException. 
1812 * </ol> 1813 */ 1814 public final long awaitNanos(long nanosTimeout) 1815 throws InterruptedException { 1816 if (Thread.interrupted()) 1817 throw new InterruptedException(); 1818 Node node = addConditionWaiter(); 1819 long savedState = fullyRelease(node); 1820 final long deadline = System.nanoTime() + nanosTimeout; 1821 int interruptMode = 0; 1822 while (!isOnSyncQueue(node)) { 1823 if (nanosTimeout <= 0L) { 1824 transferAfterCancelledWait(node); 1825 break; 1826 } 1827 if (nanosTimeout >= spinForTimeoutThreshold) 1828 LockSupport.parkNanos(this, nanosTimeout); 1829 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1830 break; 1831 nanosTimeout = deadline - System.nanoTime(); 1832 } 1833 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1834 interruptMode = REINTERRUPT; 1835 if (node.nextWaiter != null) 1836 unlinkCancelledWaiters(); 1837 if (interruptMode != 0) 1838 reportInterruptAfterWait(interruptMode); 1839 return deadline - System.nanoTime(); 1840 } 1841 1842 /** 1843 * Implements absolute timed condition wait. 1844 * <ol> 1845 * <li> If current thread is interrupted, throw InterruptedException. 1846 * <li> Save lock state returned by {@link #getState}. 1847 * <li> Invoke {@link #release} with saved state as argument, 1848 * throwing IllegalMonitorStateException if it fails. 1849 * <li> Block until signalled, interrupted, or timed out. 1850 * <li> Reacquire by invoking specialized version of 1851 * {@link #acquire} with saved state as argument. 1852 * <li> If interrupted while blocked in step 4, throw InterruptedException. 1853 * <li> If timed out while blocked in step 4, return false, else true. 
1854 * </ol> 1855 */ 1856 public final boolean awaitUntil(Date deadline) 1857 throws InterruptedException { 1858 long abstime = deadline.getTime(); 1859 if (Thread.interrupted()) 1860 throw new InterruptedException(); 1861 Node node = addConditionWaiter(); 1862 long savedState = fullyRelease(node); 1863 boolean timedout = false; 1864 int interruptMode = 0; 1865 while (!isOnSyncQueue(node)) { 1866 if (System.currentTimeMillis() > abstime) { 1867 timedout = transferAfterCancelledWait(node); 1868 break; 1869 } 1870 LockSupport.parkUntil(this, abstime); 1871 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1872 break; 1873 } 1874 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1875 interruptMode = REINTERRUPT; 1876 if (node.nextWaiter != null) 1877 unlinkCancelledWaiters(); 1878 if (interruptMode != 0) 1879 reportInterruptAfterWait(interruptMode); 1880 return !timedout; 1881 } 1882 1883 /** 1884 * Implements timed condition wait. 1885 * <ol> 1886 * <li> If current thread is interrupted, throw InterruptedException. 1887 * <li> Save lock state returned by {@link #getState}. 1888 * <li> Invoke {@link #release} with saved state as argument, 1889 * throwing IllegalMonitorStateException if it fails. 1890 * <li> Block until signalled, interrupted, or timed out. 1891 * <li> Reacquire by invoking specialized version of 1892 * {@link #acquire} with saved state as argument. 1893 * <li> If interrupted while blocked in step 4, throw InterruptedException. 1894 * <li> If timed out while blocked in step 4, return false, else true. 
1895 * </ol> 1896 */ 1897 public final boolean await(long time, TimeUnit unit) 1898 throws InterruptedException { 1899 long nanosTimeout = unit.toNanos(time); 1900 if (Thread.interrupted()) 1901 throw new InterruptedException(); 1902 Node node = addConditionWaiter(); 1903 long savedState = fullyRelease(node); 1904 final long deadline = System.nanoTime() + nanosTimeout; 1905 boolean timedout = false; 1906 int interruptMode = 0; 1907 while (!isOnSyncQueue(node)) { 1908 if (nanosTimeout <= 0L) { 1909 timedout = transferAfterCancelledWait(node); 1910 break; 1911 } 1912 if (nanosTimeout >= spinForTimeoutThreshold) 1913 LockSupport.parkNanos(this, nanosTimeout); 1914 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1915 break; 1916 nanosTimeout = deadline - System.nanoTime(); 1917 } 1918 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1919 interruptMode = REINTERRUPT; 1920 if (node.nextWaiter != null) 1921 unlinkCancelledWaiters(); 1922 if (interruptMode != 0) 1923 reportInterruptAfterWait(interruptMode); 1924 return !timedout; 1925 } 1926 1927 // support for instrumentation 1928 1929 /** 1930 * Returns true if this condition was created by the given 1931 * synchronization object. 1932 * 1933 * @return {@code true} if owned 1934 */ 1935 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { 1936 return sync == AbstractQueuedLongSynchronizer.this; 1937 } 1938 1939 /** 1940 * Queries whether any threads are waiting on this condition. 1941 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters}. 
1942 * 1943 * @return {@code true} if there are any waiting threads 1944 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1945 * returns {@code false} 1946 */ 1947 protected final boolean hasWaiters() { 1948 if (!isHeldExclusively()) 1949 throw new IllegalMonitorStateException(); 1950 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1951 if (w.waitStatus == Node.CONDITION) 1952 return true; 1953 } 1954 return false; 1955 } 1956 1957 /** 1958 * Returns an estimate of the number of threads waiting on 1959 * this condition. 1960 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength}. 1961 * 1962 * @return the estimated number of waiting threads 1963 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1964 * returns {@code false} 1965 */ 1966 protected final int getWaitQueueLength() { 1967 if (!isHeldExclusively()) 1968 throw new IllegalMonitorStateException(); 1969 int n = 0; 1970 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1971 if (w.waitStatus == Node.CONDITION) 1972 ++n; 1973 } 1974 return n; 1975 } 1976 1977 /** 1978 * Returns a collection containing those threads that may be 1979 * waiting on this Condition. 1980 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads}. 1981 * 1982 * @return the collection of threads 1983 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1984 * returns {@code false} 1985 */ 1986 protected final Collection<Thread> getWaitingThreads() { 1987 if (!isHeldExclusively()) 1988 throw new IllegalMonitorStateException(); 1989 ArrayList<Thread> list = new ArrayList<Thread>(); 1990 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1991 if (w.waitStatus == Node.CONDITION) { 1992 Thread t = w.thread; 1993 if (t != null) 1994 list.add(t); 1995 } 1996 } 1997 return list; 1998 } 1999 } 2000 2001 /** 2002 * Setup to support compareAndSet. 
We need to natively implement 2003 * this here: For the sake of permitting future enhancements, we 2004 * cannot explicitly subclass AtomicLong, which would be 2005 * efficient and useful otherwise. So, as the lesser of evils, we 2006 * natively implement using hotspot intrinsics API. And while we 2007 * are at it, we do the same for other CASable fields (which could 2008 * otherwise be done with atomic field updaters). 2009 */ 2010 private static final Unsafe unsafe = Unsafe.getUnsafe(); 2011 private static final long stateOffset; 2012 private static final long headOffset; 2013 private static final long tailOffset; 2014 private static final long waitStatusOffset; 2015 private static final long nextOffset; 2016 2017 static { 2018 try { 2019 stateOffset = unsafe.objectFieldOffset 2020 (AbstractQueuedLongSynchronizer.class.getDeclaredField("state")); 2021 headOffset = unsafe.objectFieldOffset 2022 (AbstractQueuedLongSynchronizer.class.getDeclaredField("head")); 2023 tailOffset = unsafe.objectFieldOffset 2024 (AbstractQueuedLongSynchronizer.class.getDeclaredField("tail")); 2025 waitStatusOffset = unsafe.objectFieldOffset 2026 (Node.class.getDeclaredField("waitStatus")); 2027 nextOffset = unsafe.objectFieldOffset 2028 (Node.class.getDeclaredField("next")); 2029 2030 } catch (Exception ex) { throw new Error(ex); } 2031 } 2032 2033 /** 2034 * CAS head field. Used only by enq. 2035 */ 2036 private final boolean compareAndSetHead(Node update) { 2037 return unsafe.compareAndSwapObject(this, headOffset, null, update); 2038 } 2039 2040 /** 2041 * CAS tail field. Used only by enq. 2042 */ 2043 private final boolean compareAndSetTail(Node expect, Node update) { 2044 return unsafe.compareAndSwapObject(this, tailOffset, expect, update); 2045 } 2046 2047 /** 2048 * CAS waitStatus field of a node. 
*/
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        boolean swapped =
            unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
        return swapped;
    }

    /**
     * Atomically replaces the next field of {@code node} with
     * {@code update}, provided it currently holds {@code expect}.
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        boolean swapped =
            unsafe.compareAndSwapObject(node, nextOffset, expect, update);
        return swapped;
    }
}