1/*
2 * Copyright (C) 2006 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import static com.google.common.base.Preconditions.checkArgument;
20import static com.google.common.base.Preconditions.checkNotNull;
21import static com.google.common.base.Preconditions.checkState;
22import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
23import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
24import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
25import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
26import static java.lang.Thread.currentThread;
27import static java.util.Arrays.asList;
28import static java.util.concurrent.TimeUnit.NANOSECONDS;
29
30import com.google.common.annotations.Beta;
31import com.google.common.base.Function;
32import com.google.common.base.Preconditions;
33import com.google.common.collect.ImmutableList;
34import com.google.common.collect.Lists;
35import com.google.common.collect.Ordering;
36
37import java.lang.reflect.Constructor;
38import java.lang.reflect.InvocationTargetException;
39import java.lang.reflect.UndeclaredThrowableException;
40import java.util.Arrays;
41import java.util.List;
42import java.util.concurrent.BlockingQueue;
43import java.util.concurrent.CancellationException;
44import java.util.concurrent.CountDownLatch;
45import java.util.concurrent.ExecutionException;
46import java.util.concurrent.Executor;
47import java.util.concurrent.Future;
48import java.util.concurrent.LinkedBlockingQueue;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51import java.util.concurrent.atomic.AtomicInteger;
52
53import javax.annotation.Nullable;
54
55/**
56 * Static utility methods pertaining to the {@link Future} interface.
57 *
58 * @author Kevin Bourrillion
59 * @author Nishant Thakkar
60 * @author Sven Mawson
61 * @since 1.0
62 */
63@Beta
64public final class Futures {
65  private Futures() {}
66
67  /**
68   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
69   * and a {@link Function} that maps from {@link Exception} instances into the
70   * appropriate checked type.
71   *
72   * <p>The given mapping function will be applied to an
73   * {@link InterruptedException}, a {@link CancellationException}, or an
74   * {@link ExecutionException} with the actual cause of the exception.
75   * See {@link Future#get()} for details on the exceptions thrown.
76   *
77   * @since 9.0 (source-compatible since 1.0)
78   */
79  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
80      ListenableFuture<V> future, Function<Exception, X> mapper) {
81    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
82  }
83
84  /**
85   * Creates a {@code ListenableFuture} which has its value set immediately upon
86   * construction. The getters just return the value. This {@code Future} can't
87   * be canceled or timed out and its {@code isDone()} method always returns
88   * {@code true}.
89   */
90  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
91    SettableFuture<V> future = SettableFuture.create();
92    future.set(value);
93    return future;
94  }
95
96  /**
97   * Returns a {@code CheckedFuture} which has its value set immediately upon
98   * construction.
99   *
100   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
101   * method always returns {@code true}. Calling {@code get()} or {@code
102   * checkedGet()} will immediately return the provided value.
103   */
104  public static <V, X extends Exception> CheckedFuture<V, X>
105      immediateCheckedFuture(@Nullable V value) {
106    SettableFuture<V> future = SettableFuture.create();
107    future.set(value);
108    return Futures.makeChecked(future, new Function<Exception, X>() {
109      @Override
110      public X apply(Exception e) {
111        throw new AssertionError("impossible");
112      }
113    });
114  }
115
116  /**
117   * Returns a {@code ListenableFuture} which has an exception set immediately
118   * upon construction.
119   *
120   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
121   * method always returns {@code true}. Calling {@code get()} will immediately
122   * throw the provided {@code Throwable} wrapped in an {@code
123   * ExecutionException}.
124   *
125   * @throws Error if the throwable is an {@link Error}.
126   */
127  public static <V> ListenableFuture<V> immediateFailedFuture(
128      Throwable throwable) {
129    checkNotNull(throwable);
130    SettableFuture<V> future = SettableFuture.create();
131    future.setException(throwable);
132    return future;
133  }
134
135  /**
136   * Returns a {@code CheckedFuture} which has an exception set immediately upon
137   * construction.
138   *
139   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
140   * method always returns {@code true}. Calling {@code get()} will immediately
141   * throw the provided {@code Throwable} wrapped in an {@code
142   * ExecutionException}, and calling {@code checkedGet()} will throw the
143   * provided exception itself.
144   *
145   * @throws Error if the throwable is an {@link Error}.
146   */
147  public static <V, X extends Exception> CheckedFuture<V, X>
148      immediateFailedCheckedFuture(final X exception) {
149    checkNotNull(exception);
150    return makeChecked(Futures.<V>immediateFailedFuture(exception),
151        new Function<Exception, X>() {
152          @Override
153          public X apply(Exception e) {
154            return exception;
155          }
156        });
157  }
158
159  /**
160   * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
161   * derived from the result of the given {@code Future}. More precisely, the
162   * returned {@code Future} takes its result from a {@code Future} produced by
163   * applying the given {@code Function} to the result of the original {@code
164   * Future}. Example:
165   *
166   * <pre>   {@code
167   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
168   *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
169   *       new Function<RowKey, ListenableFuture<QueryResult>>() {
170   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
171   *           return dataService.read(rowKey);
172   *         }
173   *       };
174   *   ListenableFuture<QueryResult> queryFuture =
175   *       chain(rowKeyFuture, queryFunction);
176   * }</pre>
177   *
178   * <p>Note: This overload of {@code chain} is designed for cases in which the
179   * work of creating the derived future is fast and lightweight, as the method
180   * does not accept an {@code Executor} in which to perform the the work. For
181   * heavier derivations, this overload carries some caveats: First, the thread
182   * that the derivation runs in depends on whether the input {@code Future} is
183   * done at the time {@code chain} is called. In particular, if called late,
184   * {@code chain} will run the derivation in the thread that called {@code
185   * chain}.  Second, derivations may run in an internal thread of the system
186   * responsible for the input {@code Future}, such as an RPC network thread.
187   * Finally, during the execution of a {@code sameThreadExecutor} {@code
188   * chain} function, all other registered but unexecuted listeners are
189   * prevented from running, even if those listeners are to run in other
190   * executors.
191   *
192   * <p>The returned {@code Future} attempts to keep its cancellation state in
193   * sync with that of the input future and that of the future returned by the
194   * chain function. That is, if the returned {@code Future} is cancelled, it
195   * will attempt to cancel the other two, and if either of the other two is
196   * cancelled, the returned {@code Future} will receive a callback in which it
197   * will attempt to cancel itself.
198   *
199   * @param input The future to chain
200   * @param function A function to chain the results of the provided future
201   *     to the results of the returned future.  This will be run in the thread
202   *     that notifies input it is complete.
203   * @return A future that holds result of the chain.
204   * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
205   *     use {@link #transform(ListenableFuture, AsyncFunction)}. This method is
206   *     scheduled to be removed from Guava in Guava release 12.0.
207   */
208  @Deprecated
209  public static <I, O> ListenableFuture<O> chain(
210      ListenableFuture<I> input,
211      Function<? super I, ? extends ListenableFuture<? extends O>> function) {
212    return chain(input, function, MoreExecutors.sameThreadExecutor());
213  }
214
215  /**
216   * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
217   * derived from the result of the given {@code Future}. More precisely, the
218   * returned {@code Future} takes its result from a {@code Future} produced by
219   * applying the given {@code Function} to the result of the original {@code
220   * Future}. Example:
221   *
222   * <pre>   {@code
223   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
224   *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
225   *       new Function<RowKey, ListenableFuture<QueryResult>>() {
226   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
227   *           return dataService.read(rowKey);
228   *         }
229   *       };
230   *   ListenableFuture<QueryResult> queryFuture =
231   *       chain(rowKeyFuture, queryFunction, executor);
232   * }</pre>
233   *
234   * <p>The returned {@code Future} attempts to keep its cancellation state in
235   * sync with that of the input future and that of the future returned by the
236   * chain function. That is, if the returned {@code Future} is cancelled, it
237   * will attempt to cancel the other two, and if either of the other two is
238   * cancelled, the returned {@code Future} will receive a callback in which it
239   * will attempt to cancel itself.
240   *
241   * <p>Note: For cases in which the work of creating the derived future is
242   * fast and lightweight, consider {@linkplain Futures#chain(ListenableFuture,
243   * Function) the other overload} or explicit use of {@code
244   * sameThreadExecutor}. For heavier derivations, this choice carries some
245   * caveats: First, the thread that the derivation runs in depends on whether
246   * the input {@code Future} is done at the time {@code chain} is called. In
247   * particular, if called late, {@code chain} will run the derivation in the
248   * thread that called {@code chain}. Second, derivations may run in an
249   * internal thread of the system responsible for the input {@code Future},
250   * such as an RPC network thread. Finally, during the execution of a {@code
251   * sameThreadExecutor} {@code chain} function, all other registered but
252   * unexecuted listeners are prevented from running, even if those listeners
253   * are to run in other executors.
254   *
255   * @param input The future to chain
256   * @param function A function to chain the results of the provided future
257   *     to the results of the returned future.
258   * @param executor Executor to run the function in.
259   * @return A future that holds result of the chain.
260   * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
261   *     use {@link #transform(ListenableFuture, AsyncFunction, Executor)}. This
262   *     method is scheduled to be removed from Guava in Guava release 12.0.
263   */
264  @Deprecated
265  public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
266      final Function<? super I, ? extends ListenableFuture<? extends O>>
267          function,
268      Executor executor) {
269    checkNotNull(function);
270    ChainingListenableFuture<I, O> chain =
271        new ChainingListenableFuture<I, O>(new AsyncFunction<I, O>() {
272          @Override
273          /*
274           * All methods of ListenableFuture are covariant, and we don't expose
275           * the object anywhere that would allow it to be downcast.
276           */
277          @SuppressWarnings("unchecked")
278          public ListenableFuture<O> apply(I input) {
279            return (ListenableFuture) function.apply(input);
280          }
281        }, input);
282    input.addListener(chain, executor);
283    return chain;
284  }
285
286  /**
287   * Returns a new {@code ListenableFuture} whose result is asynchronously
288   * derived from the result of the given {@code Future}. More precisely, the
289   * returned {@code Future} takes its result from a {@code Future} produced by
290   * applying the given {@code AsyncFunction} to the result of the original
291   * {@code Future}. Example:
292   *
293   * <pre>   {@code
294   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
295   *   AsyncFunction<RowKey, QueryResult> queryFunction =
296   *       new AsyncFunction<RowKey, QueryResult>() {
297   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
298   *           return dataService.read(rowKey);
299   *         }
300   *       };
301   *   ListenableFuture<QueryResult> queryFuture =
302   *       transform(rowKeyFuture, queryFunction);
303   * }</pre>
304   *
305   * <p>Note: This overload of {@code transform} is designed for cases in which
306   * the work of creating the derived {@code Future} is fast and lightweight,
307   * as the method does not accept an {@code Executor} in which to perform the
308   * the work. (The created {@code Future} itself need not complete quickly.)
309   * For heavier operations, this overload carries some caveats: First, the
310   * thread that {@code function.apply} runs in depends on whether the input
311   * {@code Future} is done at the time {@code transform} is called. In
312   * particular, if called late, {@code transform} will run the operation in
313   * the thread that called {@code transform}.  Second, {@code function.apply}
314   * may run in an internal thread of the system responsible for the input
315   * {@code Future}, such as an RPC network thread.  Finally, during the
316   * execution of a {@code sameThreadExecutor} {@code function.apply}, all
317   * other registered but unexecuted listeners are prevented from running, even
318   * if those listeners are to run in other executors.
319   *
320   * <p>The returned {@code Future} attempts to keep its cancellation state in
321   * sync with that of the input future and that of the future returned by the
322   * function. That is, if the returned {@code Future} is cancelled, it will
323   * attempt to cancel the other two, and if either of the other two is
324   * cancelled, the returned {@code Future} will receive a callback in which it
325   * will attempt to cancel itself.
326   *
327   * @param input The future to transform
328   * @param function A function to transform the result of the input future
329   *     to the result of the output future
330   * @return A future that holds result of the function (if the input succeeded)
331   *     or the original input's failure (if not)
332   * @since 11.0
333   */
334  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
335      AsyncFunction<? super I, ? extends O> function) {
336    return transform(input, function, MoreExecutors.sameThreadExecutor());
337  }
338
339  /**
340   * Returns a new {@code ListenableFuture} whose result is asynchronously
341   * derived from the result of the given {@code Future}. More precisely, the
342   * returned {@code Future} takes its result from a {@code Future} produced by
343   * applying the given {@code AsyncFunction} to the result of the original
344   * {@code Future}. Example:
345   *
346   * <pre>   {@code
347   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
348   *   AsyncFunction<RowKey, QueryResult> queryFunction =
349   *       new AsyncFunction<RowKey, QueryResult>() {
350   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
351   *           return dataService.read(rowKey);
352   *         }
353   *       };
354   *   ListenableFuture<QueryResult> queryFuture =
355   *       transform(rowKeyFuture, queryFunction, executor);
356   * }</pre>
357   *
358   * <p>The returned {@code Future} attempts to keep its cancellation state in
359   * sync with that of the input future and that of the future returned by the
360   * chain function. That is, if the returned {@code Future} is cancelled, it
361   * will attempt to cancel the other two, and if either of the other two is
362   * cancelled, the returned {@code Future} will receive a callback in which it
363   * will attempt to cancel itself.
364   *
365   * <p>Note: For cases in which the work of creating the derived future is
366   * fast and lightweight, consider {@linkplain
367   * Futures#transform(ListenableFuture, Function) the other overload} or
368   * explicit use of {@code sameThreadExecutor}. For heavier derivations, this
369   * choice carries some caveats: First, the thread that {@code function.apply}
370   * runs in depends on whether the input {@code Future} is done at the time
371   * {@code transform} is called. In particular, if called late, {@code
372   * transform} will run the operation in the thread that called {@code
373   * transform}.  Second, {@code function.apply} may run in an internal thread
374   * of the system responsible for the input {@code Future}, such as an RPC
375   * network thread.  Finally, during the execution of a {@code
376   * sameThreadExecutor} {@code function.apply}, all other registered but
377   * unexecuted listeners are prevented from running, even if those listeners
378   * are to run in other executors.
379   *
380   * @param input The future to transform
381   * @param function A function to transform the result of the input future
382   *     to the result of the output future
383   * @param executor Executor to run the function in.
384   * @return A future that holds result of the function (if the input succeeded)
385   *     or the original input's failure (if not)
386   * @since 11.0
387   */
388  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
389      AsyncFunction<? super I, ? extends O> function,
390      Executor executor) {
391    ChainingListenableFuture<I, O> output =
392        new ChainingListenableFuture<I, O>(function, input);
393    input.addListener(output, executor);
394    return output;
395  }
396
397  /**
398   * Returns a new {@code ListenableFuture} whose result is the product of
399   * applying the given {@code Function} to the result of the given {@code
400   * Future}. Example:
401   *
402   * <pre>   {@code
403   *   ListenableFuture<QueryResult> queryFuture = ...;
404   *   Function<QueryResult, List<Row>> rowsFunction =
405   *       new Function<QueryResult, List<Row>>() {
406   *         public List<Row> apply(QueryResult queryResult) {
407   *           return queryResult.getRows();
408   *         }
409   *       };
410   *   ListenableFuture<List<Row>> rowsFuture =
411   *       transform(queryFuture, rowsFunction);
412   * }</pre>
413   *
414   * <p>Note: This overload of {@code transform} is designed for cases in which
415   * the transformation is fast and lightweight, as the method does not accept
416   * an {@code Executor} in which to perform the the work. For heavier
417   * transformations, this overload carries some caveats: First, the thread
418   * that the transformation runs in depends on whether the input {@code
419   * Future} is done at the time {@code transform} is called. In particular, if
420   * called late, {@code transform} will perform the transformation in the
421   * thread that called {@code transform}. Second, transformations may run in
422   * an internal thread of the system responsible for the input {@code Future},
423   * such as an RPC network thread. Finally, during the execution of a {@code
424   * sameThreadExecutor} transformation, all other registered but unexecuted
425   * listeners are prevented from running, even if those listeners are to run
426   * in other executors.
427   *
428   * <p>The returned {@code Future} attempts to keep its cancellation state in
429   * sync with that of the input future. That is, if the returned {@code Future}
430   * is cancelled, it will attempt to cancel the input, and if the input is
431   * cancelled, the returned {@code Future} will receive a callback in which it
432   * will attempt to cancel itself.
433   *
434   * <p>An example use of this method is to convert a serializable object
435   * returned from an RPC into a POJO.
436   *
437   * @param future The future to transform
438   * @param function A Function to transform the results of the provided future
439   *     to the results of the returned future.  This will be run in the thread
440   *     that notifies input it is complete.
441   * @return A future that holds result of the transformation.
442   * @since 9.0 (in 1.0 as {@code compose})
443   */
444  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
445      final Function<? super I, ? extends O> function) {
446    return transform(future, function, MoreExecutors.sameThreadExecutor());
447  }
448
449  /**
450   * Returns a new {@code ListenableFuture} whose result is the product of
451   * applying the given {@code Function} to the result of the given {@code
452   * Future}. Example:
453   *
454   * <pre>   {@code
455   *   ListenableFuture<QueryResult> queryFuture = ...;
456   *   Function<QueryResult, List<Row>> rowsFunction =
457   *       new Function<QueryResult, List<Row>>() {
458   *         public List<Row> apply(QueryResult queryResult) {
459   *           return queryResult.getRows();
460   *         }
461   *       };
462   *   ListenableFuture<List<Row>> rowsFuture =
463   *       transform(queryFuture, rowsFunction, executor);
464   * }</pre>
465   *
466   * <p>The returned {@code Future} attempts to keep its cancellation state in
467   * sync with that of the input future. That is, if the returned {@code Future}
468   * is cancelled, it will attempt to cancel the input, and if the input is
469   * cancelled, the returned {@code Future} will receive a callback in which it
470   * will attempt to cancel itself.
471   *
472   * <p>An example use of this method is to convert a serializable object
473   * returned from an RPC into a POJO.
474   *
475   * <p>Note: For cases in which the transformation is fast and lightweight,
476   * consider {@linkplain Futures#transform(ListenableFuture, Function) the
477   * other overload} or explicit use of {@link
478   * MoreExecutors#sameThreadExecutor}. For heavier transformations, this
479   * choice carries some caveats: First, the thread that the transformation
480   * runs in depends on whether the input {@code Future} is done at the time
481   * {@code transform} is called. In particular, if called late, {@code
482   * transform} will perform the transformation in the thread that called
483   * {@code transform}.  Second, transformations may run in an internal thread
484   * of the system responsible for the input {@code Future}, such as an RPC
485   * network thread.  Finally, during the execution of a {@code
486   * sameThreadExecutor} transformation, all other registered but unexecuted
487   * listeners are prevented from running, even if those listeners are to run
488   * in other executors.
489   *
490   * @param future The future to transform
491   * @param function A Function to transform the results of the provided future
492   *     to the results of the returned future.
493   * @param executor Executor to run the function in.
494   * @return A future that holds result of the transformation.
495   * @since 9.0 (in 2.0 as {@code compose})
496   */
497  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
498      final Function<? super I, ? extends O> function, Executor executor) {
499    checkNotNull(function);
500    Function<I, ListenableFuture<O>> wrapperFunction
501        = new Function<I, ListenableFuture<O>>() {
502            @Override public ListenableFuture<O> apply(I input) {
503              O output = function.apply(input);
504              return immediateFuture(output);
505            }
506        };
507    return chain(future, wrapperFunction, executor);
508  }
509
510  /**
511   * Like {@link #transform(ListenableFuture, Function)} except that the
512   * transformation {@code function} is invoked on each call to
513   * {@link Future#get() get()} on the returned future.
514   *
515   * <p>The returned {@code Future} reflects the input's cancellation
516   * state directly, and any attempt to cancel the returned Future is likewise
517   * passed through to the input Future.
518   *
519   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
520   * only apply the timeout to the execution of the underlying {@code Future},
521   * <em>not</em> to the execution of the transformation function.
522   *
523   * <p>The primary audience of this method is callers of {@code transform}
524   * who don't have a {@code ListenableFuture} available and
525   * do not mind repeated, lazy function evaluation.
526   *
527   * @param future The future to transform
528   * @param function A Function to transform the results of the provided future
529   *     to the results of the returned future.
530   * @return A future that returns the result of the transformation.
531   * @since 10.0
532   */
533  @Beta
534  public static <I, O> Future<O> lazyTransform(final Future<I> future,
535      final Function<? super I, ? extends O> function) {
536    checkNotNull(future);
537    checkNotNull(function);
538    return new Future<O>() {
539
540      @Override
541      public boolean cancel(boolean mayInterruptIfRunning) {
542        return future.cancel(mayInterruptIfRunning);
543      }
544
545      @Override
546      public boolean isCancelled() {
547        return future.isCancelled();
548      }
549
550      @Override
551      public boolean isDone() {
552        return future.isDone();
553      }
554
555      @Override
556      public O get() throws InterruptedException, ExecutionException {
557        return applyTransformation(future.get());
558      }
559
560      @Override
561      public O get(long timeout, TimeUnit unit)
562          throws InterruptedException, ExecutionException, TimeoutException {
563        return applyTransformation(future.get(timeout, unit));
564      }
565
566      private O applyTransformation(I input) throws ExecutionException {
567        try {
568          return function.apply(input);
569        } catch (Throwable t) {
570          throw new ExecutionException(t);
571        }
572      }
573    };
574  }
575
576  /**
577   * An implementation of {@code ListenableFuture} that also implements
578   * {@code Runnable} so that it can be used to nest ListenableFutures.
579   * Once the passed-in {@code ListenableFuture} is complete, it calls the
580   * passed-in {@code Function} to generate the result.
581   *
582   * <p>If the function throws any checked exceptions, they should be wrapped
583   * in a {@code UndeclaredThrowableException} so that this class can get
584   * access to the cause.
585   */
586  private static class ChainingListenableFuture<I, O>
587      extends AbstractFuture<O> implements Runnable {
588
589    private AsyncFunction<? super I, ? extends O> function;
590    private ListenableFuture<? extends I> inputFuture;
591    private volatile ListenableFuture<? extends O> outputFuture;
592    private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
593        new LinkedBlockingQueue<Boolean>(1);
594    private final CountDownLatch outputCreated = new CountDownLatch(1);
595
596    private ChainingListenableFuture(
597        AsyncFunction<? super I, ? extends O> function,
598        ListenableFuture<? extends I> inputFuture) {
599      this.function = checkNotNull(function);
600      this.inputFuture = checkNotNull(inputFuture);
601    }
602
603    /**
604     * Delegate the get() to the input and output futures, in case
605     * their implementations defer starting computation until their
606     * own get() is invoked.
607     */
608    @Override
609    public O get() throws InterruptedException, ExecutionException {
610      if (!isDone()) {
611        // Invoking get on the inputFuture will ensure our own run()
612        // method below is invoked as a listener when inputFuture sets
613        // its value.  Therefore when get() returns we should then see
614        // the outputFuture be created.
615        ListenableFuture<? extends I> inputFuture = this.inputFuture;
616        if (inputFuture != null) {
617          inputFuture.get();
618        }
619
620        // If our listener was scheduled to run on an executor we may
621        // need to wait for our listener to finish running before the
622        // outputFuture has been constructed by the function.
623        outputCreated.await();
624
625        // Like above with the inputFuture, we have a listener on
626        // the outputFuture that will set our own value when its
627        // value is set.  Invoking get will ensure the output can
628        // complete and invoke our listener, so that we can later
629        // get the result.
630        ListenableFuture<? extends O> outputFuture = this.outputFuture;
631        if (outputFuture != null) {
632          outputFuture.get();
633        }
634      }
635      return super.get();
636    }
637
638    /**
639     * Delegate the get() to the input and output futures, in case
640     * their implementations defer starting computation until their
641     * own get() is invoked.
642     */
643    @Override
644    public O get(long timeout, TimeUnit unit) throws TimeoutException,
645        ExecutionException, InterruptedException {
646      if (!isDone()) {
647        // Use a single time unit so we can decrease remaining timeout
648        // as we wait for various phases to complete.
649        if (unit != NANOSECONDS) {
650          timeout = NANOSECONDS.convert(timeout, unit);
651          unit = NANOSECONDS;
652        }
653
654        // Invoking get on the inputFuture will ensure our own run()
655        // method below is invoked as a listener when inputFuture sets
656        // its value.  Therefore when get() returns we should then see
657        // the outputFuture be created.
658        ListenableFuture<? extends I> inputFuture = this.inputFuture;
659        if (inputFuture != null) {
660          long start = System.nanoTime();
661          inputFuture.get(timeout, unit);
662          timeout -= Math.max(0, System.nanoTime() - start);
663        }
664
665        // If our listener was scheduled to run on an executor we may
666        // need to wait for our listener to finish running before the
667        // outputFuture has been constructed by the function.
668        long start = System.nanoTime();
669        if (!outputCreated.await(timeout, unit)) {
670          throw new TimeoutException();
671        }
672        timeout -= Math.max(0, System.nanoTime() - start);
673
674        // Like above with the inputFuture, we have a listener on
675        // the outputFuture that will set our own value when its
676        // value is set.  Invoking get will ensure the output can
677        // complete and invoke our listener, so that we can later
678        // get the result.
679        ListenableFuture<? extends O> outputFuture = this.outputFuture;
680        if (outputFuture != null) {
681          outputFuture.get(timeout, unit);
682        }
683      }
684      return super.get(timeout, unit);
685    }
686
687    @Override
688    public boolean cancel(boolean mayInterruptIfRunning) {
689      /*
690       * Our additional cancellation work needs to occur even if
691       * !mayInterruptIfRunning, so we can't move it into interruptTask().
692       */
693      if (super.cancel(mayInterruptIfRunning)) {
694        // This should never block since only one thread is allowed to cancel
695        // this Future.
696        putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
697        cancel(inputFuture, mayInterruptIfRunning);
698        cancel(outputFuture, mayInterruptIfRunning);
699        return true;
700      }
701      return false;
702    }
703
704    private void cancel(@Nullable Future<?> future,
705        boolean mayInterruptIfRunning) {
706      if (future != null) {
707        future.cancel(mayInterruptIfRunning);
708      }
709    }
710
711    @Override
712    public void run() {
713      try {
714        I sourceResult;
715        try {
716          sourceResult = getUninterruptibly(inputFuture);
717        } catch (CancellationException e) {
718          // Cancel this future and return.
719          // At this point, inputFuture is cancelled and outputFuture doesn't
720          // exist, so the value of mayInterruptIfRunning is irrelevant.
721          cancel(false);
722          return;
723        } catch (ExecutionException e) {
724          // Set the cause of the exception as this future's exception
725          setException(e.getCause());
726          return;
727        }
728
729        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
730            function.apply(sourceResult);
731        if (isCancelled()) {
732          // Handles the case where cancel was called while the function was
733          // being applied.
734          // There is a gap in cancel(boolean) between calling sync.cancel()
735          // and storing the value of mayInterruptIfRunning, so this thread
736          // needs to block, waiting for that value.
737          outputFuture.cancel(
738              takeUninterruptibly(mayInterruptIfRunningChannel));
739          this.outputFuture = null;
740          return;
741        }
742        outputFuture.addListener(new Runnable() {
743            @Override
744            public void run() {
745              try {
746                // Here it would have been nice to have had an
747                // UninterruptibleListenableFuture, but we don't want to start a
748                // combinatorial explosion of interfaces, so we have to make do.
749                set(getUninterruptibly(outputFuture));
750              } catch (CancellationException e) {
751                // Cancel this future and return.
752                // At this point, inputFuture and outputFuture are done, so the
753                // value of mayInterruptIfRunning is irrelevant.
754                cancel(false);
755                return;
756              } catch (ExecutionException e) {
757                // Set the cause of the exception as this future's exception
758                setException(e.getCause());
759              } finally {
760                // Don't pin inputs beyond completion
761                ChainingListenableFuture.this.outputFuture = null;
762              }
763            }
764          }, MoreExecutors.sameThreadExecutor());
765      } catch (UndeclaredThrowableException e) {
766        // Set the cause of the exception as this future's exception
767        setException(e.getCause());
768      } catch (Exception e) {
769        // This exception is irrelevant in this thread, but useful for the
770        // client
771        setException(e);
772      } catch (Error e) {
773        // Propagate errors up ASAP - our superclass will rethrow the error
774        setException(e);
775      } finally {
776        // Don't pin inputs beyond completion
777        function = null;
778        inputFuture = null;
779        // Allow our get routines to examine outputFuture now.
780        outputCreated.countDown();
781      }
782    }
783  }
784
785  /**
786   * Creates a new {@code ListenableFuture} whose value is a list containing the
787   * values of all its input futures, if all succeed. If any input fails, the
788   * returned future fails.
789   *
790   * <p>The list of results is in the same order as the input list.
791   *
792   * <p>Canceling this future does not cancel any of the component futures;
793   * however, if any of the provided futures fails or is canceled, this one is,
794   * too.
795   *
796   * @param futures futures to combine
797   * @return a future that provides a list of the results of the component
798   *         futures
799   * @since 10.0
800   */
801  @Beta
802  public static <V> ListenableFuture<List<V>> allAsList(
803      ListenableFuture<? extends V>... futures) {
804    return new ListFuture<V>(ImmutableList.copyOf(futures), true,
805        MoreExecutors.sameThreadExecutor());
806  }
807
808  /**
809   * Creates a new {@code ListenableFuture} whose value is a list containing the
810   * values of all its input futures, if all succeed. If any input fails, the
811   * returned future fails.
812   *
813   * <p>The list of results is in the same order as the input list.
814   *
815   * <p>Canceling this future does not cancel any of the component futures;
816   * however, if any of the provided futures fails or is canceled, this one is,
817   * too.
818   *
819   * @param futures futures to combine
820   * @return a future that provides a list of the results of the component
821   *         futures
822   * @since 10.0
823   */
824  @Beta
825  public static <V> ListenableFuture<List<V>> allAsList(
826      Iterable<? extends ListenableFuture<? extends V>> futures) {
827    return new ListFuture<V>(ImmutableList.copyOf(futures), true,
828        MoreExecutors.sameThreadExecutor());
829  }
830
831  /**
832   * Creates a new {@code ListenableFuture} whose value is a list containing the
833   * values of all its successful input futures. The list of results is in the
834   * same order as the input list, and if any of the provided futures fails or
835   * is canceled, its corresponding position will contain {@code null} (which is
836   * indistinguishable from the future having a successful value of
837   * {@code null}).
838   *
839   * @param futures futures to combine
840   * @return a future that provides a list of the results of the component
841   *         futures
842   * @since 10.0
843   */
844  @Beta
845  public static <V> ListenableFuture<List<V>> successfulAsList(
846      ListenableFuture<? extends V>... futures) {
847    return new ListFuture<V>(ImmutableList.copyOf(futures), false,
848        MoreExecutors.sameThreadExecutor());
849  }
850
851  /**
852   * Creates a new {@code ListenableFuture} whose value is a list containing the
853   * values of all its successful input futures. The list of results is in the
854   * same order as the input list, and if any of the provided futures fails or
855   * is canceled, its corresponding position will contain {@code null} (which is
856   * indistinguishable from the future having a successful value of
857   * {@code null}).
858   *
859   * @param futures futures to combine
860   * @return a future that provides a list of the results of the component
861   *         futures
862   * @since 10.0
863   */
864  @Beta
865  public static <V> ListenableFuture<List<V>> successfulAsList(
866      Iterable<? extends ListenableFuture<? extends V>> futures) {
867    return new ListFuture<V>(ImmutableList.copyOf(futures), false,
868        MoreExecutors.sameThreadExecutor());
869  }
870
871  /**
872   * Registers separate success and failure callbacks to be run when the {@code
873   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
874   * complete} or, if the computation is already complete, immediately.
875   *
876   * <p>There is no guaranteed ordering of execution of callbacks, but any
877   * callback added through this method is guaranteed to be called once the
878   * computation is complete.
879   *
880   * Example: <pre> {@code
881   * ListenableFuture<QueryResult> future = ...;
882   * addCallback(future,
883   *     new FutureCallback<QueryResult> {
884   *       public void onSuccess(QueryResult result) {
885   *         storeInCache(result);
886   *       }
887   *       public void onFailure(Throwable t) {
888   *         reportError(t);
889   *       }
890   *     });}</pre>
891   *
892   * <p>Note: This overload of {@code addCallback} is designed for cases in
893   * which the callack is fast and lightweight, as the method does not accept
894   * an {@code Executor} in which to perform the the work. For heavier
895   * callbacks, this overload carries some caveats: First, the thread that the
896   * callback runs in depends on whether the input {@code Future} is done at the
897   * time {@code addCallback} is called and on whether the input {@code Future}
898   * is ever cancelled. In particular, {@code addCallback} may execute the
899   * callback in the thread that calls {@code addCallback} or {@code
900   * Future.cancel}. Second, callbacks may run in an internal thread of the
901   * system responsible for the input {@code Future}, such as an RPC network
902   * thread. Finally, during the execution of a {@code sameThreadExecutor}
903   * callback, all other registered but unexecuted listeners are prevented from
904   * running, even if those listeners are to run in other executors.
905   *
906   * <p>For a more general interface to attach a completion listener to a
907   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
908   *
909   * @param future The future attach the callback to.
910   * @param callback The callback to invoke when {@code future} is completed.
911   * @since 10.0
912   */
913  public static <V> void addCallback(ListenableFuture<V> future,
914      FutureCallback<? super V> callback) {
915    addCallback(future, callback, MoreExecutors.sameThreadExecutor());
916  }
917
918  /**
919   * Registers separate success and failure callbacks to be run when the {@code
920   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
921   * complete} or, if the computation is already complete, immediately.
922   *
923   * <p>The callback is run in {@code executor}.
924   * There is no guaranteed ordering of execution of callbacks, but any
925   * callback added through this method is guaranteed to be called once the
926   * computation is complete.
927   *
928   * Example: <pre> {@code
929   * ListenableFuture<QueryResult> future = ...;
930   * Executor e = ...
931   * addCallback(future, e,
932   *     new FutureCallback<QueryResult> {
933   *       public void onSuccess(QueryResult result) {
934   *         storeInCache(result);
935   *       }
936   *       public void onFailure(Throwable t) {
937   *         reportError(t);
938   *       }
939   *     });}</pre>
940   *
941   * When the callback is fast and lightweight consider {@linkplain
942   * Futures#addCallback(ListenableFuture, FutureCallback) the other overload}
943   * or explicit use of {@link MoreExecutors#sameThreadExecutor
944   * sameThreadExecutor}. For heavier callbacks, this choice carries some
945   * caveats: First, the thread that the callback runs in depends on whether
946   * the input {@code Future} is done at the time {@code addCallback} is called
947   * and on whether the input {@code Future} is ever cancelled. In particular,
948   * {@code addCallback} may execute the callback in the thread that calls
949   * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in
950   * an internal thread of the system responsible for the input {@code Future},
951   * such as an RPC network thread. Finally, during the execution of a {@code
952   * sameThreadExecutor} callback, all other registered but unexecuted
953   * listeners are prevented from running, even if those listeners are to run
954   * in other executors.
955   *
956   * <p>For a more general interface to attach a completion listener to a
957   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
958   *
959   * @param future The future attach the callback to.
960   * @param callback The callback to invoke when {@code future} is completed.
961   * @param executor The executor to run {@code callback} when the future
962   *    completes.
963   * @since 10.0
964   */
965  public static <V> void addCallback(final ListenableFuture<V> future,
966      final FutureCallback<? super V> callback, Executor executor) {
967    Preconditions.checkNotNull(callback);
968    Runnable callbackListener = new Runnable() {
969      @Override
970      public void run() {
971        try {
972          // TODO(user): (Before Guava release), validate that this
973          // is the thing for IE.
974          V value = getUninterruptibly(future);
975          callback.onSuccess(value);
976        } catch (ExecutionException e) {
977          callback.onFailure(e.getCause());
978        } catch (RuntimeException e) {
979          callback.onFailure(e);
980        } catch (Error e) {
981          callback.onFailure(e);
982        }
983      }
984    };
985    future.addListener(callbackListener, executor);
986  }
987
988  /**
989   * Returns the result of {@link Future#get()}, converting most exceptions to a
990   * new instance of the given checked exception type. This reduces boilerplate
991   * for a common use of {@code Future} in which it is unnecessary to
992   * programmatically distinguish between exception types or to extract other
993   * information from the exception instance.
994   *
995   * <p>Exceptions from {@code Future.get} are treated as follows:
996   * <ul>
997   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
998   *     {@code X} if the cause is a checked exception, an {@link
999   *     UncheckedExecutionException} if the cause is a {@code
1000   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1001   *     {@code Error}.
1002   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1003   *     restoring the interrupt).
1004   * <li>Any {@link CancellationException} is propagated untouched, as is any
1005   *     other {@link RuntimeException} (though {@code get} implementations are
1006   *     discouraged from throwing such exceptions).
1007   * </ul>
1008   *
1009   * The overall principle is to continue to treat every checked exception as a
1010   * checked exception, every unchecked exception as an unchecked exception, and
1011   * every error as an error. In addition, the cause of any {@code
1012   * ExecutionException} is wrapped in order to ensure that the new stack trace
1013   * matches that of the current thread.
1014   *
1015   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1016   * public constructor that accepts zero or more arguments, all of type {@code
1017   * String} or {@code Throwable} (preferring constructors with at least one
1018   * {@code String}) and calling the constructor via reflection. If the
1019   * exception did not already have a cause, one is set by calling {@link
1020   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1021   * {@code IllegalArgumentException} is thrown.
1022   *
1023   * @throws X if {@code get} throws any checked exception except for an {@code
1024   *         ExecutionException} whose cause is not itself a checked exception
1025   * @throws UncheckedExecutionException if {@code get} throws an {@code
1026   *         ExecutionException} with a {@code RuntimeException} as its cause
1027   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1028   *         with an {@code Error} as its cause
1029   * @throws CancellationException if {@code get} throws a {@code
1030   *         CancellationException}
1031   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1032   *         RuntimeException} or does not have a suitable constructor
1033   * @since 10.0
1034   */
1035  @Beta
1036  public static <V, X extends Exception> V get(
1037      Future<V> future, Class<X> exceptionClass) throws X {
1038    checkNotNull(future);
1039    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1040        "Futures.get exception type (%s) must not be a RuntimeException",
1041        exceptionClass);
1042    try {
1043      return future.get();
1044    } catch (InterruptedException e) {
1045      currentThread().interrupt();
1046      throw newWithCause(exceptionClass, e);
1047    } catch (ExecutionException e) {
1048      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1049      throw new AssertionError();
1050    }
1051  }
1052
1053  /**
1054   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1055   * exceptions to a new instance of the given checked exception type. This
1056   * reduces boilerplate for a common use of {@code Future} in which it is
1057   * unnecessary to programmatically distinguish between exception types or to
1058   * extract other information from the exception instance.
1059   *
1060   * <p>Exceptions from {@code Future.get} are treated as follows:
1061   * <ul>
1062   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1063   *     {@code X} if the cause is a checked exception, an {@link
1064   *     UncheckedExecutionException} if the cause is a {@code
1065   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1066   *     {@code Error}.
1067   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1068   *     restoring the interrupt).
1069   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1070   * <li>Any {@link CancellationException} is propagated untouched, as is any
1071   *     other {@link RuntimeException} (though {@code get} implementations are
1072   *     discouraged from throwing such exceptions).
1073   * </ul>
1074   *
1075   * The overall principle is to continue to treat every checked exception as a
1076   * checked exception, every unchecked exception as an unchecked exception, and
1077   * every error as an error. In addition, the cause of any {@code
1078   * ExecutionException} is wrapped in order to ensure that the new stack trace
1079   * matches that of the current thread.
1080   *
1081   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1082   * public constructor that accepts zero or more arguments, all of type {@code
1083   * String} or {@code Throwable} (preferring constructors with at least one
1084   * {@code String}) and calling the constructor via reflection. If the
1085   * exception did not already have a cause, one is set by calling {@link
1086   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1087   * {@code IllegalArgumentException} is thrown.
1088   *
1089   * @throws X if {@code get} throws any checked exception except for an {@code
1090   *         ExecutionException} whose cause is not itself a checked exception
1091   * @throws UncheckedExecutionException if {@code get} throws an {@code
1092   *         ExecutionException} with a {@code RuntimeException} as its cause
1093   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1094   *         with an {@code Error} as its cause
1095   * @throws CancellationException if {@code get} throws a {@code
1096   *         CancellationException}
1097   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1098   *         RuntimeException} or does not have a suitable constructor
1099   * @since 10.0
1100   */
1101  @Beta
1102  public static <V, X extends Exception> V get(
1103      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1104      throws X {
1105    checkNotNull(future);
1106    checkNotNull(unit);
1107    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1108        "Futures.get exception type (%s) must not be a RuntimeException",
1109        exceptionClass);
1110    try {
1111      return future.get(timeout, unit);
1112    } catch (InterruptedException e) {
1113      currentThread().interrupt();
1114      throw newWithCause(exceptionClass, e);
1115    } catch (TimeoutException e) {
1116      throw newWithCause(exceptionClass, e);
1117    } catch (ExecutionException e) {
1118      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1119      throw new AssertionError();
1120    }
1121  }
1122
1123  private static <X extends Exception> void wrapAndThrowExceptionOrError(
1124      Throwable cause, Class<X> exceptionClass) throws X {
1125    if (cause instanceof Error) {
1126      throw new ExecutionError((Error) cause);
1127    }
1128    if (cause instanceof RuntimeException) {
1129      throw new UncheckedExecutionException(cause);
1130    }
1131    throw newWithCause(exceptionClass, cause);
1132  }
1133
1134  /**
1135   * Returns the result of calling {@link Future#get()} uninterruptibly on a
1136   * task known not to throw a checked exception. This makes {@code Future} more
1137   * suitable for lightweight, fast-running tasks that, barring bugs in the
1138   * code, will not fail. This gives it exception-handling behavior similar to
1139   * that of {@code ForkJoinTask.join}.
1140   *
1141   * <p>Exceptions from {@code Future.get} are treated as follows:
1142   * <ul>
1143   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1144   *     {@link UncheckedExecutionException} (if the cause is an {@code
1145   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1146   *     Error}).
1147   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1148   *     call. The interrupt is restored before {@code getUnchecked} returns.
1149   * <li>Any {@link CancellationException} is propagated untouched. So is any
1150   *     other {@link RuntimeException} ({@code get} implementations are
1151   *     discouraged from throwing such exceptions).
1152   * </ul>
1153   *
1154   * The overall principle is to eliminate all checked exceptions: to loop to
1155   * avoid {@code InterruptedException}, to pass through {@code
1156   * CancellationException}, and to wrap any exception from the underlying
1157   * computation in an {@code UncheckedExecutionException} or {@code
1158   * ExecutionError}.
1159   *
1160   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1161   * {@link Uninterruptibles#getUninterruptibly(Future)}.
1162   *
1163   * @throws UncheckedExecutionException if {@code get} throws an {@code
1164   *         ExecutionException} with an {@code Exception} as its cause
1165   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1166   *         with an {@code Error} as its cause
1167   * @throws CancellationException if {@code get} throws a {@code
1168   *         CancellationException}
1169   * @since 10.0
1170   */
1171  @Beta
1172  public static <V> V getUnchecked(Future<V> future) {
1173    checkNotNull(future);
1174    try {
1175      return getUninterruptibly(future);
1176    } catch (ExecutionException e) {
1177      wrapAndThrowUnchecked(e.getCause());
1178      throw new AssertionError();
1179    }
1180  }
1181
1182  private static void wrapAndThrowUnchecked(Throwable cause) {
1183    if (cause instanceof Error) {
1184      throw new ExecutionError((Error) cause);
1185    }
1186    /*
1187     * It's a non-Error, non-Exception Throwable. From my survey of such
1188     * classes, I believe that most users intended to extend Exception, so we'll
1189     * treat it like an Exception.
1190     */
1191    throw new UncheckedExecutionException(cause);
1192  }
1193
1194  /*
1195   * TODO(user): FutureChecker interface for these to be static methods on? If
1196   * so, refer to it in the (static-method) Futures.get documentation
1197   */
1198
1199  /*
1200   * Arguably we don't need a timed getUnchecked because any operation slow
1201   * enough to require a timeout is heavyweight enough to throw a checked
1202   * exception and therefore be inappropriate to use with getUnchecked. Further,
1203   * it's not clear that converting the checked TimeoutException to a
1204   * RuntimeException -- especially to an UncheckedExecutionException, since it
1205   * wasn't thrown by the computation -- makes sense, and if we don't convert
1206   * it, the user still has to write a try-catch block.
1207   *
1208   * If you think you would use this method, let us know.
1209   */
1210
1211  private static <X extends Exception> X newWithCause(
1212      Class<X> exceptionClass, Throwable cause) {
1213    // getConstructors() guarantees this as long as we don't modify the array.
1214    @SuppressWarnings("unchecked")
1215    List<Constructor<X>> constructors =
1216        (List) Arrays.asList(exceptionClass.getConstructors());
1217    for (Constructor<X> constructor : preferringStrings(constructors)) {
1218      @Nullable X instance = newFromConstructor(constructor, cause);
1219      if (instance != null) {
1220        if (instance.getCause() == null) {
1221          instance.initCause(cause);
1222        }
1223        return instance;
1224      }
1225    }
1226    throw new IllegalArgumentException(
1227        "No appropriate constructor for exception of type " + exceptionClass
1228            + " in response to chained exception", cause);
1229  }
1230
1231  private static <X extends Exception> List<Constructor<X>>
1232      preferringStrings(List<Constructor<X>> constructors) {
1233    return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1234  }
1235
1236  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1237      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1238        @Override public Boolean apply(Constructor<?> input) {
1239          return asList(input.getParameterTypes()).contains(String.class);
1240        }
1241      }).reverse();
1242
1243  @Nullable private static <X> X newFromConstructor(
1244      Constructor<X> constructor, Throwable cause) {
1245    Class<?>[] paramTypes = constructor.getParameterTypes();
1246    Object[] params = new Object[paramTypes.length];
1247    for (int i = 0; i < paramTypes.length; i++) {
1248      Class<?> paramType = paramTypes[i];
1249      if (paramType.equals(String.class)) {
1250        params[i] = cause.toString();
1251      } else if (paramType.equals(Throwable.class)) {
1252        params[i] = cause;
1253      } else {
1254        return null;
1255      }
1256    }
1257    try {
1258      return constructor.newInstance(params);
1259    } catch (IllegalArgumentException e) {
1260      return null;
1261    } catch (InstantiationException e) {
1262      return null;
1263    } catch (IllegalAccessException e) {
1264      return null;
1265    } catch (InvocationTargetException e) {
1266      return null;
1267    }
1268  }
1269
1270  /**
1271   * Class that implements {@link #allAsList} and {@link #successfulAsList}.
1272   * The idea is to create a (null-filled) List and register a listener with
1273   * each component future to fill out the value in the List when that future
1274   * completes.
1275   */
1276  private static class ListFuture<V> extends AbstractFuture<List<V>> {
1277    ImmutableList<? extends ListenableFuture<? extends V>> futures;
1278    final boolean allMustSucceed;
1279    final AtomicInteger remaining;
1280    List<V> values;
1281
1282    /**
1283     * Constructor.
1284     *
1285     * @param futures all the futures to build the list from
1286     * @param allMustSucceed whether a single failure or cancellation should
1287     *        propagate to this future
1288     * @param listenerExecutor used to run listeners on all the passed in
1289     *        futures.
1290     */
1291    ListFuture(
1292        final ImmutableList<? extends ListenableFuture<? extends V>> futures,
1293        final boolean allMustSucceed, final Executor listenerExecutor) {
1294      this.futures = futures;
1295      this.values = Lists.newArrayListWithCapacity(futures.size());
1296      this.allMustSucceed = allMustSucceed;
1297      this.remaining = new AtomicInteger(futures.size());
1298
1299      init(listenerExecutor);
1300    }
1301
1302    private void init(final Executor listenerExecutor) {
1303      // First, schedule cleanup to execute when the Future is done.
1304      addListener(new Runnable() {
1305        @Override
1306        public void run() {
1307          // By now the values array has either been set as the Future's value,
1308          // or (in case of failure) is no longer useful.
1309          ListFuture.this.values = null;
1310
1311          // Let go of the memory held by other futures
1312          ListFuture.this.futures = null;
1313        }
1314      }, MoreExecutors.sameThreadExecutor());
1315
1316      // Now begin the "real" initialization.
1317
1318      // Corner case: List is empty.
1319      if (futures.isEmpty()) {
1320        set(Lists.newArrayList(values));
1321        return;
1322      }
1323
1324      // Populate the results list with null initially.
1325      for (int i = 0; i < futures.size(); ++i) {
1326        values.add(null);
1327      }
1328
1329      // Register a listener on each Future in the list to update
1330      // the state of this future.
1331      // Note that if all the futures on the list are done prior to completing
1332      // this loop, the last call to addListener() will callback to
1333      // setOneValue(), transitively call our cleanup listener, and set
1334      // this.futures to null.
1335      // We store a reference to futures to avoid the NPE.
1336      ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
1337      for (int i = 0; i < localFutures.size(); i++) {
1338        final ListenableFuture<? extends V> listenable = localFutures.get(i);
1339        final int index = i;
1340        listenable.addListener(new Runnable() {
1341          @Override
1342          public void run() {
1343            setOneValue(index, listenable);
1344          }
1345        }, listenerExecutor);
1346      }
1347    }
1348
1349    /**
1350     * Sets the value at the given index to that of the given future.
1351     */
1352    private void setOneValue(int index, Future<? extends V> future) {
1353      List<V> localValues = values;
1354      if (isDone() || localValues == null) {
1355        // Some other future failed or has been cancelled, causing this one to
1356        // also be cancelled or have an exception set. This should only happen
1357        // if allMustSucceed is true.
1358        checkState(allMustSucceed,
1359            "Future was done before all dependencies completed");
1360        return;
1361      }
1362
1363      try {
1364        checkState(future.isDone(),
1365            "Tried to set value from future which is not done");
1366        localValues.set(index, getUninterruptibly(future));
1367      } catch (CancellationException e) {
1368        if (allMustSucceed) {
1369          // Set ourselves as cancelled. Let the input futures keep running
1370          // as some of them may be used elsewhere.
1371          // (Currently we don't override interruptTask, so
1372          // mayInterruptIfRunning==false isn't technically necessary.)
1373          cancel(false);
1374        }
1375      } catch (ExecutionException e) {
1376        if (allMustSucceed) {
1377          // As soon as the first one fails, throw the exception up.
1378          // The result of all other inputs is then ignored.
1379          setException(e.getCause());
1380        }
1381      } catch (RuntimeException e) {
1382        if (allMustSucceed) {
1383          setException(e);
1384        }
1385      } catch (Error e) {
1386        // Propagate errors up ASAP - our superclass will rethrow the error
1387        setException(e);
1388      } finally {
1389        int newRemaining = remaining.decrementAndGet();
1390        checkState(newRemaining >= 0, "Less than 0 remaining futures");
1391        if (newRemaining == 0) {
1392          localValues = values;
1393          if (localValues != null) {
1394            set(Lists.newArrayList(localValues));
1395          } else {
1396            checkState(isDone());
1397          }
1398        }
1399      }
1400    }
1401
1402    @Override
1403    public List<V> get() throws InterruptedException, ExecutionException {
1404      callAllGets();
1405
1406      // This may still block in spite of the calls above, as the listeners may
1407      // be scheduled for execution in other threads.
1408      return super.get();
1409    }
1410
1411    /**
1412     * Calls the get method of all dependency futures to work around a bug in
1413     * some ListenableFutures where the listeners aren't called until get() is
1414     * called.
1415     */
1416    private void callAllGets() throws InterruptedException {
1417      List<? extends ListenableFuture<? extends V>> oldFutures = futures;
1418      if (oldFutures != null && !isDone()) {
1419        for (ListenableFuture<? extends V> future : oldFutures) {
1420          // We wait for a little while for the future, but if it's not done,
1421          // we check that no other futures caused a cancellation or failure.
1422          // This can introduce a delay of up to 10ms in reporting an exception.
1423          while (!future.isDone()) {
1424            try {
1425              future.get();
1426            } catch (Error e) {
1427              throw e;
1428            } catch (InterruptedException e) {
1429              throw e;
1430            } catch (Throwable e) {
1431              // ExecutionException / CancellationException / RuntimeException
1432              if (allMustSucceed) {
1433                return;
1434              } else {
1435                continue;
1436              }
1437            }
1438          }
1439        }
1440      }
1441    }
1442  }
1443
1444  /**
1445   * A checked future that uses a function to map from exceptions to the
1446   * appropriate checked type.
1447   */
1448  private static class MappingCheckedFuture<V, X extends Exception> extends
1449      AbstractCheckedFuture<V, X> {
1450
1451    final Function<Exception, X> mapper;
1452
1453    MappingCheckedFuture(ListenableFuture<V> delegate,
1454        Function<Exception, X> mapper) {
1455      super(delegate);
1456
1457      this.mapper = checkNotNull(mapper);
1458    }
1459
1460    @Override
1461    protected X mapException(Exception e) {
1462      return mapper.apply(e);
1463    }
1464  }
1465}
1466