FutureTask.java revision 29957558cf0db700bfaae360a80c42dc3871d0e5
1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37
38import java.util.concurrent.locks.LockSupport;
39
40/**
41 * A cancellable asynchronous computation.  This class provides a base
42 * implementation of {@link Future}, with methods to start and cancel
43 * a computation, query to see if the computation is complete, and
44 * retrieve the result of the computation.  The result can only be
45 * retrieved when the computation has completed; the {@code get}
46 * methods will block if the computation has not yet completed.  Once
47 * the computation has completed, the computation cannot be restarted
48 * or cancelled (unless the computation is invoked using
49 * {@link #runAndReset}).
50 *
51 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
52 * {@link Runnable} object.  Because {@code FutureTask} implements
53 * {@code Runnable}, a {@code FutureTask} can be submitted to an
54 * {@link Executor} for execution.
55 *
56 * <p>In addition to serving as a standalone class, this class provides
57 * {@code protected} functionality that may be useful when creating
58 * customized task classes.
59 *
60 * @since 1.5
61 * @author Doug Lea
62 * @param <V> The result type returned by this FutureTask's {@code get} methods
63 */
64public class FutureTask<V> implements RunnableFuture<V> {
65    /*
66     * Revision notes: This differs from previous versions of this
67     * class that relied on AbstractQueuedSynchronizer, mainly to
68     * avoid surprising users about retaining interrupt status during
69     * cancellation races. Sync control in the current design relies
70     * on a "state" field updated via CAS to track completion, along
71     * with a simple Treiber stack to hold waiting threads.
72     *
73     * Style note: As usual, we bypass overhead of using
74     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
75     */
76
77    /**
78     * The run state of this task, initially NEW.  The run state
79     * transitions to a terminal state only in methods set,
80     * setException, and cancel.  During completion, state may take on
81     * transient values of COMPLETING (while outcome is being set) or
82     * INTERRUPTING (only while interrupting the runner to satisfy a
83     * cancel(true)). Transitions from these intermediate to final
84     * states use cheaper ordered/lazy writes because values are unique
85     * and cannot be further modified.
86     *
87     * Possible state transitions:
88     * NEW -> COMPLETING -> NORMAL
89     * NEW -> COMPLETING -> EXCEPTIONAL
90     * NEW -> CANCELLED
91     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric order of the constants below is significant and must
     * not be changed: code in this class relies on range comparisons
     * such as {@code state <= COMPLETING} (get must wait),
     * {@code state > COMPLETING} (awaitDone may return),
     * {@code state >= CANCELLED} (isCancelled / report) and
     * {@code state >= INTERRUPTING} (run / runAndReset clean-up).
92     */
93    private volatile int state;
94    private static final int NEW          = 0; // initial state; also restored implicitly by runAndReset never leaving it
95    private static final int COMPLETING   = 1; // transient: set/setException won the CAS and is writing outcome
96    private static final int NORMAL       = 2; // terminal: outcome holds the computed result
97    private static final int EXCEPTIONAL  = 3; // terminal: outcome holds the Throwable passed to setException
98    private static final int CANCELLED    = 4; // terminal: reached directly from NEW by cancel(false)
99    private static final int INTERRUPTING = 5; // transient: cancel(true) won the CAS and is interrupting the runner
100    private static final int INTERRUPTED  = 6; // terminal: cancel(true) has finished its interrupt attempt
101

    // callable and outcome are plain (non-volatile) fields; the
    // constructors write state after callable, and set/setException
    // write state after outcome, so that readers who observe the
    // volatile state first see the corresponding plain write.
102    /** The underlying callable; nulled out after running */
103    private Callable<V> callable;
104    /** The result to return or exception to throw from get() */
105    private Object outcome; // non-volatile, protected by state reads/writes
106    /** The thread running the callable; CASed during run() */
107    private volatile Thread runner;
108    /** Treiber stack of waiting threads */
109    private volatile WaitNode waiters;
110
111    /**
112     * Returns result or throws exception for completed task.
113     *
114     * @param s completed state value
115     */
116    @SuppressWarnings("unchecked")
117    private V report(int s) throws ExecutionException {
118        Object x = outcome;
119        if (s == NORMAL)
120            return (V)x;
121        if (s >= CANCELLED)
122            throw new CancellationException();
123        throw new ExecutionException((Throwable)x);
124    }
125
126    /**
127     * Creates a {@code FutureTask} that will, upon running, execute the
128     * given {@code Callable}.
129     *
130     * @param  callable the callable task
131     * @throws NullPointerException if the callable is null
132     */
133    public FutureTask(Callable<V> callable) {
134        if (callable == null)
135            throw new NullPointerException();
136        this.callable = callable;
137        this.state = NEW;       // ensure visibility of callable
138    }
139
140    /**
141     * Creates a {@code FutureTask} that will, upon running, execute the
142     * given {@code Runnable}, and arrange that {@code get} will return the
143     * given result on successful completion.
144     *
145     * @param runnable the runnable task
146     * @param result the result to return on successful completion. If
147     * you don't need a particular result, consider using
148     * constructions of the form:
149     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
150     * @throws NullPointerException if the runnable is null
151     */
152    public FutureTask(Runnable runnable, V result) {
153        this.callable = Executors.callable(runnable, result);
154        this.state = NEW;       // ensure visibility of callable
155    }
156
157    public boolean isCancelled() {
158        return state >= CANCELLED;
159    }
160
161    public boolean isDone() {
162        return state != NEW;
163    }
164
165    public boolean cancel(boolean mayInterruptIfRunning) {
166        if (!(state == NEW &&
167              U.compareAndSwapInt(this, STATE, NEW,
168                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
169            return false;
170        try {    // in case call to interrupt throws exception
171            if (mayInterruptIfRunning) {
172                try {
173                    Thread t = runner;
174                    if (t != null)
175                        t.interrupt();
176                } finally { // final state
177                    U.putOrderedInt(this, STATE, INTERRUPTED);
178                }
179            }
180        } finally {
181            finishCompletion();
182        }
183        return true;
184    }
185
186    /**
187     * @throws CancellationException {@inheritDoc}
188     */
189    public V get() throws InterruptedException, ExecutionException {
190        int s = state;
191        if (s <= COMPLETING)
192            s = awaitDone(false, 0L);
193        return report(s);
194    }
195
196    /**
197     * @throws CancellationException {@inheritDoc}
198     */
199    public V get(long timeout, TimeUnit unit)
200        throws InterruptedException, ExecutionException, TimeoutException {
201        if (unit == null)
202            throw new NullPointerException();
203        int s = state;
204        if (s <= COMPLETING &&
205            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
206            throw new TimeoutException();
207        return report(s);
208    }
209
210    /**
211     * Protected method invoked when this task transitions to state
212     * {@code isDone} (whether normally or via cancellation). The
213     * default implementation does nothing.  Subclasses may override
214     * this method to invoke completion callbacks or perform
215     * bookkeeping. Note that you can query status inside the
216     * implementation of this method to determine whether this task
217     * has been cancelled.
218     */
219    protected void done() { }
220
221    /**
222     * Sets the result of this future to the given value unless
223     * this future has already been set or has been cancelled.
224     *
225     * <p>This method is invoked internally by the {@link #run} method
226     * upon successful completion of the computation.
227     *
228     * @param v the value
229     */
230    protected void set(V v) {
231        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
232            outcome = v;
233            U.putOrderedInt(this, STATE, NORMAL); // final state
234            finishCompletion();
235        }
236    }
237
238    /**
239     * Causes this future to report an {@link ExecutionException}
240     * with the given throwable as its cause, unless this future has
241     * already been set or has been cancelled.
242     *
243     * <p>This method is invoked internally by the {@link #run} method
244     * upon failure of the computation.
245     *
246     * @param t the cause of failure
247     */
248    protected void setException(Throwable t) {
249        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
250            outcome = t;
251            U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
252            finishCompletion();
253        }
254    }
255
256    public void run() {
257        if (state != NEW ||
258            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
259            return;
260        try {
261            Callable<V> c = callable;
262            if (c != null && state == NEW) {
263                V result;
264                boolean ran;
265                try {
266                    result = c.call();
267                    ran = true;
268                } catch (Throwable ex) {
269                    result = null;
270                    ran = false;
271                    setException(ex);
272                }
273                if (ran)
274                    set(result);
275            }
276        } finally {
277            // runner must be non-null until state is settled to
278            // prevent concurrent calls to run()
279            runner = null;
280            // state must be re-read after nulling runner to prevent
281            // leaked interrupts
282            int s = state;
283            if (s >= INTERRUPTING)
284                handlePossibleCancellationInterrupt(s);
285        }
286    }
287
288    /**
289     * Executes the computation without setting its result, and then
290     * resets this future to initial state, failing to do so if the
291     * computation encounters an exception or is cancelled.  This is
292     * designed for use with tasks that intrinsically execute more
293     * than once.
294     *
295     * @return {@code true} if successfully run and reset
296     */
297    protected boolean runAndReset() {
298        if (state != NEW ||
299            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
300            return false;
301        boolean ran = false;
302        int s = state;
303        try {
304            Callable<V> c = callable;
305            if (c != null && s == NEW) {
306                try {
307                    c.call(); // don't set result
308                    ran = true;
309                } catch (Throwable ex) {
310                    setException(ex);
311                }
312            }
313        } finally {
314            // runner must be non-null until state is settled to
315            // prevent concurrent calls to run()
316            runner = null;
317            // state must be re-read after nulling runner to prevent
318            // leaked interrupts
319            s = state;
320            if (s >= INTERRUPTING)
321                handlePossibleCancellationInterrupt(s);
322        }
323        return ran && s == NEW;
324    }
325
326    /**
327     * Ensures that any interrupt from a possible cancel(true) is only
328     * delivered to a task while in run or runAndReset.
329     */
330    private void handlePossibleCancellationInterrupt(int s) {
331        // It is possible for our interrupter to stall before getting a
332        // chance to interrupt us.  Let's spin-wait patiently.
333        if (s == INTERRUPTING)
334            while (state == INTERRUPTING)
335                Thread.yield(); // wait out pending interrupt
336
337        // assert state == INTERRUPTED;
338
339        // We want to clear any interrupt we may have received from
340        // cancel(true).  However, it is permissible to use interrupts
341        // as an independent mechanism for a task to communicate with
342        // its caller, and there is no way to clear only the
343        // cancellation interrupt.
344        //
345        // Thread.interrupted();
346    }
347
348    /**
349     * Simple linked list nodes to record waiting threads in a Treiber
350     * stack.  See other classes such as Phaser and SynchronousQueue
351     * for more detailed explanation.
352     */
353    static final class WaitNode {
354        volatile Thread thread;
355        volatile WaitNode next;
356        WaitNode() { thread = Thread.currentThread(); }
357    }
358
359    /**
360     * Removes and signals all waiting threads, invokes done(), and
361     * nulls out callable.
362     */
363    private void finishCompletion() {
364        // assert state > COMPLETING;
365        for (WaitNode q; (q = waiters) != null;) {
366            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
367                for (;;) {
368                    Thread t = q.thread;
369                    if (t != null) {
370                        q.thread = null;
371                        LockSupport.unpark(t);
372                    }
373                    WaitNode next = q.next;
374                    if (next == null)
375                        break;
376                    q.next = null; // unlink to help gc
377                    q = next;
378                }
379                break;
380            }
381        }
382
383        done();
384
385        callable = null;        // to reduce footprint
386    }
387
388    /**
389     * Awaits completion or aborts on interrupt or timeout.
     *
     * <p>Each loop iteration re-reads {@code state} and performs at
     * most one step: return, yield, throw, allocate a wait node,
     * enqueue it, or park.  The branches are tried in that order, so
     * a state past {@code COMPLETING} always wins over a pending
     * interrupt or an expired timeout.
390     *
391     * @param timed true if use timed waits
392     * @param nanos time to wait, if timed
393     * @return state upon completion or at timeout
     * @throws InterruptedException if the calling thread's interrupt
     *         status is found set while the state is still {@code NEW};
     *         the status is cleared by the check
394     */
395    private int awaitDone(boolean timed, long nanos)
396        throws InterruptedException {
397        // The code below is very delicate, to achieve these goals:
398        // - call nanoTime exactly once for each call to park
399        // - if nanos <= 0L, return promptly without allocation or nanoTime
400        // - if nanos == Long.MIN_VALUE, don't underflow
401        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
402        //   and we suffer a spurious wakeup, we will do no worse than
403        //   to park-spin for a while
404        long startTime = 0L;    // Special value 0L means not yet parked
405        WaitNode q = null;
406        boolean queued = false;
407        for (;;) {
408            int s = state;
409            if (s > COMPLETING) {
                // Past COMPLETING: hand the state back to the caller.
                // Clearing q.thread marks our node as dead so that
                // finishCompletion will not unpark this thread and
                // removeWaiter traversals may unlink the node.
410                if (q != null)
411                    q.thread = null;
412                return s;
413            }
414            else if (s == COMPLETING)
415                // We may have already promised (via isDone) that we are done
416                // so never return empty-handed or throw InterruptedException
417                Thread.yield();
            // From here on s can only be NEW.
418            else if (Thread.interrupted()) {
                // removeWaiter tolerates q == null (nothing enqueued yet).
419                removeWaiter(q);
420                throw new InterruptedException();
421            }
422            else if (q == null) {
                // Non-positive timeout: report the current state
                // without allocating a node.
423                if (timed && nanos <= 0L)
424                    return s;
425                q = new WaitNode();
426            }
427            else if (!queued)
                // Push q onto the waiters stack; if the CAS loses a
                // race, queued stays false and the push is retried on
                // the next iteration with a freshly read head.
428                queued = U.compareAndSwapObject(this, WAITERS,
429                                                q.next = waiters, q);
430            else if (timed) {
431                final long parkNanos;
432                if (startTime == 0L) { // first time
433                    startTime = System.nanoTime();
                    // 0L is reserved as the "not yet parked" marker.
434                    if (startTime == 0L)
435                        startTime = 1L;
436                    parkNanos = nanos;
437                } else {
438                    long elapsed = System.nanoTime() - startTime;
439                    if (elapsed >= nanos) {
                        // Timed out: unlink our node and return
                        // whatever the state is now.
440                        removeWaiter(q);
441                        return state;
442                    }
443                    parkNanos = nanos - elapsed;
444                }
445                // nanoTime may be slow; recheck before parking
446                if (state < COMPLETING)
447                    LockSupport.parkNanos(this, parkNanos);
448            }
449            else
450                LockSupport.park(this);
451        }
452    }
453
454    /**
455     * Tries to unlink a timed-out or interrupted wait node to avoid
456     * accumulating garbage.  Internal nodes are simply unspliced
457     * without CAS since it is harmless if they are traversed anyway
458     * by releasers.  To avoid effects of unsplicing from already
459     * removed nodes, the list is retraversed in case of an apparent
460     * race.  This is slow when there are a lot of nodes, but we don't
461     * expect lists to be long enough to outweigh higher-overhead
462     * schemes.
     *
     * <p>The traversal removes every node whose {@code thread} is
     * null, not only {@code node} itself.
     *
     * @param node the node to remove; a null argument is a no-op
463     */
464    private void removeWaiter(WaitNode node) {
465        if (node != null) {
            // A null thread is the "dead node" marker tested below.
466            node.thread = null;
467            retry:
468            for (;;) {          // restart on removeWaiter race
                // pred = last live (thread != null) node seen so far,
                // q = node under inspection, s = q's successor.
469                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
470                    s = q.next;
471                    if (q.thread != null)
472                        pred = q;
473                    else if (pred != null) {
                        // q is dead and has a live predecessor:
                        // unsplice it with a plain write.
474                        pred.next = s;
                        // If pred was itself marked dead meanwhile, the
                        // write above may have gone into a removed
                        // node, so start over from the head.
475                        if (pred.thread == null) // check for race
476                            continue retry;
477                    }
                    // q is dead and no live node precedes it: swing
                    // the stack head past q; restart if the CAS fails.
478                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
479                        continue retry;
480                }
481                break;
482            }
483        }
484    }
485
486    // Unsafe mechanics
487    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
488    private static final long STATE;
489    private static final long RUNNER;
490    private static final long WAITERS;
491    static {
492        try {
493            STATE = U.objectFieldOffset
494                (FutureTask.class.getDeclaredField("state"));
495            RUNNER = U.objectFieldOffset
496                (FutureTask.class.getDeclaredField("runner"));
497            WAITERS = U.objectFieldOffset
498                (FutureTask.class.getDeclaredField("waiters"));
499        } catch (ReflectiveOperationException e) {
500            throw new Error(e);
501        }
502
503        // Reduce the risk of rare disastrous classloading in first call to
504        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
505        Class<?> ensureLoaded = LockSupport.class;
506    }
507
508}
509