FutureTask.java revision 51b1b6997fd3f980076b8081f7f1165ccc2a4008
1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37import java.util.concurrent.locks.LockSupport;
38
39/**
40 * A cancellable asynchronous computation.  This class provides a base
41 * implementation of {@link Future}, with methods to start and cancel
42 * a computation, query to see if the computation is complete, and
43 * retrieve the result of the computation.  The result can only be
44 * retrieved when the computation has completed; the {@code get}
45 * methods will block if the computation has not yet completed.  Once
46 * the computation has completed, the computation cannot be restarted
47 * or cancelled (unless the computation is invoked using
48 * {@link #runAndReset}).
49 *
50 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
51 * {@link Runnable} object.  Because {@code FutureTask} implements
52 * {@code Runnable}, a {@code FutureTask} can be submitted to an
53 * {@link Executor} for execution.
54 *
55 * <p>In addition to serving as a standalone class, this class provides
56 * {@code protected} functionality that may be useful when creating
57 * customized task classes.
58 *
59 * @since 1.5
60 * @author Doug Lea
61 * @param <V> The result type returned by this FutureTask's {@code get} methods
62 */
63public class FutureTask<V> implements RunnableFuture<V> {
64    /*
65     * Revision notes: This differs from previous versions of this
66     * class that relied on AbstractQueuedSynchronizer, mainly to
67     * avoid surprising users about retaining interrupt status during
68     * cancellation races. Sync control in the current design relies
69     * on a "state" field updated via CAS to track completion, along
70     * with a simple Treiber stack to hold waiting threads.
71     *
72     * Style note: As usual, we bypass overhead of using
73     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
74     */
75
76    /**
77     * The run state of this task, initially NEW.  The run state
78     * transitions to a terminal state only in methods set,
79     * setException, and cancel.  During completion, state may take on
80     * transient values of COMPLETING (while outcome is being set) or
81     * INTERRUPTING (only while interrupting the runner to satisfy a
82     * cancel(true)). Transitions from these intermediate to final
83     * states use cheaper ordered/lazy writes because values are unique
84     * and cannot be further modified.
85     *
86     * Possible state transitions:
87     * NEW -> COMPLETING -> NORMAL
88     * NEW -> COMPLETING -> EXCEPTIONAL
89     * NEW -> CANCELLED
90     * NEW -> INTERRUPTING -> INTERRUPTED
91     */
92    private volatile int state;
93    private static final int NEW          = 0;
94    private static final int COMPLETING   = 1;
95    private static final int NORMAL       = 2;
96    private static final int EXCEPTIONAL  = 3;
97    private static final int CANCELLED    = 4;
98    private static final int INTERRUPTING = 5;
99    private static final int INTERRUPTED  = 6;
100
101    /** The underlying callable; nulled out after running */
102    private Callable<V> callable;
103    /** The result to return or exception to throw from get() */
104    private Object outcome; // non-volatile, protected by state reads/writes
105    /** The thread running the callable; CASed during run() */
106    private volatile Thread runner;
107    /** Treiber stack of waiting threads */
108    private volatile WaitNode waiters;
109
110    /**
111     * Returns result or throws exception for completed task.
112     *
113     * @param s completed state value
114     */
115    @SuppressWarnings("unchecked")
116    private V report(int s) throws ExecutionException {
117        Object x = outcome;
118        if (s == NORMAL)
119            return (V)x;
120        if (s >= CANCELLED)
121            throw new CancellationException();
122        throw new ExecutionException((Throwable)x);
123    }
124
125    /**
126     * Creates a {@code FutureTask} that will, upon running, execute the
127     * given {@code Callable}.
128     *
129     * @param  callable the callable task
130     * @throws NullPointerException if the callable is null
131     */
132    public FutureTask(Callable<V> callable) {
133        if (callable == null)
134            throw new NullPointerException();
135        this.callable = callable;
136        this.state = NEW;       // ensure visibility of callable
137    }
138
139    /**
140     * Creates a {@code FutureTask} that will, upon running, execute the
141     * given {@code Runnable}, and arrange that {@code get} will return the
142     * given result on successful completion.
143     *
144     * @param runnable the runnable task
145     * @param result the result to return on successful completion. If
146     * you don't need a particular result, consider using
147     * constructions of the form:
148     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
149     * @throws NullPointerException if the runnable is null
150     */
151    public FutureTask(Runnable runnable, V result) {
152        this.callable = Executors.callable(runnable, result);
153        this.state = NEW;       // ensure visibility of callable
154    }
155
156    public boolean isCancelled() {
157        return state >= CANCELLED;
158    }
159
160    public boolean isDone() {
161        return state != NEW;
162    }
163
164    public boolean cancel(boolean mayInterruptIfRunning) {
165        if (state != NEW)
166            return false;
167        if (mayInterruptIfRunning) {
168            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
169                return false;
170            Thread t = runner;
171            if (t != null)
172                t.interrupt();
173            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
174        }
175        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
176            return false;
177        finishCompletion();
178        return true;
179    }
180
181    /**
182     * @throws CancellationException {@inheritDoc}
183     */
184    public V get() throws InterruptedException, ExecutionException {
185        int s = state;
186        if (s <= COMPLETING)
187            s = awaitDone(false, 0L);
188        return report(s);
189    }
190
191    /**
192     * @throws CancellationException {@inheritDoc}
193     */
194    public V get(long timeout, TimeUnit unit)
195        throws InterruptedException, ExecutionException, TimeoutException {
196        if (unit == null)
197            throw new NullPointerException();
198        int s = state;
199        if (s <= COMPLETING &&
200            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
201            throw new TimeoutException();
202        return report(s);
203    }
204
205    /**
206     * Protected method invoked when this task transitions to state
207     * {@code isDone} (whether normally or via cancellation). The
208     * default implementation does nothing.  Subclasses may override
209     * this method to invoke completion callbacks or perform
210     * bookkeeping. Note that you can query status inside the
211     * implementation of this method to determine whether this task
212     * has been cancelled.
213     */
214    protected void done() { }
215
216    /**
217     * Sets the result of this future to the given value unless
218     * this future has already been set or has been cancelled.
219     *
220     * <p>This method is invoked internally by the {@link #run} method
221     * upon successful completion of the computation.
222     *
223     * @param v the value
224     */
225    protected void set(V v) {
226        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
227            outcome = v;
228            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
229            finishCompletion();
230        }
231    }
232
233    /**
234     * Causes this future to report an {@link ExecutionException}
235     * with the given throwable as its cause, unless this future has
236     * already been set or has been cancelled.
237     *
238     * <p>This method is invoked internally by the {@link #run} method
239     * upon failure of the computation.
240     *
241     * @param t the cause of failure
242     */
243    protected void setException(Throwable t) {
244        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
245            outcome = t;
246            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
247            finishCompletion();
248        }
249    }
250
251    public void run() {
252        if (state != NEW ||
253            !UNSAFE.compareAndSwapObject(this, runnerOffset,
254                                         null, Thread.currentThread()))
255            return;
256        try {
257            Callable<V> c = callable;
258            if (c != null && state == NEW) {
259                V result;
260                boolean ran;
261                try {
262                    result = c.call();
263                    ran = true;
264                } catch (Throwable ex) {
265                    result = null;
266                    ran = false;
267                    setException(ex);
268                }
269                if (ran)
270                    set(result);
271            }
272        } finally {
273            // runner must be non-null until state is settled to
274            // prevent concurrent calls to run()
275            runner = null;
276            // state must be re-read after nulling runner to prevent
277            // leaked interrupts
278            int s = state;
279            if (s >= INTERRUPTING)
280                handlePossibleCancellationInterrupt(s);
281        }
282    }
283
284    /**
285     * Executes the computation without setting its result, and then
286     * resets this future to initial state, failing to do so if the
287     * computation encounters an exception or is cancelled.  This is
288     * designed for use with tasks that intrinsically execute more
289     * than once.
290     *
291     * @return true if successfully run and reset
292     */
293    protected boolean runAndReset() {
294        if (state != NEW ||
295            !UNSAFE.compareAndSwapObject(this, runnerOffset,
296                                         null, Thread.currentThread()))
297            return false;
298        boolean ran = false;
299        int s = state;
300        try {
301            Callable<V> c = callable;
302            if (c != null && s == NEW) {
303                try {
304                    c.call(); // don't set result
305                    ran = true;
306                } catch (Throwable ex) {
307                    setException(ex);
308                }
309            }
310        } finally {
311            // runner must be non-null until state is settled to
312            // prevent concurrent calls to run()
313            runner = null;
314            // state must be re-read after nulling runner to prevent
315            // leaked interrupts
316            s = state;
317            if (s >= INTERRUPTING)
318                handlePossibleCancellationInterrupt(s);
319        }
320        return ran && s == NEW;
321    }
322
323    /**
324     * Ensures that any interrupt from a possible cancel(true) is only
325     * delivered to a task while in run or runAndReset.
326     */
327    private void handlePossibleCancellationInterrupt(int s) {
328        // It is possible for our interrupter to stall before getting a
329        // chance to interrupt us.  Let's spin-wait patiently.
330        if (s == INTERRUPTING)
331            while (state == INTERRUPTING)
332                Thread.yield(); // wait out pending interrupt
333
334        // assert state == INTERRUPTED;
335
336        // We want to clear any interrupt we may have received from
337        // cancel(true).  However, it is permissible to use interrupts
338        // as an independent mechanism for a task to communicate with
339        // its caller, and there is no way to clear only the
340        // cancellation interrupt.
341        //
342        // Thread.interrupted();
343    }
344
345    /**
346     * Simple linked list nodes to record waiting threads in a Treiber
347     * stack.  See other classes such as Phaser and SynchronousQueue
348     * for more detailed explanation.
349     */
350    static final class WaitNode {
351        volatile Thread thread;
352        volatile WaitNode next;
353        WaitNode() { thread = Thread.currentThread(); }
354    }
355
356    /**
357     * Removes and signals all waiting threads, invokes done(), and
358     * nulls out callable.
359     */
360    private void finishCompletion() {
361        // assert state > COMPLETING;
362        for (WaitNode q; (q = waiters) != null;) {
363            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
364                for (;;) {
365                    Thread t = q.thread;
366                    if (t != null) {
367                        q.thread = null;
368                        LockSupport.unpark(t);
369                    }
370                    WaitNode next = q.next;
371                    if (next == null)
372                        break;
373                    q.next = null; // unlink to help gc
374                    q = next;
375                }
376                break;
377            }
378        }
379
380        done();
381
382        callable = null;        // to reduce footprint
383    }
384
385    /**
386     * Awaits completion or aborts on interrupt or timeout.
387     *
388     * @param timed true if use timed waits
389     * @param nanos time to wait, if timed
390     * @return state upon completion
391     */
392    private int awaitDone(boolean timed, long nanos)
393        throws InterruptedException {
394        final long deadline = timed ? System.nanoTime() + nanos : 0L;
395        WaitNode q = null;
396        boolean queued = false;
397        for (;;) {
398            if (Thread.interrupted()) {
399                removeWaiter(q);
400                throw new InterruptedException();
401            }
402
403            int s = state;
404            if (s > COMPLETING) {
405                if (q != null)
406                    q.thread = null;
407                return s;
408            }
409            else if (s == COMPLETING) // cannot time out yet
410                Thread.yield();
411            else if (q == null)
412                q = new WaitNode();
413            else if (!queued)
414                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
415                                                     q.next = waiters, q);
416            else if (timed) {
417                nanos = deadline - System.nanoTime();
418                if (nanos <= 0L) {
419                    removeWaiter(q);
420                    return state;
421                }
422                LockSupport.parkNanos(this, nanos);
423            }
424            else
425                LockSupport.park(this);
426        }
427    }
428
429    /**
430     * Tries to unlink a timed-out or interrupted wait node to avoid
431     * accumulating garbage.  Internal nodes are simply unspliced
432     * without CAS since it is harmless if they are traversed anyway
433     * by releasers.  To avoid effects of unsplicing from already
434     * removed nodes, the list is retraversed in case of an apparent
435     * race.  This is slow when there are a lot of nodes, but we don't
436     * expect lists to be long enough to outweigh higher-overhead
437     * schemes.
438     */
439    private void removeWaiter(WaitNode node) {
440        if (node != null) {
441            node.thread = null;
442            retry:
443            for (;;) {          // restart on removeWaiter race
444                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
445                    s = q.next;
446                    if (q.thread != null)
447                        pred = q;
448                    else if (pred != null) {
449                        pred.next = s;
450                        if (pred.thread == null) // check for race
451                            continue retry;
452                    }
453                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
454                                                          q, s))
455                        continue retry;
456                }
457                break;
458            }
459        }
460    }
461
462    // Unsafe mechanics
463    private static final sun.misc.Unsafe UNSAFE;
464    private static final long stateOffset;
465    private static final long runnerOffset;
466    private static final long waitersOffset;
467    static {
468        try {
469            UNSAFE = sun.misc.Unsafe.getUnsafe();
470            Class<?> k = FutureTask.class;
471            stateOffset = UNSAFE.objectFieldOffset
472                (k.getDeclaredField("state"));
473            runnerOffset = UNSAFE.objectFieldOffset
474                (k.getDeclaredField("runner"));
475            waitersOffset = UNSAFE.objectFieldOffset
476                (k.getDeclaredField("waiters"));
477        } catch (Exception e) {
478            throw new Error(e);
479        }
480    }
481
482}
483