/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent.locks;
import java.util.*;
import java.util.concurrent.*;
import sun.misc.Unsafe;

// BEGIN android-note
// Use older class level documentation to not @link to hasQueuedPredecessors
// END android-changed

/**
 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues. This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic <tt>int</tt> value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released. Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated <tt>int</tt>
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 *
 * <p>Subclasses should be defined as non-public internal helper
 * classes that are used to implement the synchronization properties
 * of their enclosing class. Class
 * <tt>AbstractQueuedSynchronizer</tt> does not implement any
 * synchronization interface. Instead it defines methods such as
 * {@link #acquireInterruptibly} that can be invoked as
 * appropriate by concrete locks and related synchronizers to
 * implement their public methods.
 *
 * <p>This class supports either or both a default <em>exclusive</em>
 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed.
 * Shared mode acquires by multiple threads may (but need not) succeed.
 * This class does not "understand" these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue. Usually, implementation subclasses support only
 * one of these modes, but both can come into play for example in a
 * {@link ReadWriteLock}. Subclasses that support only exclusive or
 * only shared modes need not define the methods supporting the unused mode.
 *
 * <p>This class defines a nested {@link ConditionObject} class that
 * can be used as a {@link Condition} implementation by subclasses
 * supporting exclusive mode for which method {@link
 * #isHeldExclusively} reports whether synchronization is exclusively
 * held with respect to the current thread, method {@link #release}
 * invoked with the current {@link #getState} value fully releases
 * this object, and {@link #acquire}, given this saved state value,
 * eventually restores this object to its previous acquired state. No
 * <tt>AbstractQueuedSynchronizer</tt> method otherwise creates such a
 * condition, so if this constraint cannot be met, do not use it. The
 * behavior of {@link ConditionObject} depends of course on the
 * semantics of its synchronizer implementation.
 *
 * <p>This class provides inspection, instrumentation, and monitoring
 * methods for the internal queue, as well as similar methods for
 * condition objects. These can be exported as desired into classes
 * using an <tt>AbstractQueuedSynchronizer</tt> for their
 * synchronization mechanics.
 *
 * <p>Serialization of this class stores only the underlying atomic
 * integer maintaining state, so deserialized objects have empty
 * thread queues.
 * Typical subclasses requiring serializability will
 * define a <tt>readObject</tt> method that restores this to a known
 * initial state upon deserialization.
 *
 * <h3>Usage</h3>
 *
 * <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 *</ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}. Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * <tt>final</tt> because they cannot be independently varied.
 *
 * <p>You may also find the inherited methods from {@link
 * AbstractOwnableSynchronizer} useful to keep track of the thread
 * owning an exclusive synchronizer. You are encouraged to use them
 * -- this enables monitoring and diagnostic tools to assist users in
 * determining which threads hold locks.
 *
 * <p>Even though this class is based on an internal FIFO queue, it
 * does not automatically enforce FIFO acquisition policies. The core
 * of exclusive synchronization takes the form:
 *
 * <pre>
 * Acquire:
 *     while (!tryAcquire(arg)) {
 *        <em>enqueue thread if it is not already queued</em>;
 *        <em>possibly block current thread</em>;
 *     }
 *
 * Release:
 *     if (tryRelease(arg))
 *        <em>unblock the first queued thread</em>;
 * </pre>
 *
 * (Shared mode is similar but may involve cascading signals.)
 *
 * <p><a name="barging">Because checks in acquire are invoked before
 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
 * others that are blocked and queued. However, you can, if desired,
 * define <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to
 * disable barging by internally invoking one or more of the inspection
 * methods. In particular, a strict FIFO lock can define
 * <tt>tryAcquire</tt> to immediately return <tt>false</tt> if {@link
 * #getFirstQueuedThread} does not return the current thread. A
 * normally preferable non-strict fair version can immediately return
 * <tt>false</tt> only if {@link #hasQueuedThreads} returns
 * <tt>true</tt> and <tt>getFirstQueuedThread</tt> is not the current
 * thread; or equivalently, that <tt>getFirstQueuedThread</tt> is both
 * non-null and not the current thread. Further variations are
 * possible.
 *
 * <p>Throughput and scalability are generally highest for the
 * default barging (also known as <em>greedy</em>,
 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
 * While this is not guaranteed to be fair or starvation-free, earlier
 * queued threads are allowed to recontend before later queued
 * threads, and each recontention has an unbiased chance to succeed
 * against incoming threads. Also, while acquires do not
 * "spin" in the usual sense, they may perform multiple
 * invocations of <tt>tryAcquire</tt> interspersed with other
 * computations before blocking. This gives most of the benefits of
 * spins when exclusive synchronization is only briefly held, without
 * most of the liabilities when it isn't. If so desired, you can
 * augment this by preceding calls to acquire methods with
 * "fast-path" checks, possibly prechecking {@link #hasContended}
 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
 * is likely not to be contended.
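 *
 * <p>The non-strict fair variant described above can be sketched as
 * follows. This is only an illustration under stated assumptions: the
 * class name <tt>FairMutexSketch</tt> and its methods are hypothetical
 * and not part of this package, and the code relies only on the
 * public and protected methods documented for this class.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical illustration class; not part of java.util.concurrent.locks.
final class FairMutexSketch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        // Non-strict fairness: refuse only when some other thread is
        // already first in the queue; otherwise contend as usual.
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            Thread first = getFirstQueuedThread();
            if (first != null && first != current)
                return false;
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(current);
                return true;
            }
            return false;
        }

        // Only the recorded owner may release; state returns to zero.
        protected boolean tryRelease(int releases) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        protected boolean isHeldExclusively() {
            return getState() == 1
                && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean locked() { return getState() == 1; }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.locked(); }
}
```

 * <p>Because the queue check and the state update are separate steps,
 * the sketch narrows barging rather than ruling it out entirely, which
 * is in keeping with the "non-strict" wording above.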
 *
 * <p>This class provides an efficient and scalable basis for
 * synchronization in part by specializing its range of use to
 * synchronizers that can rely on <tt>int</tt> state, acquire, and
 * release parameters, and an internal FIFO wait queue. When this does
 * not suffice, you can build synchronizers from a lower level using
 * {@link java.util.concurrent.atomic atomic} classes, your own custom
 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
 * support.
 *
 * <h3>Usage Examples</h3>
 *
 * <p>Here is a non-reentrant mutual exclusion lock class that uses
 * the value zero to represent the unlocked state, and one to
 * represent the locked state. While a non-reentrant lock
 * does not strictly require recording of the current owner
 * thread, this class does so anyway to make usage easier to monitor.
 * It also supports conditions and exposes
 * one of the instrumentation methods:
 *
 * <pre> {@code
 * class Mutex implements Lock, java.io.Serializable {
 *
 *   // Our internal helper class
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     // Report whether in locked state
 *     protected boolean isHeldExclusively() {
 *       return getState() == 1;
 *     }
 *
 *     // Acquire the lock if state is zero
 *     public boolean tryAcquire(int acquires) {
 *       assert acquires == 1; // Otherwise unused
 *       if (compareAndSetState(0, 1)) {
 *         setExclusiveOwnerThread(Thread.currentThread());
 *         return true;
 *       }
 *       return false;
 *     }
 *
 *     // Release the lock by setting state to zero
 *     protected boolean tryRelease(int releases) {
 *       assert releases == 1; // Otherwise unused
 *       if (getState() == 0) throw new IllegalMonitorStateException();
 *       setExclusiveOwnerThread(null);
 *       setState(0);
 *       return true;
 *     }
 *
 *     // Provide a Condition
 *     Condition newCondition() { return new ConditionObject(); }
 *
 *     // Deserialize properly
 *     private void readObject(ObjectInputStream s)
 *         throws IOException, ClassNotFoundException {
 *       s.defaultReadObject();
 *       setState(0); // reset to unlocked state
 *     }
 *   }
 *
 *   // The sync object does all the hard work. We just forward to it.
 *   private final Sync sync = new Sync();
 *
 *   public void lock()                { sync.acquire(1); }
 *   public boolean tryLock()          { return sync.tryAcquire(1); }
 *   public void unlock()              { sync.release(1); }
 *   public Condition newCondition()   { return sync.newCondition(); }
 *   public boolean isLocked()         { return sync.isHeldExclusively(); }
 *   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
 *   public void lockInterruptibly() throws InterruptedException {
 *     sync.acquireInterruptibly(1);
 *   }
 *   public boolean tryLock(long timeout, TimeUnit unit)
 *       throws InterruptedException {
 *     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
 *   }
 * }}</pre>
 *
 * <p>Here is a latch class that is like a {@link CountDownLatch}
 * except that it only requires a single <tt>signal</tt> to
 * fire. Because a latch is non-exclusive, it uses the <tt>shared</tt>
 * acquire and release methods.
 *
 * <pre> {@code
 * class BooleanLatch {
 *
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     boolean isSignalled() { return getState() != 0; }
 *
 *     protected int tryAcquireShared(int ignore) {
 *       return isSignalled() ?
 *         1 : -1;
 *     }
 *
 *     protected boolean tryReleaseShared(int ignore) {
 *       setState(1);
 *       return true;
 *     }
 *   }
 *
 *   private final Sync sync = new Sync();
 *   public boolean isSignalled() { return sync.isSignalled(); }
 *   public void signal()         { sync.releaseShared(1); }
 *   public void await() throws InterruptedException {
 *     sync.acquireSharedInterruptibly(1);
 *   }
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * Creates a new <tt>AbstractQueuedSynchronizer</tt> instance
     * with initial synchronization state of zero.
     */
    protected AbstractQueuedSynchronizer() { }

    /**
     * Wait queue node class.
     *
     * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for
     * spinlocks. We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node. A
     * "status" field in each node keeps track of whether a thread
     * should block. A node is signalled when its predecessor
     * releases. Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though. A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend. So the currently released
     * contender thread may need to rewait.
     *
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     * <p>Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.
     *
     * <p>The "prev" links (not used in original CLH locks), are mainly
     * needed to handle cancellation. If a node is cancelled, its
     * successor is (normally) relinked to a non-cancelled
     * predecessor. For explanation of similar mechanics in the case
     * of spin locks, see the papers by Scott and Scherer at
     * http://www.cs.rochester.edu/u/scott/synchronization/
     *
     * <p>We also use "next" links to implement blocking mechanics.
     * The thread id for each node is kept in its own node, so a
     * predecessor signals the next node to wake up by traversing
     * next link to determine which thread it is. Determination of
     * successor must avoid races with newly queued nodes to set
     * the "next" fields of their predecessors. This is solved
     * when necessary by checking backwards from the atomically
     * updated "tail" when a node's successor appears to be null.
     * (Or, said differently, the next-links are an optimization
     * so that we don't usually need a backward scan.)
     *
     * <p>Cancellation introduces some conservatism to the basic
     * algorithms. Since we must poll for cancellation of other
     * nodes, we can miss noticing whether a cancelled node is
     * ahead or behind us.
     * This is dealt with by always unparking
     * successors upon cancellation, allowing them to stabilize on
     * a new predecessor, unless we can identify an uncancelled
     * predecessor who will carry this responsibility.
     *
     * <p>CLH queues need a dummy header node to get started. But
     * we don't create them on construction, because it would be wasted
     * effort if there is never contention. Instead, the node
     * is constructed and head and tail pointers are set upon first
     * contention.
     *
     * <p>Threads waiting on Conditions use the same nodes, but
     * use an additional link. Conditions only need to link nodes
     * in simple (non-concurrent) linked queues because they are
     * only accessed when exclusively held. Upon await, a node is
     * inserted into a condition queue. Upon signal, the node is
     * transferred to the main queue. A special value of status
     * field is used to mark which queue a node is on.
     *
     * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
     * Scherer and Michael Scott, along with members of JSR-166
     * expert group, for helpful ideas, discussions, and critiques
     * on the design of this class.
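     *
     * <p>For comparison, the classic CLH spin lock that this queue is
     * derived from can be sketched as below. This is a textbook-style
     * illustration only, not code used by this class: the name
     * <tt>ClhSpinLockSketch</tt> and its <tt>demo</tt> helper are
     * hypothetical, it spins instead of parking, it has no "prev" or
     * "next" links, and it does not support cancellation. It assumes
     * Java 8 or later.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not part of AbstractQueuedSynchronizer.
final class ClhSpinLockSketch {
    // Each node carries the flag that its successor spins on.
    private static final class QNode {
        volatile boolean locked;
    }

    // The queue starts with one released dummy node.
    private final AtomicReference<QNode> tail =
        new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode =
        ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    private int counter; // guarded by this lock; used only by demo()

    void lock() {
        QNode node = myNode.get();
        node.locked = true;                // announce intent to hold the lock
        QNode pred = tail.getAndSet(node); // single atomic splice onto tail
        myPred.set(pred);
        while (pred.locked)                // control state lives in predecessor
            Thread.yield();
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;               // lets the successor, if any, proceed
        myNode.set(myPred.get());          // reuse the predecessor's node next time
    }

    // Runs a small contention check and returns the final counter value.
    static int demo(int threads, int perThread) {
        ClhSpinLockSketch lock = new ClhSpinLockSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try { lock.counter++; } finally { lock.unlock(); }
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return lock.counter;
    }
}
```

     * <p>In the sketch a waiter watches only its predecessor's flag,
     * which is the tactic this class keeps while replacing the spin
     * with park/unpark and adding links for cancellation.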
     */
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal.
         * So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes. It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing. Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued. The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check. The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node. Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.
         * Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null. The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    /**
     * Head of the wait queue, lazily initialized. Except for
     * initialization, it is modified only via method setHead. Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized. Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a <tt>volatile</tt> read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // Queuing utilities

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; back up to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods. Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node. But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    /**
     * Release action for shared mode -- signal successor and ensure
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    // Utilities for various versions of acquire

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.
     * Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    /**
     * Convenience method to interrupt current thread.
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes. Each is mostly the same, but annoyingly
     * different. Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {

        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // Main exported methods

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.) Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in shared mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns {@code true} if synchronization is held exclusively with
     * respect to the current (calling) thread. This method is invoked
     * upon each call to a non-waiting {@link ConditionObject} method.
     * (Waiting methods instead invoke {@link #release}.)
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}. This method is invoked
     * internally only within {@link ConditionObject} methods, so need
     * not be defined if conditions are not used.
     *
     * @return {@code true} if synchronization is held exclusively;
     *         {@code false} otherwise
     * @throws UnsupportedOperationException if conditions are not supported
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * Acquires in exclusive mode, ignoring interrupts. Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success. This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success. Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted. This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * Attempts to acquire in exclusive mode, aborting if interrupted,
     * and failing if the given timeout elapses. Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquire}, returning on success. Otherwise, the thread is
     * queued, possibly repeatedly blocking and unblocking, invoking
     * {@link #tryAcquire} until success or the thread is interrupted
     * or the timeout elapses. This method can be used to implement
     * method {@link Lock#tryLock(long, TimeUnit)}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * Releases in exclusive mode. Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * Acquires in shared mode, ignoring interrupts. Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared mode, aborting if interrupted. Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success. Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * Attempts to acquire in shared mode, aborting if interrupted, and
     * failing if the given timeout elapses.
     * Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquireShared}, returning on success. Otherwise, the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted or the timeout elapses.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * Releases in shared mode. Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // Queue inspection methods

    /**
     * Queries whether any threads are waiting to acquire. Note that
     * because cancellations due to interrupts and timeouts may occur
     * at any time, a {@code true} return does not guarantee that any
     * other thread will ever acquire.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there may be other threads waiting to acquire
     */
    public final boolean hasQueuedThreads() {
        return head != tail;
    }

    /**
     * Queries whether any threads have ever contended to acquire this
     * synchronizer; that is if an acquire method has ever blocked.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there has ever been contention
     */
    public final boolean hasContended() {
        return head != null;
    }

    /**
     * Returns the first (longest-waiting) thread in the queue, or
     * {@code null} if no threads are currently queued.
     *
     * <p>In this implementation, this operation normally returns in
     * constant time, but may iterate upon contention if other threads are
     * concurrently modifying the queue.
     *
     * @return the first (longest-waiting) thread in the queue, or
     *         {@code null} if no threads are currently queued
     */
    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

    /**
     * Version of getFirstQueuedThread called when fastpath fails
     */
    private Thread fullGetFirstQueuedThread() {
        /*
         * The first node is normally head.next. Try to get its
         * thread field, ensuring consistent reads: If thread
         * field is nulled out or s.prev is no longer head, then
         * some other thread(s) concurrently performed setHead in
         * between some of our reads. We try this twice before
         * resorting to traversal.
         */
        Node h, s;
        Thread st;
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null) ||
            ((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null))
            return st;

        /*
         * Head's next field might not have been set yet, or may have
         * been unset after setHead. So we must check to see if tail
         * is actually first node. If not, we continue on, safely
         * traversing from tail back to head to find first,
         * guaranteeing termination.
         */

        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

    /**
     * Returns true if the given thread is currently queued.
     *
     * <p>This implementation traverses the queue to determine
     * presence of the given thread.
     *
     * @param thread the thread
     * @return {@code true} if the given thread is on the queue
     * @throws NullPointerException if the thread is null
     */
    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

    /**
     * Returns {@code true} if the apparent first queued thread, if one
     * exists, is waiting in exclusive mode. If this method returns
     * {@code true}, and the current thread is attempting to acquire in
     * shared mode (that is, this method is invoked from {@link
     * #tryAcquireShared}) then it is guaranteed that the current thread
     * is not the first queued thread. Used only as a heuristic in
     * ReentrantReadWriteLock.
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next) != null &&
            !s.isShared() &&
            s.thread != null;
    }

    /**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     * <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread() &&
     * hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread. Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire).
     * For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     * <pre> {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     * @hide
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }


    // Instrumentation and monitoring methods

    /**
     * Returns an estimate of the number of threads waiting to
     * acquire. The value is only an estimate because the number of
     * threads may change dynamically while this method traverses
     * internal data structures. This method is designed for use in
     * monitoring system state, not for synchronization
     * control.
     *
     * @return the estimated number of threads waiting to acquire
     */
    public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire.
     * Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order. This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in exclusive mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to an exclusive acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in shared mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to a shared acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a string identifying this synchronizer, as well as its state.
     * The state, in brackets, includes the String {@code "State ="}
     * followed by the current value of {@link #getState}, and either
     * {@code "nonempty"} or {@code "empty"} depending on whether the
     * queue is empty.
     *
     * @return a string identifying this synchronizer, as well as its state
     */
    public String toString() {
        int s = getState();
        String q = hasQueuedThreads() ? "non" : "";
        return super.toString() +
            "[State = " + s + ", " + q + "empty queue]";
    }


    // Internal support methods for Conditions

    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it. It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal).
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Transfers node, if necessary, to sync queue after a cancelled
     * wait. Returns true if thread was cancelled before being
     * signalled.
     * @param node its node
     * @return true if cancelled before the node was signalled
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq(). Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    // Instrumentation methods for conditions

    /**
     * Queries whether the given ConditionObject
     * uses this synchronizer as its lock.
     *
     * @param condition the condition
     * @return <tt>true</tt> if owned
     * @throws NullPointerException if the condition is null
     */
    public final boolean owns(ConditionObject condition) {
        if (condition == null)
            throw new NullPointerException();
        return condition.isOwnedBy(this);
    }

    /**
     * Queries whether any threads are waiting on the given condition
     * associated with this synchronizer. Note that because timeouts
     * and interrupts may occur at any time, a <tt>true</tt> return
     * does not guarantee that a future <tt>signal</tt> will awaken
     * any threads. This method is designed primarily for use in
     * monitoring of the system state.
     *
     * @param condition the condition
     * @return <tt>true</tt> if there are any waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final boolean hasWaiters(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.hasWaiters();
    }

    /**
     * Returns an estimate of the number of threads waiting on the
     * given condition associated with this synchronizer. Note that
     * because timeouts and interrupts may occur at any time, the
     * estimate serves only as an upper bound on the actual number of
     * waiters. This method is designed for use in monitoring of the
     * system state, not for synchronization control.
     *
     * @param condition the condition
     * @return the estimated number of waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final int getWaitQueueLength(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitQueueLength();
    }

    /**
     * Returns a collection containing those threads that may be
     * waiting on the given condition associated with this
     * synchronizer. Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order.
     *
     * @param condition the condition
     * @return the collection of threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitingThreads();
    }

    /**
     * Condition implementation for a {@link
     * AbstractQueuedSynchronizer} serving as the basis of a {@link
     * Lock} implementation.
     *
     * <p>Method documentation for this class describes mechanics,
     * not behavioral specifications from the point of view of Lock
     * and Condition users. Exported versions of this class will in
     * general need to be accompanied by documentation describing
     * condition semantics that rely on those of the associated
     * <tt>AbstractQueuedSynchronizer</tt>.
     *
     * <p>This class is Serializable, but all fields are transient,
     * so deserialized conditions have no waiters.
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new <tt>ConditionObject</tt> instance.
         */
        public ConditionObject() { }

        // Internal methods

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hitting a non-cancelled
         * one or null. Split out from signal in part to encourage
         * compilers to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * Unlinks cancelled waiter nodes from condition queue.
         * Called only while holding lock. This is called when
         * cancellation occurred during condition wait, and upon
         * insertion of a new waiter when lastWaiter is seen to have
         * been cancelled. This method is needed to avoid garbage
         * retention in the absence of signals. So even though it may
         * require a full traversal, it comes into play only when
         * timeouts or cancellations occur in the absence of
         * signals. It traverses all nodes rather than stopping at a
         * particular target to unlink all pointers to garbage nodes
         * without requiring many re-traversals during cancellation
         * storms.
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        /**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        /*
         * For interruptible waits, we need to track whether to throw
         * InterruptedException, if interrupted while blocked on
         * condition, versus reinterrupt current thread, if
         * interrupted while blocked waiting to re-acquire.
         */

        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE    = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

        /**
         * Throws InterruptedException, reinterrupts current thread, or
         * does nothing, depending on mode.
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            long lastTime = System.nanoTime();
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;

                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return nanosTimeout - (System.nanoTime() - lastTime);
        }

        /**
         * Implements absolute timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * <li> If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
            if (deadline == null)
                throw new NullPointerException();
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * <li> If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            if (unit == null)
                throw new NullPointerException();
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            long lastTime = System.nanoTime();
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        //  support for instrumentation

        /**
         * Returns true if this condition was created by the given
         * synchronization object.
         *
         * @return {@code true} if owned
         */
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        /**
         * Queries whether any threads are waiting on this condition.
         * Implements {@link AbstractQueuedSynchronizer#hasWaiters}.
         *
         * @return {@code true} if there are any waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        /**
         * Returns an estimate of the number of threads waiting on
         * this condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength}.
         *
         * @return the estimated number of waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        /**
         * Returns a collection containing those threads that may be
         * waiting on this Condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads}.
         *
         * @return the collection of threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }

    /**
     * Setup to support compareAndSet.
     * We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicInteger, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    /**
     * CAS next field of a node.
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
}
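
// Usage sketch (not part of the original source). The ConditionObject wait and
// signal mechanics and the instrumentation methods owns, hasWaiters and
// getWaitQueueLength defined in this class can be exercised through a small
// exclusive-mode subclass. The names ConditionDemo, Sync, newCondition and demo
// are hypothetical, and the sketch assumes only the public
// java.util.concurrent.locks API of a Java 8+ runtime, not this file's internals.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class; none of these names come from the source above.
class ConditionDemo {

    /** Non-reentrant exclusive synchronizer: state 0 = free, 1 = held. */
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override protected boolean tryRelease(int ignored) {
            // Condition waits also release through here, passing the saved state.
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        ConditionObject newCondition() {
            return new ConditionObject();
        }
    }

    /** Runs a timed wait, then a signalled wait, and reports what was observed. */
    static String demo() {
        final Sync sync = new Sync();
        final AbstractQueuedSynchronizer.ConditionObject cond = sync.newCondition();
        final boolean[] ready = {false};   // guarded by sync
        StringBuilder out = new StringBuilder();
        try {
            // 1. Timed wait that nobody signals: await reports the timeout and
            //    the lock is held again when it returns.
            sync.acquire(1);
            try {
                out.append("signalled=").append(cond.await(10, TimeUnit.MILLISECONDS));
                out.append(" waiters=").append(sync.hasWaiters(cond));
            } finally {
                sync.release(1);
            }

            // 2. A second thread waits on the condition until 'ready' is set.
            Thread waiter = new Thread(() -> {
                sync.acquire(1);
                try {
                    while (!ready[0])
                        cond.awaitUninterruptibly();
                } finally {
                    sync.release(1);
                }
            });
            waiter.start();

            // 3. Poll the instrumentation methods (lock held) and signal once
            //    the waiter is visible on the condition queue.
            boolean sent = false;
            while (!sent) {
                sync.acquire(1);
                try {
                    if (sync.hasWaiters(cond)) {
                        out.append(" queued=").append(sync.getWaitQueueLength(cond));
                        ready[0] = true;
                        cond.signal();
                        sent = true;
                    }
                } finally {
                    sync.release(1);
                }
                if (!sent)
                    Thread.sleep(1);
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        // A condition belongs only to the synchronizer that created it.
        out.append(" owns=").append(sync.owns(cond))
           .append('/').append(new Sync().owns(cond));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

// In this sketch the timed wait returns false because nothing signals it, and
// hasWaiters and getWaitQueueLength are called only while the lock is held,
// since they throw IllegalMonitorStateException otherwise.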