1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7package java.util.concurrent; 8import java.util.concurrent.locks.LockSupport; 9 10/** 11 * A cancellable asynchronous computation. This class provides a base 12 * implementation of {@link Future}, with methods to start and cancel 13 * a computation, query to see if the computation is complete, and 14 * retrieve the result of the computation. The result can only be 15 * retrieved when the computation has completed; the {@code get} 16 * methods will block if the computation has not yet completed. Once 17 * the computation has completed, the computation cannot be restarted 18 * or cancelled (unless the computation is invoked using 19 * {@link #runAndReset}). 20 * 21 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or 22 * {@link Runnable} object. Because {@code FutureTask} implements 23 * {@code Runnable}, a {@code FutureTask} can be submitted to an 24 * {@link Executor} for execution. 25 * 26 * <p>In addition to serving as a standalone class, this class provides 27 * {@code protected} functionality that may be useful when creating 28 * customized task classes. 29 * 30 * @since 1.5 31 * @author Doug Lea 32 * @param <V> The result type returned by this FutureTask's {@code get} methods 33 */ 34public class FutureTask<V> implements RunnableFuture<V> { 35 /* 36 * Revision notes: This differs from previous versions of this 37 * class that relied on AbstractQueuedSynchronizer, mainly to 38 * avoid surprising users about retaining interrupt status during 39 * cancellation races. Sync control in the current design relies 40 * on a "state" field updated via CAS to track completion, along 41 * with a simple Treiber stack to hold waiting threads. 42 * 43 * Style note: As usual, we bypass overhead of using 44 * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics. 45 */ 46 47 /** 48 * The run state of this task, initially NEW. The run state 49 * transitions to a terminal state only in methods set, 50 * setException, and cancel. During completion, state may take on 51 * transient values of COMPLETING (while outcome is being set) or 52 * INTERRUPTING (only while interrupting the runner to satisfy a 53 * cancel(true)). Transitions from these intermediate to final 54 * states use cheaper ordered/lazy writes because values are unique 55 * and cannot be further modified. 56 * 57 * Possible state transitions: 58 * NEW -> COMPLETING -> NORMAL 59 * NEW -> COMPLETING -> EXCEPTIONAL 60 * NEW -> CANCELLED 61 * NEW -> INTERRUPTING -> INTERRUPTED 62 */ 63 private volatile int state; 64 private static final int NEW = 0; 65 private static final int COMPLETING = 1; 66 private static final int NORMAL = 2; 67 private static final int EXCEPTIONAL = 3; 68 private static final int CANCELLED = 4; 69 private static final int INTERRUPTING = 5; 70 private static final int INTERRUPTED = 6; 71 72 /** The underlying callable; nulled out after running */ 73 private Callable<V> callable; 74 /** The result to return or exception to throw from get() */ 75 private Object outcome; // non-volatile, protected by state reads/writes 76 /** The thread running the callable; CASed during run() */ 77 private volatile Thread runner; 78 /** Treiber stack of waiting threads */ 79 private volatile WaitNode waiters; 80 81 /** 82 * Returns result or throws exception for completed task. 83 * 84 * @param s completed state value 85 */ 86 private V report(int s) throws ExecutionException { 87 Object x = outcome; 88 if (s == NORMAL) { 89 @SuppressWarnings("unchecked") V v = (V)x; 90 return v; 91 } 92 if (s >= CANCELLED) 93 throw new CancellationException(); 94 throw new ExecutionException((Throwable)x); 95 } 96 97 /** 98 * Creates a {@code FutureTask} that will, upon running, execute the 99 * given {@code Callable}. 100 * 101 * @param callable the callable task 102 * @throws NullPointerException if the callable is null 103 */ 104 public FutureTask(Callable<V> callable) { 105 if (callable == null) 106 throw new NullPointerException(); 107 this.callable = callable; 108 this.state = NEW; // ensure visibility of callable 109 } 110 111 /** 112 * Creates a {@code FutureTask} that will, upon running, execute the 113 * given {@code Runnable}, and arrange that {@code get} will return the 114 * given result on successful completion. 115 * 116 * @param runnable the runnable task 117 * @param result the result to return on successful completion. If 118 * you don't need a particular result, consider using 119 * constructions of the form: 120 * {@code Future<?> f = new FutureTask<Void>(runnable, null)} 121 * @throws NullPointerException if the runnable is null 122 */ 123 public FutureTask(Runnable runnable, V result) { 124 this.callable = Executors.callable(runnable, result); 125 this.state = NEW; // ensure visibility of callable 126 } 127 128 public boolean isCancelled() { 129 return state >= CANCELLED; 130 } 131 132 public boolean isDone() { 133 return state != NEW; 134 } 135 136 public boolean cancel(boolean mayInterruptIfRunning) { 137 if (state != NEW) 138 return false; 139 if (mayInterruptIfRunning) { 140 if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) 141 return false; 142 Thread t = runner; 143 if (t != null) 144 t.interrupt(); 145 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state 146 } 147 else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) 148 return false; 149 finishCompletion(); 150 return true; 151 } 152 153 /** 154 * @throws CancellationException {@inheritDoc} 155 */ 156 public V get() throws InterruptedException, ExecutionException { 157 int s = state; 158 if (s <= COMPLETING) 159 s = awaitDone(false, 0L); 160 return report(s); 161 } 162 163 /** 164 * @throws CancellationException {@inheritDoc} 165 */ 166 public V get(long timeout, TimeUnit unit) 167 throws InterruptedException, ExecutionException, TimeoutException { 168 if (unit == null) 169 throw new NullPointerException(); 170 int s = state; 171 if (s <= COMPLETING && 172 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) 173 throw new TimeoutException(); 174 return report(s); 175 } 176 177 /** 178 * Protected method invoked when this task transitions to state 179 * {@code isDone} (whether normally or via cancellation). The 180 * default implementation does nothing. Subclasses may override 181 * this method to invoke completion callbacks or perform 182 * bookkeeping. Note that you can query status inside the 183 * implementation of this method to determine whether this task 184 * has been cancelled. 185 */ 186 protected void done() { } 187 188 /** 189 * Sets the result of this future to the given value unless 190 * this future has already been set or has been cancelled. 191 * 192 * <p>This method is invoked internally by the {@link #run} method 193 * upon successful completion of the computation. 194 * 195 * @param v the value 196 */ 197 protected void set(V v) { 198 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 199 outcome = v; 200 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 201 finishCompletion(); 202 } 203 } 204 205 /** 206 * Causes this future to report an {@link ExecutionException} 207 * with the given throwable as its cause, unless this future has 208 * already been set or has been cancelled. 209 * 210 * <p>This method is invoked internally by the {@link #run} method 211 * upon failure of the computation. 212 * 213 * @param t the cause of failure 214 */ 215 protected void setException(Throwable t) { 216 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 217 outcome = t; 218 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state 219 finishCompletion(); 220 } 221 } 222 223 public void run() { 224 if (state != NEW || 225 !UNSAFE.compareAndSwapObject(this, runnerOffset, 226 null, Thread.currentThread())) 227 return; 228 try { 229 Callable<V> c = callable; 230 if (c != null && state == NEW) { 231 V result; 232 boolean ran; 233 try { 234 result = c.call(); 235 ran = true; 236 } catch (Throwable ex) { 237 result = null; 238 ran = false; 239 setException(ex); 240 } 241 if (ran) 242 set(result); 243 } 244 } finally { 245 // runner must be non-null until state is settled to 246 // prevent concurrent calls to run() 247 runner = null; 248 // state must be re-read after nulling runner to prevent 249 // leaked interrupts 250 int s = state; 251 if (s >= INTERRUPTING) 252 handlePossibleCancellationInterrupt(s); 253 } 254 } 255 256 /** 257 * Executes the computation without setting its result, and then 258 * resets this future to initial state, failing to do so if the 259 * computation encounters an exception or is cancelled. This is 260 * designed for use with tasks that intrinsically execute more 261 * than once. 262 * 263 * @return true if successfully run and reset 264 */ 265 protected boolean runAndReset() { 266 if (state != NEW || 267 !UNSAFE.compareAndSwapObject(this, runnerOffset, 268 null, Thread.currentThread())) 269 return false; 270 boolean ran = false; 271 int s = state; 272 try { 273 Callable<V> c = callable; 274 if (c != null && s == NEW) { 275 try { 276 c.call(); // don't set result 277 ran = true; 278 } catch (Throwable ex) { 279 setException(ex); 280 } 281 } 282 } finally { 283 // runner must be non-null until state is settled to 284 // prevent concurrent calls to run() 285 runner = null; 286 // state must be re-read after nulling runner to prevent 287 // leaked interrupts 288 s = state; 289 if (s >= INTERRUPTING) 290 handlePossibleCancellationInterrupt(s); 291 } 292 return ran && s == NEW; 293 } 294 295 /** 296 * Ensures that any interrupt from a possible cancel(true) is only 297 * delivered to a task while in run or runAndReset. 298 */ 299 private void handlePossibleCancellationInterrupt(int s) { 300 // It is possible for our interrupter to stall before getting a 301 // chance to interrupt us. Let's spin-wait patiently. 302 if (s == INTERRUPTING) 303 while (state == INTERRUPTING) 304 Thread.yield(); // wait out pending interrupt 305 306 // assert state == INTERRUPTED; 307 308 // We want to clear any interrupt we may have received from 309 // cancel(true). However, it is permissible to use interrupts 310 // as an independent mechanism for a task to communicate with 311 // its caller, and there is no way to clear only the 312 // cancellation interrupt. 313 // 314 // Thread.interrupted(); 315 } 316 317 /** 318 * Simple linked list nodes to record waiting threads in a Treiber 319 * stack. See other classes such as Phaser and SynchronousQueue 320 * for more detailed explanation. 321 */ 322 static final class WaitNode { 323 volatile Thread thread; 324 volatile WaitNode next; 325 WaitNode() { thread = Thread.currentThread(); } 326 } 327 328 /** 329 * Removes and signals all waiting threads, invokes done(), and 330 * nulls out callable. 331 */ 332 private void finishCompletion() { 333 // assert state > COMPLETING; 334 for (WaitNode q; (q = waiters) != null;) { 335 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 336 for (;;) { 337 Thread t = q.thread; 338 if (t != null) { 339 q.thread = null; 340 LockSupport.unpark(t); 341 } 342 WaitNode next = q.next; 343 if (next == null) 344 break; 345 q.next = null; // unlink to help gc 346 q = next; 347 } 348 break; 349 } 350 } 351 352 done(); 353 354 callable = null; // to reduce footprint 355 } 356 357 /** 358 * Awaits completion or aborts on interrupt or timeout. 359 * 360 * @param timed true if use timed waits 361 * @param nanos time to wait, if timed 362 * @return state upon completion 363 */ 364 private int awaitDone(boolean timed, long nanos) 365 throws InterruptedException { 366 long last = timed ? System.nanoTime() : 0L; 367 WaitNode q = null; 368 boolean queued = false; 369 for (;;) { 370 if (Thread.interrupted()) { 371 removeWaiter(q); 372 throw new InterruptedException(); 373 } 374 375 int s = state; 376 if (s > COMPLETING) { 377 if (q != null) 378 q.thread = null; 379 return s; 380 } 381 else if (q == null) 382 q = new WaitNode(); 383 else if (!queued) 384 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 385 q.next = waiters, q); 386 else if (timed) { 387 long now = System.nanoTime(); 388 if ((nanos -= (now - last)) <= 0L) { 389 removeWaiter(q); 390 return state; 391 } 392 last = now; 393 LockSupport.parkNanos(this, nanos); 394 } 395 else 396 LockSupport.park(this); 397 } 398 } 399 400 /** 401 * Tries to unlink a timed-out or interrupted wait node to avoid 402 * accumulating garbage. Internal nodes are simply unspliced 403 * without CAS since it is harmless if they are traversed anyway 404 * by releasers. To avoid effects of unsplicing from already 405 * removed nodes, the list is retraversed in case of an apparent 406 * race. This is slow when there are a lot of nodes, but we don't 407 * expect lists to be long enough to outweigh higher-overhead 408 * schemes. 409 */ 410 private void removeWaiter(WaitNode node) { 411 if (node != null) { 412 node.thread = null; 413 retry: 414 for (;;) { // restart on removeWaiter race 415 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { 416 s = q.next; 417 if (q.thread != null) 418 pred = q; 419 else if (pred != null) { 420 pred.next = s; 421 if (pred.thread == null) // check for race 422 continue retry; 423 } 424 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, 425 q, s)) 426 continue retry; 427 } 428 break; 429 } 430 } 431 } 432 433 // Unsafe mechanics 434 private static final sun.misc.Unsafe UNSAFE; 435 private static final long stateOffset; 436 private static final long runnerOffset; 437 private static final long waitersOffset; 438 static { 439 try { 440 UNSAFE = sun.misc.Unsafe.getUnsafe(); 441 Class<?> k = FutureTask.class; 442 stateOffset = UNSAFE.objectFieldOffset 443 (k.getDeclaredField("state")); 444 runnerOffset = UNSAFE.objectFieldOffset 445 (k.getDeclaredField("runner")); 446 waitersOffset = UNSAFE.objectFieldOffset 447 (k.getDeclaredField("waiters")); 448 } catch (Exception e) { 449 throw new Error(e); 450 } 451 } 452 453} 454