/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
6
7package java.util.concurrent;
8
9import java.util.concurrent.locks.LockSupport;
10
/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
35public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */
47
48    /**
49     * The run state of this task, initially NEW.  The run state
50     * transitions to a terminal state only in methods set,
51     * setException, and cancel.  During completion, state may take on
52     * transient values of COMPLETING (while outcome is being set) or
53     * INTERRUPTING (only while interrupting the runner to satisfy a
54     * cancel(true)). Transitions from these intermediate to final
55     * states use cheaper ordered/lazy writes because values are unique
56     * and cannot be further modified.
57     *
58     * Possible state transitions:
59     * NEW -> COMPLETING -> NORMAL
60     * NEW -> COMPLETING -> EXCEPTIONAL
61     * NEW -> CANCELLED
62     * NEW -> INTERRUPTING -> INTERRUPTED
63     */
64    private volatile int state;
65    private static final int NEW          = 0;
66    private static final int COMPLETING   = 1;
67    private static final int NORMAL       = 2;
68    private static final int EXCEPTIONAL  = 3;
69    private static final int CANCELLED    = 4;
70    private static final int INTERRUPTING = 5;
71    private static final int INTERRUPTED  = 6;
72
73    /** The underlying callable; nulled out after running */
74    private Callable<V> callable;
75    /** The result to return or exception to throw from get() */
76    private Object outcome; // non-volatile, protected by state reads/writes
77    /** The thread running the callable; CASed during run() */
78    private volatile Thread runner;
79    /** Treiber stack of waiting threads */
80    private volatile WaitNode waiters;
81
82    /**
83     * Returns result or throws exception for completed task.
84     *
85     * @param s completed state value
86     */
87    @SuppressWarnings("unchecked")
88    private V report(int s) throws ExecutionException {
89        Object x = outcome;
90        if (s == NORMAL)
91            return (V)x;
92        if (s >= CANCELLED)
93            throw new CancellationException();
94        throw new ExecutionException((Throwable)x);
95    }
96
97    /**
98     * Creates a {@code FutureTask} that will, upon running, execute the
99     * given {@code Callable}.
100     *
101     * @param  callable the callable task
102     * @throws NullPointerException if the callable is null
103     */
104    public FutureTask(Callable<V> callable) {
105        if (callable == null)
106            throw new NullPointerException();
107        this.callable = callable;
108        this.state = NEW;       // ensure visibility of callable
109    }
110
111    /**
112     * Creates a {@code FutureTask} that will, upon running, execute the
113     * given {@code Runnable}, and arrange that {@code get} will return the
114     * given result on successful completion.
115     *
116     * @param runnable the runnable task
117     * @param result the result to return on successful completion. If
118     * you don't need a particular result, consider using
119     * constructions of the form:
120     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
121     * @throws NullPointerException if the runnable is null
122     */
123    public FutureTask(Runnable runnable, V result) {
124        this.callable = Executors.callable(runnable, result);
125        this.state = NEW;       // ensure visibility of callable
126    }
127
128    public boolean isCancelled() {
129        return state >= CANCELLED;
130    }
131
132    public boolean isDone() {
133        return state != NEW;
134    }
135
136    public boolean cancel(boolean mayInterruptIfRunning) {
137        if (!(state == NEW &&
138              U.compareAndSwapInt(this, STATE, NEW,
139                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
140            return false;
141        try {    // in case call to interrupt throws exception
142            if (mayInterruptIfRunning) {
143                try {
144                    Thread t = runner;
145                    if (t != null)
146                        t.interrupt();
147                } finally { // final state
148                    U.putOrderedInt(this, STATE, INTERRUPTED);
149                }
150            }
151        } finally {
152            finishCompletion();
153        }
154        return true;
155    }
156
157    /**
158     * @throws CancellationException {@inheritDoc}
159     */
160    public V get() throws InterruptedException, ExecutionException {
161        int s = state;
162        if (s <= COMPLETING)
163            s = awaitDone(false, 0L);
164        return report(s);
165    }
166
167    /**
168     * @throws CancellationException {@inheritDoc}
169     */
170    public V get(long timeout, TimeUnit unit)
171        throws InterruptedException, ExecutionException, TimeoutException {
172        if (unit == null)
173            throw new NullPointerException();
174        int s = state;
175        if (s <= COMPLETING &&
176            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
177            throw new TimeoutException();
178        return report(s);
179    }
180
181    /**
182     * Protected method invoked when this task transitions to state
183     * {@code isDone} (whether normally or via cancellation). The
184     * default implementation does nothing.  Subclasses may override
185     * this method to invoke completion callbacks or perform
186     * bookkeeping. Note that you can query status inside the
187     * implementation of this method to determine whether this task
188     * has been cancelled.
189     */
190    protected void done() { }
191
192    /**
193     * Sets the result of this future to the given value unless
194     * this future has already been set or has been cancelled.
195     *
196     * <p>This method is invoked internally by the {@link #run} method
197     * upon successful completion of the computation.
198     *
199     * @param v the value
200     */
201    protected void set(V v) {
202        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
203            outcome = v;
204            U.putOrderedInt(this, STATE, NORMAL); // final state
205            finishCompletion();
206        }
207    }
208
209    /**
210     * Causes this future to report an {@link ExecutionException}
211     * with the given throwable as its cause, unless this future has
212     * already been set or has been cancelled.
213     *
214     * <p>This method is invoked internally by the {@link #run} method
215     * upon failure of the computation.
216     *
217     * @param t the cause of failure
218     */
219    protected void setException(Throwable t) {
220        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
221            outcome = t;
222            U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
223            finishCompletion();
224        }
225    }
226
227    public void run() {
228        if (state != NEW ||
229            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
230            return;
231        try {
232            Callable<V> c = callable;
233            if (c != null && state == NEW) {
234                V result;
235                boolean ran;
236                try {
237                    result = c.call();
238                    ran = true;
239                } catch (Throwable ex) {
240                    result = null;
241                    ran = false;
242                    setException(ex);
243                }
244                if (ran)
245                    set(result);
246            }
247        } finally {
248            // runner must be non-null until state is settled to
249            // prevent concurrent calls to run()
250            runner = null;
251            // state must be re-read after nulling runner to prevent
252            // leaked interrupts
253            int s = state;
254            if (s >= INTERRUPTING)
255                handlePossibleCancellationInterrupt(s);
256        }
257    }
258
259    /**
260     * Executes the computation without setting its result, and then
261     * resets this future to initial state, failing to do so if the
262     * computation encounters an exception or is cancelled.  This is
263     * designed for use with tasks that intrinsically execute more
264     * than once.
265     *
266     * @return {@code true} if successfully run and reset
267     */
268    protected boolean runAndReset() {
269        if (state != NEW ||
270            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
271            return false;
272        boolean ran = false;
273        int s = state;
274        try {
275            Callable<V> c = callable;
276            if (c != null && s == NEW) {
277                try {
278                    c.call(); // don't set result
279                    ran = true;
280                } catch (Throwable ex) {
281                    setException(ex);
282                }
283            }
284        } finally {
285            // runner must be non-null until state is settled to
286            // prevent concurrent calls to run()
287            runner = null;
288            // state must be re-read after nulling runner to prevent
289            // leaked interrupts
290            s = state;
291            if (s >= INTERRUPTING)
292                handlePossibleCancellationInterrupt(s);
293        }
294        return ran && s == NEW;
295    }
296
297    /**
298     * Ensures that any interrupt from a possible cancel(true) is only
299     * delivered to a task while in run or runAndReset.
300     */
301    private void handlePossibleCancellationInterrupt(int s) {
302        // It is possible for our interrupter to stall before getting a
303        // chance to interrupt us.  Let's spin-wait patiently.
304        if (s == INTERRUPTING)
305            while (state == INTERRUPTING)
306                Thread.yield(); // wait out pending interrupt
307
308        // assert state == INTERRUPTED;
309
310        // We want to clear any interrupt we may have received from
311        // cancel(true).  However, it is permissible to use interrupts
312        // as an independent mechanism for a task to communicate with
313        // its caller, and there is no way to clear only the
314        // cancellation interrupt.
315        //
316        // Thread.interrupted();
317    }
318
319    /**
320     * Simple linked list nodes to record waiting threads in a Treiber
321     * stack.  See other classes such as Phaser and SynchronousQueue
322     * for more detailed explanation.
323     */
324    static final class WaitNode {
325        volatile Thread thread;
326        volatile WaitNode next;
327        WaitNode() { thread = Thread.currentThread(); }
328    }
329
330    /**
331     * Removes and signals all waiting threads, invokes done(), and
332     * nulls out callable.
333     */
334    private void finishCompletion() {
335        // assert state > COMPLETING;
336        for (WaitNode q; (q = waiters) != null;) {
337            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
338                for (;;) {
339                    Thread t = q.thread;
340                    if (t != null) {
341                        q.thread = null;
342                        LockSupport.unpark(t);
343                    }
344                    WaitNode next = q.next;
345                    if (next == null)
346                        break;
347                    q.next = null; // unlink to help gc
348                    q = next;
349                }
350                break;
351            }
352        }
353
354        done();
355
356        callable = null;        // to reduce footprint
357    }
358
359    /**
360     * Awaits completion or aborts on interrupt or timeout.
361     *
362     * @param timed true if use timed waits
363     * @param nanos time to wait, if timed
364     * @return state upon completion or at timeout
365     */
366    private int awaitDone(boolean timed, long nanos)
367        throws InterruptedException {
368        // The code below is very delicate, to achieve these goals:
369        // - call nanoTime exactly once for each call to park
370        // - if nanos <= 0L, return promptly without allocation or nanoTime
371        // - if nanos == Long.MIN_VALUE, don't underflow
372        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
373        //   and we suffer a spurious wakeup, we will do no worse than
374        //   to park-spin for a while
375        long startTime = 0L;    // Special value 0L means not yet parked
376        WaitNode q = null;
377        boolean queued = false;
378        for (;;) {
379            int s = state;
380            if (s > COMPLETING) {
381                if (q != null)
382                    q.thread = null;
383                return s;
384            }
385            else if (s == COMPLETING)
386                // We may have already promised (via isDone) that we are done
387                // so never return empty-handed or throw InterruptedException
388                Thread.yield();
389            else if (Thread.interrupted()) {
390                removeWaiter(q);
391                throw new InterruptedException();
392            }
393            else if (q == null) {
394                if (timed && nanos <= 0L)
395                    return s;
396                q = new WaitNode();
397            }
398            else if (!queued)
399                queued = U.compareAndSwapObject(this, WAITERS,
400                                                q.next = waiters, q);
401            else if (timed) {
402                final long parkNanos;
403                if (startTime == 0L) { // first time
404                    startTime = System.nanoTime();
405                    if (startTime == 0L)
406                        startTime = 1L;
407                    parkNanos = nanos;
408                } else {
409                    long elapsed = System.nanoTime() - startTime;
410                    if (elapsed >= nanos) {
411                        removeWaiter(q);
412                        return state;
413                    }
414                    parkNanos = nanos - elapsed;
415                }
416                // nanoTime may be slow; recheck before parking
417                if (state < COMPLETING)
418                    LockSupport.parkNanos(this, parkNanos);
419            }
420            else
421                LockSupport.park(this);
422        }
423    }
424
425    /**
426     * Tries to unlink a timed-out or interrupted wait node to avoid
427     * accumulating garbage.  Internal nodes are simply unspliced
428     * without CAS since it is harmless if they are traversed anyway
429     * by releasers.  To avoid effects of unsplicing from already
430     * removed nodes, the list is retraversed in case of an apparent
431     * race.  This is slow when there are a lot of nodes, but we don't
432     * expect lists to be long enough to outweigh higher-overhead
433     * schemes.
434     */
435    private void removeWaiter(WaitNode node) {
436        if (node != null) {
437            node.thread = null;
438            retry:
439            for (;;) {          // restart on removeWaiter race
440                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
441                    s = q.next;
442                    if (q.thread != null)
443                        pred = q;
444                    else if (pred != null) {
445                        pred.next = s;
446                        if (pred.thread == null) // check for race
447                            continue retry;
448                    }
449                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
450                        continue retry;
451                }
452                break;
453            }
454        }
455    }
456
457    // Unsafe mechanics
458    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
459    private static final long STATE;
460    private static final long RUNNER;
461    private static final long WAITERS;
462    static {
463        try {
464            STATE = U.objectFieldOffset
465                (FutureTask.class.getDeclaredField("state"));
466            RUNNER = U.objectFieldOffset
467                (FutureTask.class.getDeclaredField("runner"));
468            WAITERS = U.objectFieldOffset
469                (FutureTask.class.getDeclaredField("waiters"));
470        } catch (ReflectiveOperationException e) {
471            throw new Error(e);
472        }
473
474        // Reduce the risk of rare disastrous classloading in first call to
475        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
476        Class<?> ensureLoaded = LockSupport.class;
477    }
478
479}
480