1/*
2 * Copyright (C) 2007 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import static com.google.common.base.Preconditions.checkNotNull;
20
21import java.util.concurrent.CancellationException;
22import java.util.concurrent.ExecutionException;
23import java.util.concurrent.Executor;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.TimeoutException;
26import java.util.concurrent.locks.AbstractQueuedSynchronizer;
27
28import javax.annotation.Nullable;
29
30/**
31 * An abstract implementation of the {@link ListenableFuture} interface. This
32 * class is preferable to {@link java.util.concurrent.FutureTask} for two
33 * reasons: It implements {@code ListenableFuture}, and it does not implement
34 * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
35 * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your
36 * tasks to a {@link ListeningExecutorService}.)
37 *
38 * <p>This class implements all methods in {@code ListenableFuture}.
39 * Subclasses should provide a way to set the result of the computation through
40 * the protected methods {@link #set(Object)} and
41 * {@link #setException(Throwable)}. Subclasses may also override {@link
42 * #interruptTask()}, which will be invoked automatically if a call to {@link
43 * #cancel(boolean) cancel(true)} succeeds in canceling the future.
44 *
45 * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
46 * with concurrency issues and guarantee thread safety.
47 *
48 * <p>The state changing methods all return a boolean indicating success or
49 * failure in changing the future's state.  Valid states are running,
50 * completed, failed, or cancelled.
51 *
52 * <p>This class uses an {@link ExecutionList} to guarantee that all registered
53 * listeners will be executed, either when the future finishes or, for listeners
54 * that are added after the future completes, immediately.
55 * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
56 * are not necessarily executed in the order in which they were added.  (If a
57 * listener is added after the Future is complete, it will be executed
58 * immediately, even if earlier listeners have not been executed. Additionally,
59 * executors need not guarantee FIFO execution, or different listeners may run
60 * in different executors.)
61 *
62 * @author Sven Mawson
63 * @since 1.0
64 */
65public abstract class AbstractFuture<V> implements ListenableFuture<V> {
66
67  /** Synchronization control for AbstractFutures. */
68  private final Sync<V> sync = new Sync<V>();
69
70  // The execution list to hold our executors.
71  private final ExecutionList executionList = new ExecutionList();
72
73  /*
74   * Improve the documentation of when InterruptedException is thrown. Our
75   * behavior matches the JDK's, but the JDK's documentation is misleading.
76   */
77  /**
78   * {@inheritDoc}
79   *
80   * <p>The default {@link AbstractFuture} implementation throws {@code
81   * InterruptedException} if the current thread is interrupted before or during
82   * the call, even if the value is already available.
83   *
84   * @throws InterruptedException if the current thread was interrupted before
85   *     or during the call (optional but recommended).
86   * @throws CancellationException {@inheritDoc}
87   */
88  @Override
89  public V get(long timeout, TimeUnit unit) throws InterruptedException,
90      TimeoutException, ExecutionException {
91    return sync.get(unit.toNanos(timeout));
92  }
93
94  /*
95   * Improve the documentation of when InterruptedException is thrown. Our
96   * behavior matches the JDK's, but the JDK's documentation is misleading.
97   */
98  /**
99   * {@inheritDoc}
100   *
101   * <p>The default {@link AbstractFuture} implementation throws {@code
102   * InterruptedException} if the current thread is interrupted before or during
103   * the call, even if the value is already available.
104   *
105   * @throws InterruptedException if the current thread was interrupted before
106   *     or during the call (optional but recommended).
107   * @throws CancellationException {@inheritDoc}
108   */
109  @Override
110  public V get() throws InterruptedException, ExecutionException {
111    return sync.get();
112  }
113
114  @Override
115  public boolean isDone() {
116    return sync.isDone();
117  }
118
119  @Override
120  public boolean isCancelled() {
121    return sync.isCancelled();
122  }
123
124  @Override
125  public boolean cancel(boolean mayInterruptIfRunning) {
126    if (!sync.cancel()) {
127      return false;
128    }
129    executionList.execute();
130    if (mayInterruptIfRunning) {
131      interruptTask();
132    }
133    return true;
134  }
135
136  /**
137   * Subclasses can override this method to implement interruption of the
138   * future's computation. The method is invoked automatically by a successful
139   * call to {@link #cancel(boolean) cancel(true)}.
140   *
141   * <p>The default implementation does nothing.
142   *
143   * @since 10.0
144   */
145  protected void interruptTask() {
146  }
147
148  /**
149   * {@inheritDoc}
150   *
151   * @since 10.0
152   */
153  @Override
154  public void addListener(Runnable listener, Executor exec) {
155    executionList.add(listener, exec);
156  }
157
158  /**
159   * Subclasses should invoke this method to set the result of the computation
160   * to {@code value}.  This will set the state of the future to
161   * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
162   * state was successfully changed.
163   *
164   * @param value the value that was the result of the task.
165   * @return true if the state was successfully changed.
166   */
167  protected boolean set(@Nullable V value) {
168    boolean result = sync.set(value);
169    if (result) {
170      executionList.execute();
171    }
172    return result;
173  }
174
175  /**
176   * Subclasses should invoke this method to set the result of the computation
177   * to an error, {@code throwable}.  This will set the state of the future to
178   * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
179   * state was successfully changed.
180   *
181   * @param throwable the exception that the task failed with.
182   * @return true if the state was successfully changed.
183   * @throws Error if the throwable was an {@link Error}.
184   */
185  protected boolean setException(Throwable throwable) {
186    boolean result = sync.setException(checkNotNull(throwable));
187    if (result) {
188      executionList.execute();
189    }
190
191    // If it's an Error, we want to make sure it reaches the top of the
192    // call stack, so we rethrow it.
193    if (throwable instanceof Error) {
194      throw (Error) throwable;
195    }
196    return result;
197  }
198
199  /**
200   * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
201   * private subclass to hold the synchronizer.  This synchronizer is used to
202   * implement the blocking and waiting calls as well as to handle state changes
203   * in a thread-safe manner.  The current state of the future is held in the
204   * Sync state, and the lock is released whenever the state changes to either
205   * {@link #COMPLETED} or {@link #CANCELLED}.
206   *
207   * <p>To avoid races between threads doing release and acquire, we transition
208   * to the final state in two steps.  One thread will successfully CAS from
209   * RUNNING to COMPLETING, that thread will then set the result of the
210   * computation, and only then transition to COMPLETED or CANCELLED.
211   *
212   * <p>We don't use the integer argument passed between acquire methods so we
213   * pass around a -1 everywhere.
214   */
215  static final class Sync<V> extends AbstractQueuedSynchronizer {
216
217    private static final long serialVersionUID = 0L;
218
219    /* Valid states. */
220    static final int RUNNING = 0;
221    static final int COMPLETING = 1;
222    static final int COMPLETED = 2;
223    static final int CANCELLED = 4;
224
225    private V value;
226    private Throwable exception;
227
228    /*
229     * Acquisition succeeds if the future is done, otherwise it fails.
230     */
231    @Override
232    protected int tryAcquireShared(int ignored) {
233      if (isDone()) {
234        return 1;
235      }
236      return -1;
237    }
238
239    /*
240     * We always allow a release to go through, this means the state has been
241     * successfully changed and the result is available.
242     */
243    @Override
244    protected boolean tryReleaseShared(int finalState) {
245      setState(finalState);
246      return true;
247    }
248
249    /**
250     * Blocks until the task is complete or the timeout expires.  Throws a
251     * {@link TimeoutException} if the timer expires, otherwise behaves like
252     * {@link #get()}.
253     */
254    V get(long nanos) throws TimeoutException, CancellationException,
255        ExecutionException, InterruptedException {
256
257      // Attempt to acquire the shared lock with a timeout.
258      if (!tryAcquireSharedNanos(-1, nanos)) {
259        throw new TimeoutException("Timeout waiting for task.");
260      }
261
262      return getValue();
263    }
264
265    /**
266     * Blocks until {@link #complete(Object, Throwable, int)} has been
267     * successfully called.  Throws a {@link CancellationException} if the task
268     * was cancelled, or a {@link ExecutionException} if the task completed with
269     * an error.
270     */
271    V get() throws CancellationException, ExecutionException,
272        InterruptedException {
273
274      // Acquire the shared lock allowing interruption.
275      acquireSharedInterruptibly(-1);
276      return getValue();
277    }
278
279    /**
280     * Implementation of the actual value retrieval.  Will return the value
281     * on success, an exception on failure, a cancellation on cancellation, or
282     * an illegal state if the synchronizer is in an invalid state.
283     */
284    private V getValue() throws CancellationException, ExecutionException {
285      int state = getState();
286      switch (state) {
287        case COMPLETED:
288          if (exception != null) {
289            throw new ExecutionException(exception);
290          } else {
291            return value;
292          }
293
294        case CANCELLED:
295          throw new CancellationException("Task was cancelled.");
296
297        default:
298          throw new IllegalStateException(
299              "Error, synchronizer in invalid state: " + state);
300      }
301    }
302
303    /**
304     * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
305     */
306    boolean isDone() {
307      return (getState() & (COMPLETED | CANCELLED)) != 0;
308    }
309
310    /**
311     * Checks if the state is {@link #CANCELLED}.
312     */
313    boolean isCancelled() {
314      return getState() == CANCELLED;
315    }
316
317    /**
318     * Transition to the COMPLETED state and set the value.
319     */
320    boolean set(@Nullable V v) {
321      return complete(v, null, COMPLETED);
322    }
323
324    /**
325     * Transition to the COMPLETED state and set the exception.
326     */
327    boolean setException(Throwable t) {
328      return complete(null, t, COMPLETED);
329    }
330
331    /**
332     * Transition to the CANCELLED state.
333     */
334    boolean cancel() {
335      return complete(null, null, CANCELLED);
336    }
337
338    /**
339     * Implementation of completing a task.  Either {@code v} or {@code t} will
340     * be set but not both.  The {@code finalState} is the state to change to
341     * from {@link #RUNNING}.  If the state is not in the RUNNING state we
342     * return {@code false} after waiting for the state to be set to a valid
343     * final state ({@link #COMPLETED} or {@link #CANCELLED}).
344     *
345     * @param v the value to set as the result of the computation.
346     * @param t the exception to set as the result of the computation.
347     * @param finalState the state to transition to.
348     */
349    private boolean complete(@Nullable V v, @Nullable Throwable t,
350        int finalState) {
351      boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
352      if (doCompletion) {
353        // If this thread successfully transitioned to COMPLETING, set the value
354        // and exception and then release to the final state.
355        this.value = v;
356        this.exception = t;
357        releaseShared(finalState);
358      } else if (getState() == COMPLETING) {
359        // If some other thread is currently completing the future, block until
360        // they are done so we can guarantee completion.
361        acquireShared(-1);
362      }
363      return doCompletion;
364    }
365  }
366}
367