1/*
2 * Copyright (C) 2006 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import static com.google.common.base.Preconditions.checkArgument;
20import static com.google.common.base.Preconditions.checkNotNull;
21import static com.google.common.base.Preconditions.checkState;
22import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
24import static java.lang.Thread.currentThread;
25import static java.util.Arrays.asList;
26
27import com.google.common.annotations.Beta;
28import com.google.common.base.Function;
29import com.google.common.base.Optional;
30import com.google.common.base.Preconditions;
31import com.google.common.collect.ImmutableCollection;
32import com.google.common.collect.ImmutableList;
33import com.google.common.collect.Lists;
34import com.google.common.collect.Ordering;
35import com.google.common.collect.Queues;
36import com.google.common.collect.Sets;
37
38import java.lang.reflect.Constructor;
39import java.lang.reflect.InvocationTargetException;
40import java.lang.reflect.UndeclaredThrowableException;
41import java.util.Arrays;
42import java.util.Collections;
43import java.util.List;
44import java.util.Set;
45import java.util.concurrent.Callable;
46import java.util.concurrent.CancellationException;
47import java.util.concurrent.ConcurrentLinkedQueue;
48import java.util.concurrent.ExecutionException;
49import java.util.concurrent.Executor;
50import java.util.concurrent.Future;
51import java.util.concurrent.RejectedExecutionException;
52import java.util.concurrent.TimeUnit;
53import java.util.concurrent.TimeoutException;
54import java.util.concurrent.atomic.AtomicBoolean;
55import java.util.concurrent.atomic.AtomicInteger;
56import java.util.logging.Level;
57import java.util.logging.Logger;
58
59import javax.annotation.Nullable;
60
61/**
62 * Static utility methods pertaining to the {@link Future} interface.
63 *
64 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
65 * Guava User Guide article on <a href=
66 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
67 * {@code ListenableFuture}</a>.
68 *
69 * @author Kevin Bourrillion
70 * @author Nishant Thakkar
71 * @author Sven Mawson
72 * @since 1.0
73 */
74@Beta
75public final class Futures {
76  private Futures() {}
77
78  /**
79   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
80   * and a {@link Function} that maps from {@link Exception} instances into the
81   * appropriate checked type.
82   *
83   * <p>The given mapping function will be applied to an
84   * {@link InterruptedException}, a {@link CancellationException}, or an
85   * {@link ExecutionException}.
86   * See {@link Future#get()} for details on the exceptions thrown.
87   *
88   * @since 9.0 (source-compatible since 1.0)
89   */
90  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
91      ListenableFuture<V> future, Function<? super Exception, X> mapper) {
92    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
93  }
94
95  private abstract static class ImmediateFuture<V>
96      implements ListenableFuture<V> {
97
98    private static final Logger log =
99        Logger.getLogger(ImmediateFuture.class.getName());
100
101    @Override
102    public void addListener(Runnable listener, Executor executor) {
103      checkNotNull(listener, "Runnable was null.");
104      checkNotNull(executor, "Executor was null.");
105      try {
106        executor.execute(listener);
107      } catch (RuntimeException e) {
108        // ListenableFuture's contract is that it will not throw unchecked
109        // exceptions, so log the bad runnable and/or executor and swallow it.
110        log.log(Level.SEVERE, "RuntimeException while executing runnable "
111            + listener + " with executor " + executor, e);
112      }
113    }
114
115    @Override
116    public boolean cancel(boolean mayInterruptIfRunning) {
117      return false;
118    }
119
120    @Override
121    public abstract V get() throws ExecutionException;
122
123    @Override
124    public V get(long timeout, TimeUnit unit) throws ExecutionException {
125      checkNotNull(unit);
126      return get();
127    }
128
129    @Override
130    public boolean isCancelled() {
131      return false;
132    }
133
134    @Override
135    public boolean isDone() {
136      return true;
137    }
138  }
139
140  private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
141
142    @Nullable private final V value;
143
144    ImmediateSuccessfulFuture(@Nullable V value) {
145      this.value = value;
146    }
147
148    @Override
149    public V get() {
150      return value;
151    }
152  }
153
154  private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
155      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
156
157    @Nullable private final V value;
158
159    ImmediateSuccessfulCheckedFuture(@Nullable V value) {
160      this.value = value;
161    }
162
163    @Override
164    public V get() {
165      return value;
166    }
167
168    @Override
169    public V checkedGet() {
170      return value;
171    }
172
173    @Override
174    public V checkedGet(long timeout, TimeUnit unit) {
175      checkNotNull(unit);
176      return value;
177    }
178  }
179
180  private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
181
182    private final Throwable thrown;
183
184    ImmediateFailedFuture(Throwable thrown) {
185      this.thrown = thrown;
186    }
187
188    @Override
189    public V get() throws ExecutionException {
190      throw new ExecutionException(thrown);
191    }
192  }
193
194  private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
195
196    private final CancellationException thrown;
197
198    ImmediateCancelledFuture() {
199      this.thrown = new CancellationException("Immediate cancelled future.");
200    }
201
202    @Override
203    public boolean isCancelled() {
204      return true;
205    }
206
207    @Override
208    public V get() {
209      throw AbstractFuture.cancellationExceptionWithCause(
210          "Task was cancelled.", thrown);
211    }
212  }
213
214  private static class ImmediateFailedCheckedFuture<V, X extends Exception>
215      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
216
217    private final X thrown;
218
219    ImmediateFailedCheckedFuture(X thrown) {
220      this.thrown = thrown;
221    }
222
223    @Override
224    public V get() throws ExecutionException {
225      throw new ExecutionException(thrown);
226    }
227
228    @Override
229    public V checkedGet() throws X {
230      throw thrown;
231    }
232
233    @Override
234    public V checkedGet(long timeout, TimeUnit unit) throws X {
235      checkNotNull(unit);
236      throw thrown;
237    }
238  }
239
240  /**
241   * Creates a {@code ListenableFuture} which has its value set immediately upon
242   * construction. The getters just return the value. This {@code Future} can't
243   * be canceled or timed out and its {@code isDone()} method always returns
244   * {@code true}.
245   */
246  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
247    return new ImmediateSuccessfulFuture<V>(value);
248  }
249
250  /**
251   * Returns a {@code CheckedFuture} which has its value set immediately upon
252   * construction.
253   *
254   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
255   * method always returns {@code true}. Calling {@code get()} or {@code
256   * checkedGet()} will immediately return the provided value.
257   */
258  public static <V, X extends Exception> CheckedFuture<V, X>
259      immediateCheckedFuture(@Nullable V value) {
260    return new ImmediateSuccessfulCheckedFuture<V, X>(value);
261  }
262
263  /**
264   * Returns a {@code ListenableFuture} which has an exception set immediately
265   * upon construction.
266   *
267   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
268   * method always returns {@code true}. Calling {@code get()} will immediately
269   * throw the provided {@code Throwable} wrapped in an {@code
270   * ExecutionException}.
271   */
272  public static <V> ListenableFuture<V> immediateFailedFuture(
273      Throwable throwable) {
274    checkNotNull(throwable);
275    return new ImmediateFailedFuture<V>(throwable);
276  }
277
278  /**
279   * Creates a {@code ListenableFuture} which is cancelled immediately upon
280   * construction, so that {@code isCancelled()} always returns {@code true}.
281   *
282   * @since 14.0
283   */
284  public static <V> ListenableFuture<V> immediateCancelledFuture() {
285    return new ImmediateCancelledFuture<V>();
286  }
287
288  /**
289   * Returns a {@code CheckedFuture} which has an exception set immediately upon
290   * construction.
291   *
292   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
293   * method always returns {@code true}. Calling {@code get()} will immediately
294   * throw the provided {@code Exception} wrapped in an {@code
295   * ExecutionException}, and calling {@code checkedGet()} will throw the
296   * provided exception itself.
297   */
298  public static <V, X extends Exception> CheckedFuture<V, X>
299      immediateFailedCheckedFuture(X exception) {
300    checkNotNull(exception);
301    return new ImmediateFailedCheckedFuture<V, X>(exception);
302  }
303
304  /**
305   * Returns a {@code Future} whose result is taken from the given primary
306   * {@code input} or, if the primary input fails, from the {@code Future}
307   * provided by the {@code fallback}. {@link FutureFallback#create} is not
308   * invoked until the primary input has failed, so if the primary input
309   * succeeds, it is never invoked. If, during the invocation of {@code
310   * fallback}, an exception is thrown, this exception is used as the result of
311   * the output {@code Future}.
312   *
313   * <p>Below is an example of a fallback that returns a default value if an
314   * exception occurs:
315   *
316   * <pre>   {@code
317   *   ListenableFuture<Integer> fetchCounterFuture = ...;
318   *
319   *   // Falling back to a zero counter in case an exception happens when
320   *   // processing the RPC to fetch counters.
321   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
322   *       fetchCounterFuture, new FutureFallback<Integer>() {
323   *         public ListenableFuture<Integer> create(Throwable t) {
324   *           // Returning "0" as the default for the counter when the
325   *           // exception happens.
326   *           return immediateFuture(0);
327   *         }
328   *       });}</pre>
329   *
330   * <p>The fallback can also choose to propagate the original exception when
331   * desired:
332   *
333   * <pre>   {@code
334   *   ListenableFuture<Integer> fetchCounterFuture = ...;
335   *
336   *   // Falling back to a zero counter only in case the exception was a
337   *   // TimeoutException.
338   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
339   *       fetchCounterFuture, new FutureFallback<Integer>() {
340   *         public ListenableFuture<Integer> create(Throwable t) {
341   *           if (t instanceof TimeoutException) {
342   *             return immediateFuture(0);
343   *           }
344   *           return immediateFailedFuture(t);
345   *         }
346   *       });}</pre>
347   *
348   * <p>Note: If the derived {@code Future} is slow or heavyweight to create
349   * (whether the {@code Future} itself is slow or heavyweight to complete is
350   * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
351   * FutureFallback, Executor) supplying an executor}. If you do not supply an
352   * executor, {@code withFallback} will use a
353   * {@linkplain MoreExecutors#directExecutor direct executor}, which carries
354   * some caveats for heavier operations. For example, the call to {@code
355   * fallback.create} may run on an unpredictable or undesirable thread:
356   *
357   * <ul>
358   * <li>If the input {@code Future} is done at the time {@code withFallback}
359   * is called, {@code withFallback} will call {@code fallback.create} inline.
360   * <li>If the input {@code Future} is not yet done, {@code withFallback} will
361   * schedule {@code fallback.create} to be run by the thread that completes
362   * the input {@code Future}, which may be an internal system thread such as
363   * an RPC network thread.
364   * </ul>
365   *
366   * <p>Also note that, regardless of which thread executes the {@code
367   * fallback.create}, all other registered but unexecuted listeners are
368   * prevented from running during its execution, even if those listeners are
369   * to run in other executors.
370   *
371   * @param input the primary input {@code Future}
372   * @param fallback the {@link FutureFallback} implementation to be called if
373   *     {@code input} fails
374   * @since 14.0
375   */
376  public static <V> ListenableFuture<V> withFallback(
377      ListenableFuture<? extends V> input,
378      FutureFallback<? extends V> fallback) {
379    return withFallback(input, fallback, directExecutor());
380  }
381
382  /**
383   * Returns a {@code Future} whose result is taken from the given primary
384   * {@code input} or, if the primary input fails, from the {@code Future}
385   * provided by the {@code fallback}. {@link FutureFallback#create} is not
386   * invoked until the primary input has failed, so if the primary input
387   * succeeds, it is never invoked. If, during the invocation of {@code
388   * fallback}, an exception is thrown, this exception is used as the result of
389   * the output {@code Future}.
390   *
391   * <p>Below is an example of a fallback that returns a default value if an
392   * exception occurs:
393   *
394   * <pre>   {@code
395   *   ListenableFuture<Integer> fetchCounterFuture = ...;
396   *
397   *   // Falling back to a zero counter in case an exception happens when
398   *   // processing the RPC to fetch counters.
399   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
400   *       fetchCounterFuture, new FutureFallback<Integer>() {
401   *         public ListenableFuture<Integer> create(Throwable t) {
402   *           // Returning "0" as the default for the counter when the
403   *           // exception happens.
404   *           return immediateFuture(0);
405   *         }
406   *       }, directExecutor());}</pre>
407   *
408   * <p>The fallback can also choose to propagate the original exception when
409   * desired:
410   *
411   * <pre>   {@code
412   *   ListenableFuture<Integer> fetchCounterFuture = ...;
413   *
414   *   // Falling back to a zero counter only in case the exception was a
415   *   // TimeoutException.
416   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
417   *       fetchCounterFuture, new FutureFallback<Integer>() {
418   *         public ListenableFuture<Integer> create(Throwable t) {
419   *           if (t instanceof TimeoutException) {
420   *             return immediateFuture(0);
421   *           }
422   *           return immediateFailedFuture(t);
423   *         }
424   *       }, directExecutor());}</pre>
425   *
426   * <p>When the execution of {@code fallback.create} is fast and lightweight
427   * (though the {@code Future} it returns need not meet these criteria),
428   * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
429   * omitting the executor} or explicitly specifying {@code
430   * directExecutor}. However, be aware of the caveats documented in the
431   * link above.
432   *
433   * @param input the primary input {@code Future}
434   * @param fallback the {@link FutureFallback} implementation to be called if
435   *     {@code input} fails
436   * @param executor the executor that runs {@code fallback} if {@code input}
437   *     fails
438   * @since 14.0
439   */
440  public static <V> ListenableFuture<V> withFallback(
441      ListenableFuture<? extends V> input,
442      FutureFallback<? extends V> fallback, Executor executor) {
443    checkNotNull(fallback);
444    return new FallbackFuture<V>(input, fallback, executor);
445  }
446
447  /**
448   * A future that falls back on a second, generated future, in case its
449   * original future fails.
450   */
451  private static class FallbackFuture<V> extends AbstractFuture<V> {
452
453    private volatile ListenableFuture<? extends V> running;
454
455    FallbackFuture(ListenableFuture<? extends V> input,
456        final FutureFallback<? extends V> fallback,
457        final Executor executor) {
458      running = input;
459      addCallback(running, new FutureCallback<V>() {
460        @Override
461        public void onSuccess(V value) {
462          set(value);
463        }
464
465        @Override
466        public void onFailure(Throwable t) {
467          if (isCancelled()) {
468            return;
469          }
470          try {
471            running = fallback.create(t);
472            if (isCancelled()) { // in case cancel called in the meantime
473              running.cancel(wasInterrupted());
474              return;
475            }
476            addCallback(running, new FutureCallback<V>() {
477              @Override
478              public void onSuccess(V value) {
479                set(value);
480              }
481
482              @Override
483              public void onFailure(Throwable t) {
484                if (running.isCancelled()) {
485                  cancel(false);
486                } else {
487                  setException(t);
488                }
489              }
490            }, directExecutor());
491          } catch (Throwable e) {
492            setException(e);
493          }
494        }
495      }, executor);
496    }
497
498    @Override
499    public boolean cancel(boolean mayInterruptIfRunning) {
500      if (super.cancel(mayInterruptIfRunning)) {
501        running.cancel(mayInterruptIfRunning);
502        return true;
503      }
504      return false;
505    }
506  }
507
508  /**
509   * Returns a new {@code ListenableFuture} whose result is asynchronously
510   * derived from the result of the given {@code Future}. More precisely, the
511   * returned {@code Future} takes its result from a {@code Future} produced by
512   * applying the given {@code AsyncFunction} to the result of the original
513   * {@code Future}. Example:
514   *
515   * <pre>   {@code
516   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
517   *   AsyncFunction<RowKey, QueryResult> queryFunction =
518   *       new AsyncFunction<RowKey, QueryResult>() {
519   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
520   *           return dataService.read(rowKey);
521   *         }
522   *       };
523   *   ListenableFuture<QueryResult> queryFuture =
524   *       transform(rowKeyFuture, queryFunction);}</pre>
525   *
526   * <p>Note: If the derived {@code Future} is slow or heavyweight to create
527   * (whether the {@code Future} itself is slow or heavyweight to complete is
528   * irrelevant), consider {@linkplain #transform(ListenableFuture,
529   * AsyncFunction, Executor) supplying an executor}. If you do not supply an
530   * executor, {@code transform} will use a
531   * {@linkplain MoreExecutors#directExecutor direct executor}, which carries
532   * some caveats for heavier operations. For example, the call to {@code
533   * function.apply} may run on an unpredictable or undesirable thread:
534   *
535   * <ul>
536   * <li>If the input {@code Future} is done at the time {@code transform} is
537   * called, {@code transform} will call {@code function.apply} inline.
538   * <li>If the input {@code Future} is not yet done, {@code transform} will
539   * schedule {@code function.apply} to be run by the thread that completes the
540   * input {@code Future}, which may be an internal system thread such as an
541   * RPC network thread.
542   * </ul>
543   *
544   * <p>Also note that, regardless of which thread executes the {@code
545   * function.apply}, all other registered but unexecuted listeners are
546   * prevented from running during its execution, even if those listeners are
547   * to run in other executors.
548   *
549   * <p>The returned {@code Future} attempts to keep its cancellation state in
550   * sync with that of the input future and that of the future returned by the
551   * function. That is, if the returned {@code Future} is cancelled, it will
552   * attempt to cancel the other two, and if either of the other two is
553   * cancelled, the returned {@code Future} will receive a callback in which it
554   * will attempt to cancel itself.
555   *
556   * @param input The future to transform
557   * @param function A function to transform the result of the input future
558   *     to the result of the output future
   * @return A future that holds the result of the function (if the input
   *     succeeded) or the original input's failure (if not)
561   * @since 11.0
562   */
563  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
564      AsyncFunction<? super I, ? extends O> function) {
565    ChainingListenableFuture<I, O> output =
566        new ChainingListenableFuture<I, O>(function, input);
567    input.addListener(output, directExecutor());
568    return output;
569  }
570
571  /**
572   * Returns a new {@code ListenableFuture} whose result is asynchronously
573   * derived from the result of the given {@code Future}. More precisely, the
574   * returned {@code Future} takes its result from a {@code Future} produced by
575   * applying the given {@code AsyncFunction} to the result of the original
576   * {@code Future}. Example:
577   *
578   * <pre>   {@code
579   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
580   *   AsyncFunction<RowKey, QueryResult> queryFunction =
581   *       new AsyncFunction<RowKey, QueryResult>() {
582   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
583   *           return dataService.read(rowKey);
584   *         }
585   *       };
586   *   ListenableFuture<QueryResult> queryFuture =
587   *       transform(rowKeyFuture, queryFunction, executor);}</pre>
588   *
589   * <p>The returned {@code Future} attempts to keep its cancellation state in
590   * sync with that of the input future and that of the future returned by the
591   * chain function. That is, if the returned {@code Future} is cancelled, it
592   * will attempt to cancel the other two, and if either of the other two is
593   * cancelled, the returned {@code Future} will receive a callback in which it
594   * will attempt to cancel itself.
595   *
596   * <p>When the execution of {@code function.apply} is fast and lightweight
597   * (though the {@code Future} it returns need not meet these criteria),
598   * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
599   * the executor} or explicitly specifying {@code directExecutor}.
600   * However, be aware of the caveats documented in the link above.
601   *
602   * @param input The future to transform
603   * @param function A function to transform the result of the input future
604   *     to the result of the output future
605   * @param executor Executor to run the function in.
   * @return A future that holds the result of the function (if the input
   *     succeeded) or the original input's failure (if not)
608   * @since 11.0
609   */
610  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
611      AsyncFunction<? super I, ? extends O> function,
612      Executor executor) {
613    checkNotNull(executor);
614    ChainingListenableFuture<I, O> output =
615        new ChainingListenableFuture<I, O>(function, input);
616    input.addListener(rejectionPropagatingRunnable(output, output, executor), directExecutor());
617    return output;
618  }
619
620  /**
621   * Returns a Runnable that will invoke the delegate Runnable on the delegate executor, but if the
622   * task is rejected, it will propagate that rejection to the output future.
623   */
624  private static Runnable rejectionPropagatingRunnable(
625      final AbstractFuture<?> outputFuture,
626      final Runnable delegateTask,
627      final Executor delegateExecutor) {
628    return new Runnable() {
629      @Override public void run() {
630        final AtomicBoolean thrownFromDelegate = new AtomicBoolean(true);
631        try {
632          delegateExecutor.execute(new Runnable() {
633            @Override public void run() {
634              thrownFromDelegate.set(false);
635              delegateTask.run();
636            }
637          });
638        } catch (RejectedExecutionException e) {
639          if (thrownFromDelegate.get()) {
640            // wrap exception?
641            outputFuture.setException(e);
642          }
643          // otherwise it must have been thrown from a transitive call and the delegate runnable
644          // should have handled it.
645        }
646      }
647    };
648  }
649
650  /**
651   * Returns a new {@code ListenableFuture} whose result is the product of
652   * applying the given {@code Function} to the result of the given {@code
653   * Future}. Example:
654   *
655   * <pre>   {@code
656   *   ListenableFuture<QueryResult> queryFuture = ...;
657   *   Function<QueryResult, List<Row>> rowsFunction =
658   *       new Function<QueryResult, List<Row>>() {
659   *         public List<Row> apply(QueryResult queryResult) {
660   *           return queryResult.getRows();
661   *         }
662   *       };
663   *   ListenableFuture<List<Row>> rowsFuture =
664   *       transform(queryFuture, rowsFunction);}</pre>
665   *
666   * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain
667   * #transform(ListenableFuture, Function, Executor) supplying an executor}.
668   * If you do not supply an executor, {@code transform} will use an inline
669   * executor, which carries some caveats for heavier operations.  For example,
670   * the call to {@code function.apply} may run on an unpredictable or
671   * undesirable thread:
672   *
673   * <ul>
674   * <li>If the input {@code Future} is done at the time {@code transform} is
675   * called, {@code transform} will call {@code function.apply} inline.
676   * <li>If the input {@code Future} is not yet done, {@code transform} will
677   * schedule {@code function.apply} to be run by the thread that completes the
678   * input {@code Future}, which may be an internal system thread such as an
679   * RPC network thread.
680   * </ul>
681   *
682   * <p>Also note that, regardless of which thread executes the {@code
683   * function.apply}, all other registered but unexecuted listeners are
684   * prevented from running during its execution, even if those listeners are
685   * to run in other executors.
686   *
687   * <p>The returned {@code Future} attempts to keep its cancellation state in
688   * sync with that of the input future. That is, if the returned {@code Future}
689   * is cancelled, it will attempt to cancel the input, and if the input is
690   * cancelled, the returned {@code Future} will receive a callback in which it
691   * will attempt to cancel itself.
692   *
693   * <p>An example use of this method is to convert a serializable object
694   * returned from an RPC into a POJO.
695   *
696   * @param input The future to transform
697   * @param function A Function to transform the results of the provided future
698   *     to the results of the returned future.  This will be run in the thread
699   *     that notifies input it is complete.
700   * @return A future that holds result of the transformation.
701   * @since 9.0 (in 1.0 as {@code compose})
702   */
703  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
704      final Function<? super I, ? extends O> function) {
705    checkNotNull(function);
706    ChainingListenableFuture<I, O> output =
707        new ChainingListenableFuture<I, O>(asAsyncFunction(function), input);
708    input.addListener(output, directExecutor());
709    return output;
710  }
711
712  /**
713   * Returns a new {@code ListenableFuture} whose result is the product of
714   * applying the given {@code Function} to the result of the given {@code
715   * Future}. Example:
716   *
717   * <pre>   {@code
718   *   ListenableFuture<QueryResult> queryFuture = ...;
719   *   Function<QueryResult, List<Row>> rowsFunction =
720   *       new Function<QueryResult, List<Row>>() {
721   *         public List<Row> apply(QueryResult queryResult) {
722   *           return queryResult.getRows();
723   *         }
724   *       };
725   *   ListenableFuture<List<Row>> rowsFuture =
726   *       transform(queryFuture, rowsFunction, executor);}</pre>
727   *
728   * <p>The returned {@code Future} attempts to keep its cancellation state in
729   * sync with that of the input future. That is, if the returned {@code Future}
730   * is cancelled, it will attempt to cancel the input, and if the input is
731   * cancelled, the returned {@code Future} will receive a callback in which it
732   * will attempt to cancel itself.
733   *
734   * <p>An example use of this method is to convert a serializable object
735   * returned from an RPC into a POJO.
736   *
737   * <p>When the transformation is fast and lightweight, consider {@linkplain
738   * #transform(ListenableFuture, Function) omitting the executor} or
739   * explicitly specifying {@code directExecutor}. However, be aware of the
740   * caveats documented in the link above.
741   *
742   * @param input The future to transform
743   * @param function A Function to transform the results of the provided future
744   *     to the results of the returned future.
745   * @param executor Executor to run the function in.
746   * @return A future that holds result of the transformation.
747   * @since 9.0 (in 2.0 as {@code compose})
748   */
749  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
750      final Function<? super I, ? extends O> function, Executor executor) {
751    checkNotNull(function);
752    return transform(input, asAsyncFunction(function), executor);
753  }
754
755  /** Wraps the given function as an AsyncFunction. */
756  private static <I, O> AsyncFunction<I, O> asAsyncFunction(
757      final Function<? super I, ? extends O> function) {
758    return new AsyncFunction<I, O>() {
759      @Override public ListenableFuture<O> apply(I input) {
760        O output = function.apply(input);
761        return immediateFuture(output);
762      }
763    };
764  }
765
766  /**
767   * Like {@link #transform(ListenableFuture, Function)} except that the
768   * transformation {@code function} is invoked on each call to
769   * {@link Future#get() get()} on the returned future.
770   *
771   * <p>The returned {@code Future} reflects the input's cancellation
772   * state directly, and any attempt to cancel the returned Future is likewise
773   * passed through to the input Future.
774   *
775   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
776   * only apply the timeout to the execution of the underlying {@code Future},
777   * <em>not</em> to the execution of the transformation function.
778   *
779   * <p>The primary audience of this method is callers of {@code transform}
780   * who don't have a {@code ListenableFuture} available and
781   * do not mind repeated, lazy function evaluation.
782   *
783   * @param input The future to transform
784   * @param function A Function to transform the results of the provided future
785   *     to the results of the returned future.
786   * @return A future that returns the result of the transformation.
787   * @since 10.0
788   */
789  public static <I, O> Future<O> lazyTransform(final Future<I> input,
790      final Function<? super I, ? extends O> function) {
791    checkNotNull(input);
792    checkNotNull(function);
793    return new Future<O>() {
794
795      @Override
796      public boolean cancel(boolean mayInterruptIfRunning) {
797        return input.cancel(mayInterruptIfRunning);
798      }
799
800      @Override
801      public boolean isCancelled() {
802        return input.isCancelled();
803      }
804
805      @Override
806      public boolean isDone() {
807        return input.isDone();
808      }
809
810      @Override
811      public O get() throws InterruptedException, ExecutionException {
812        return applyTransformation(input.get());
813      }
814
815      @Override
816      public O get(long timeout, TimeUnit unit)
817          throws InterruptedException, ExecutionException, TimeoutException {
818        return applyTransformation(input.get(timeout, unit));
819      }
820
821      private O applyTransformation(I input) throws ExecutionException {
822        try {
823          return function.apply(input);
824        } catch (Throwable t) {
825          throw new ExecutionException(t);
826        }
827      }
828    };
829  }
830
  /**
   * An implementation of {@code ListenableFuture} that also implements
   * {@code Runnable} so that it can be used to nest ListenableFutures.
   * Once the passed-in {@code ListenableFuture} is complete, it calls the
   * passed-in {@code AsyncFunction} to obtain the output future, and then
   * completes itself with that output future's result.
   *
   * <p>Cancelling this future also attempts to cancel the input future and,
   * if it already exists, the output future.
   *
   * <p>For historical reasons, this class has a special case in its exception
   * handling: If the given {@code AsyncFunction} throws an {@code
   * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it
   * and uses its <i>cause</i> as the output future's exception, rather than
   * using the {@code UndeclaredThrowableException} itself as it would for other
   * exception types. The reason for this is that {@code Futures.transform} used
   * to require a {@code Function}, whose {@code apply} method is not allowed to
   * throw checked exceptions. Nowadays, {@code Futures.transform} has an
   * overload that accepts an {@code AsyncFunction}, whose {@code apply} method
   * <i>is</i> allowed to throw checked exceptions. Users who wish to throw
   * checked exceptions should use that overload instead, and <a
   * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
   * should remove the {@code UndeclaredThrowableException} special case</a>.
   */
  private static class ChainingListenableFuture<I, O>
      extends AbstractFuture<O> implements Runnable {

    // function and inputFuture are cleared at the end of run() so that they
    // are not retained once the chaining step has executed.
    private AsyncFunction<? super I, ? extends O> function;
    private ListenableFuture<? extends I> inputFuture;
    // Assigned in run() and read in cancel(); cleared again once the output
    // future has been consumed.
    private volatile ListenableFuture<? extends O> outputFuture;

    /** Stores the (non-null) function and input future; nothing is run here. */
    private ChainingListenableFuture(
        AsyncFunction<? super I, ? extends O> function,
        ListenableFuture<? extends I> inputFuture) {
      this.function = checkNotNull(function);
      this.inputFuture = checkNotNull(inputFuture);
    }

    /**
     * Cancels this future and, if that succeeds, forwards the cancellation to
     * the input future and to the output future (when either is still
     * referenced).
     *
     * @return whether this future was cancelled by this call
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      /*
       * Our additional cancellation work needs to occur even if
       * !mayInterruptIfRunning, so we can't move it into interruptTask().
       */
      if (super.cancel(mayInterruptIfRunning)) {
        // This should never block since only one thread is allowed to cancel
        // this Future.
        cancel(inputFuture, mayInterruptIfRunning);
        cancel(outputFuture, mayInterruptIfRunning);
        return true;
      }
      return false;
    }

    /** Null-tolerant helper: cancels {@code future} if it is non-null. */
    private void cancel(@Nullable Future<?> future,
        boolean mayInterruptIfRunning) {
      if (future != null) {
        future.cancel(mayInterruptIfRunning);
      }
    }

    /**
     * Reads the input future's result, applies the function to it, and
     * arranges for this future to complete with the outcome of the future the
     * function returns. Failures and cancellation of either the input or the
     * output future are mirrored onto this future.
     */
    @Override
    public void run() {
      try {
        I sourceResult;
        try {
          sourceResult = getUninterruptibly(inputFuture);
        } catch (CancellationException e) {
          // Cancel this future and return.
          // At this point, inputFuture is cancelled and outputFuture doesn't
          // exist, so the value of mayInterruptIfRunning is irrelevant.
          cancel(false);
          return;
        } catch (ExecutionException e) {
          // Set the cause of the exception as this future's exception
          setException(e.getCause());
          return;
        }

        // Publish the output future through the volatile field so that a
        // concurrent cancel() can see it; a null result from the function is
        // rejected by checkNotNull.
        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
            Preconditions.checkNotNull(function.apply(sourceResult),
                "AsyncFunction may not return null.");
        // cancel() may have run before the field above was assigned, in which
        // case it saw null and could not cancel the output future; do so here.
        if (isCancelled()) {
          outputFuture.cancel(wasInterrupted());
          this.outputFuture = null;
          return;
        }
        outputFuture.addListener(new Runnable() {
            @Override
            public void run() {
              try {
                set(getUninterruptibly(outputFuture));
              } catch (CancellationException e) {
                // Cancel this future and return.
                // At this point, inputFuture and outputFuture are done, so the
                // value of mayInterruptIfRunning is irrelevant.
                cancel(false);
                return;
              } catch (ExecutionException e) {
                // Set the cause of the exception as this future's exception
                setException(e.getCause());
              } finally {
                // Don't pin inputs beyond completion
                ChainingListenableFuture.this.outputFuture = null;
              }
            }
          }, directExecutor());
      } catch (UndeclaredThrowableException e) {
        // Historical special case (see the class documentation):
        // set the cause of the exception as this future's exception
        setException(e.getCause());
      } catch (Throwable t) {
        // This exception is irrelevant in this thread, but useful for the
        // client
        setException(t);
      } finally {
        // Don't pin inputs beyond completion
        function = null;
        inputFuture = null;
      }
    }
  }
948
949  /**
950   * Returns a new {@code ListenableFuture} whose result is the product of
951   * calling {@code get()} on the {@code Future} nested within the given {@code
952   * Future}, effectively chaining the futures one after the other.  Example:
953   *
954   * <pre>   {@code
955   *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
956   *   ListenableFuture<String> dereferenced = dereference(nested);}</pre>
957   *
958   * <p>This call has the same cancellation and execution semantics as {@link
959   * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
960   * Future} attempts to keep its cancellation state in sync with both the
961   * input {@code Future} and the nested {@code Future}.  The transformation
962   * is very lightweight and therefore takes place in the same thread (either
963   * the thread that called {@code dereference}, or the thread in which the
964   * dereferenced future completes).
965   *
966   * @param nested The nested future to transform.
967   * @return A future that holds result of the inner future.
968   * @since 13.0
969   */
970  @SuppressWarnings({"rawtypes", "unchecked"})
971  public static <V> ListenableFuture<V> dereference(
972      ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
973    return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
974  }
975
976  /**
977   * Helper {@code Function} for {@link #dereference}.
978   */
979  private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
980      new AsyncFunction<ListenableFuture<Object>, Object>() {
981        @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
982          return input;
983        }
984      };
985
986  /**
987   * Creates a new {@code ListenableFuture} whose value is a list containing the
988   * values of all its input futures, if all succeed. If any input fails, the
989   * returned future fails immediately.
990   *
991   * <p>The list of results is in the same order as the input list.
992   *
993   * <p>Canceling this future will attempt to cancel all the component futures,
994   * and if any of the provided futures fails or is canceled, this one is,
995   * too.
996   *
997   * @param futures futures to combine
998   * @return a future that provides a list of the results of the component
999   *         futures
1000   * @since 10.0
1001   */
1002  @Beta
1003  public static <V> ListenableFuture<List<V>> allAsList(
1004      ListenableFuture<? extends V>... futures) {
1005    return listFuture(ImmutableList.copyOf(futures), true, directExecutor());
1006  }
1007
1008  /**
1009   * Creates a new {@code ListenableFuture} whose value is a list containing the
1010   * values of all its input futures, if all succeed. If any input fails, the
1011   * returned future fails immediately.
1012   *
1013   * <p>The list of results is in the same order as the input list.
1014   *
1015   * <p>Canceling this future will attempt to cancel all the component futures,
1016   * and if any of the provided futures fails or is canceled, this one is,
1017   * too.
1018   *
1019   * @param futures futures to combine
1020   * @return a future that provides a list of the results of the component
1021   *         futures
1022   * @since 10.0
1023   */
1024  @Beta
1025  public static <V> ListenableFuture<List<V>> allAsList(
1026      Iterable<? extends ListenableFuture<? extends V>> futures) {
1027    return listFuture(ImmutableList.copyOf(futures), true, directExecutor());
1028  }
1029
1030  private static final class WrappedCombiner<T> implements Callable<T> {
1031    final Callable<T> delegate;
1032    CombinerFuture<T> outputFuture;
1033
1034    WrappedCombiner(Callable<T> delegate) {
1035      this.delegate = checkNotNull(delegate);
1036    }
1037
1038    @Override public T call() throws Exception {
1039      try {
1040        return delegate.call();
1041      } catch (ExecutionException e) {
1042        outputFuture.setException(e.getCause());
1043      } catch (CancellationException e) {
1044        outputFuture.cancel(false);
1045      }
1046      // at this point the return value doesn't matter since we already called setException or
1047      // cancel so the future is done.
1048      return null;
1049    }
1050  }
1051
1052  private static final class CombinerFuture<V> extends ListenableFutureTask<V> {
1053    ImmutableList<ListenableFuture<?>> futures;
1054
1055    CombinerFuture(Callable<V> callable, ImmutableList<ListenableFuture<?>> futures) {
1056      super(callable);
1057      this.futures = futures;
1058    }
1059
1060    @Override public boolean cancel(boolean mayInterruptIfRunning) {
1061      ImmutableList<ListenableFuture<?>> futures = this.futures;
1062      if (super.cancel(mayInterruptIfRunning)) {
1063        for (ListenableFuture<?> future : futures) {
1064          future.cancel(mayInterruptIfRunning);
1065        }
1066        return true;
1067      }
1068      return false;
1069    }
1070
1071    @Override protected void done() {
1072      super.done();
1073      futures = null;
1074    }
1075
1076    @Override protected void setException(Throwable t) {
1077      super.setException(t);
1078    }
1079  }
1080
1081  /**
1082   * Creates a new {@code ListenableFuture} whose result is set from the
1083   * supplied future when it completes.  Cancelling the supplied future
1084   * will also cancel the returned future, but cancelling the returned
1085   * future will have no effect on the supplied future.
1086   *
1087   * @since 15.0
1088   */
1089  public static <V> ListenableFuture<V> nonCancellationPropagating(
1090      ListenableFuture<V> future) {
1091    return new NonCancellationPropagatingFuture<V>(future);
1092  }
1093
1094  /**
1095   * A wrapped future that does not propagate cancellation to its delegate.
1096   */
1097  private static class NonCancellationPropagatingFuture<V>
1098      extends AbstractFuture<V> {
1099    NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) {
1100      checkNotNull(delegate);
1101      addCallback(delegate, new FutureCallback<V>() {
1102        @Override
1103        public void onSuccess(V result) {
1104          set(result);
1105        }
1106
1107        @Override
1108        public void onFailure(Throwable t) {
1109          if (delegate.isCancelled()) {
1110            cancel(false);
1111          } else {
1112            setException(t);
1113          }
1114        }
1115      }, directExecutor());
1116    }
1117  }
1118
1119  /**
1120   * Creates a new {@code ListenableFuture} whose value is a list containing the
1121   * values of all its successful input futures. The list of results is in the
1122   * same order as the input list, and if any of the provided futures fails or
1123   * is canceled, its corresponding position will contain {@code null} (which is
1124   * indistinguishable from the future having a successful value of
1125   * {@code null}).
1126   *
1127   * <p>Canceling this future will attempt to cancel all the component futures.
1128   *
1129   * @param futures futures to combine
1130   * @return a future that provides a list of the results of the component
1131   *         futures
1132   * @since 10.0
1133   */
1134  @Beta
1135  public static <V> ListenableFuture<List<V>> successfulAsList(
1136      ListenableFuture<? extends V>... futures) {
1137    return listFuture(ImmutableList.copyOf(futures), false, directExecutor());
1138  }
1139
1140  /**
1141   * Creates a new {@code ListenableFuture} whose value is a list containing the
1142   * values of all its successful input futures. The list of results is in the
1143   * same order as the input list, and if any of the provided futures fails or
1144   * is canceled, its corresponding position will contain {@code null} (which is
1145   * indistinguishable from the future having a successful value of
1146   * {@code null}).
1147   *
1148   * <p>Canceling this future will attempt to cancel all the component futures.
1149   *
1150   * @param futures futures to combine
1151   * @return a future that provides a list of the results of the component
1152   *         futures
1153   * @since 10.0
1154   */
1155  @Beta
1156  public static <V> ListenableFuture<List<V>> successfulAsList(
1157      Iterable<? extends ListenableFuture<? extends V>> futures) {
1158    return listFuture(ImmutableList.copyOf(futures), false, directExecutor());
1159  }
1160
1161  /**
1162   * Returns a list of delegate futures that correspond to the futures received in the order
1163   * that they complete. Delegate futures return the same value or throw the same exception
1164   * as the corresponding input future returns/throws.
1165   *
1166   * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
1167   * does not correspond to a specific input future until the appropriate number of input
1168   * futures have completed. At that point, it is too late to cancel the input future.
1169   * The input future's result, which cannot be stored into the cancelled delegate future,
1170   * is ignored.
1171   *
1172   * @since 17.0
1173   */
1174  @Beta
1175  public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
1176      Iterable<? extends ListenableFuture<? extends T>> futures) {
1177    // A CLQ may be overkill here.  We could save some pointers/memory by synchronizing on an
1178    // ArrayDeque
1179    final ConcurrentLinkedQueue<AsyncSettableFuture<T>> delegates =
1180        Queues.newConcurrentLinkedQueue();
1181    ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
1182    // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
1183    // atomically and therefore that each returned future is guaranteed to be in completion order.
1184    // N.B. there are some cases where the use of this executor could have possibly surprising
1185    // effects when input futures finish at approximately the same time _and_ the output futures
1186    // have directExecutor listeners. In this situation, the listeners may end up running on a
1187    // different thread than if they were attached to the corresponding input future.  We believe
1188    // this to be a negligible cost since:
1189    // 1. Using the directExecutor implies that your callback is safe to run on any thread.
1190    // 2. This would likely only be noticeable if you were doing something expensive or blocking on
1191    //    a directExecutor listener on one of the output futures which is an antipattern anyway.
1192    SerializingExecutor executor = new SerializingExecutor(directExecutor());
1193    for (final ListenableFuture<? extends T> future : futures) {
1194      AsyncSettableFuture<T> delegate = AsyncSettableFuture.create();
1195      // Must make sure to add the delegate to the queue first in case the future is already done
1196      delegates.add(delegate);
1197      future.addListener(new Runnable() {
1198        @Override public void run() {
1199          delegates.remove().setFuture(future);
1200        }
1201      }, executor);
1202      listBuilder.add(delegate);
1203    }
1204    return listBuilder.build();
1205  }
1206
1207  /**
1208   * Registers separate success and failure callbacks to be run when the {@code
1209   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1210   * complete} or, if the computation is already complete, immediately.
1211   *
1212   * <p>There is no guaranteed ordering of execution of callbacks, but any
1213   * callback added through this method is guaranteed to be called once the
1214   * computation is complete.
1215   *
1216   * Example: <pre> {@code
1217   * ListenableFuture<QueryResult> future = ...;
1218   * addCallback(future,
1219   *     new FutureCallback<QueryResult> {
1220   *       public void onSuccess(QueryResult result) {
1221   *         storeInCache(result);
1222   *       }
1223   *       public void onFailure(Throwable t) {
1224   *         reportError(t);
1225   *       }
1226   *     });}</pre>
1227   *
1228   * <p>Note: If the callback is slow or heavyweight, consider {@linkplain
1229   * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
1230   * executor}. If you do not supply an executor, {@code addCallback} will use
1231   * a {@linkplain MoreExecutors#directExecutor direct executor}, which carries
1232   * some caveats for heavier operations. For example, the callback may run on
1233   * an unpredictable or undesirable thread:
1234   *
1235   * <ul>
1236   * <li>If the input {@code Future} is done at the time {@code addCallback} is
1237   * called, {@code addCallback} will execute the callback inline.
1238   * <li>If the input {@code Future} is not yet done, {@code addCallback} will
1239   * schedule the callback to be run by the thread that completes the input
1240   * {@code Future}, which may be an internal system thread such as an RPC
1241   * network thread.
1242   * </ul>
1243   *
1244   * <p>Also note that, regardless of which thread executes the callback, all
1245   * other registered but unexecuted listeners are prevented from running
1246   * during its execution, even if those listeners are to run in other
1247   * executors.
1248   *
1249   * <p>For a more general interface to attach a completion listener to a
1250   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1251   *
1252   * @param future The future attach the callback to.
1253   * @param callback The callback to invoke when {@code future} is completed.
1254   * @since 10.0
1255   */
1256  public static <V> void addCallback(ListenableFuture<V> future,
1257      FutureCallback<? super V> callback) {
1258    addCallback(future, callback, directExecutor());
1259  }
1260
1261  /**
1262   * Registers separate success and failure callbacks to be run when the {@code
1263   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1264   * complete} or, if the computation is already complete, immediately.
1265   *
1266   * <p>The callback is run in {@code executor}.
1267   * There is no guaranteed ordering of execution of callbacks, but any
1268   * callback added through this method is guaranteed to be called once the
1269   * computation is complete.
1270   *
1271   * Example: <pre> {@code
1272   * ListenableFuture<QueryResult> future = ...;
1273   * Executor e = ...
1274   * addCallback(future,
1275   *     new FutureCallback<QueryResult> {
1276   *       public void onSuccess(QueryResult result) {
1277   *         storeInCache(result);
1278   *       }
1279   *       public void onFailure(Throwable t) {
1280   *         reportError(t);
1281   *       }
1282   *     }, e);}</pre>
1283   *
1284   * <p>When the callback is fast and lightweight, consider {@linkplain
1285   * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
1286   * explicitly specifying {@code directExecutor}. However, be aware of the
1287   * caveats documented in the link above.
1288   *
1289   * <p>For a more general interface to attach a completion listener to a
1290   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1291   *
1292   * @param future The future attach the callback to.
1293   * @param callback The callback to invoke when {@code future} is completed.
1294   * @param executor The executor to run {@code callback} when the future
1295   *    completes.
1296   * @since 10.0
1297   */
1298  public static <V> void addCallback(final ListenableFuture<V> future,
1299      final FutureCallback<? super V> callback, Executor executor) {
1300    Preconditions.checkNotNull(callback);
1301    Runnable callbackListener = new Runnable() {
1302      @Override
1303      public void run() {
1304        final V value;
1305        try {
1306          // TODO(user): (Before Guava release), validate that this
1307          // is the thing for IE.
1308          value = getUninterruptibly(future);
1309        } catch (ExecutionException e) {
1310          callback.onFailure(e.getCause());
1311          return;
1312        } catch (RuntimeException e) {
1313          callback.onFailure(e);
1314          return;
1315        } catch (Error e) {
1316          callback.onFailure(e);
1317          return;
1318        }
1319        callback.onSuccess(value);
1320      }
1321    };
1322    future.addListener(callbackListener, executor);
1323  }
1324
1325  /**
1326   * Returns the result of {@link Future#get()}, converting most exceptions to a
1327   * new instance of the given checked exception type. This reduces boilerplate
1328   * for a common use of {@code Future} in which it is unnecessary to
1329   * programmatically distinguish between exception types or to extract other
1330   * information from the exception instance.
1331   *
1332   * <p>Exceptions from {@code Future.get} are treated as follows:
1333   * <ul>
1334   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1335   *     {@code X} if the cause is a checked exception, an {@link
1336   *     UncheckedExecutionException} if the cause is a {@code
1337   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1338   *     {@code Error}.
1339   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1340   *     restoring the interrupt).
1341   * <li>Any {@link CancellationException} is propagated untouched, as is any
1342   *     other {@link RuntimeException} (though {@code get} implementations are
1343   *     discouraged from throwing such exceptions).
1344   * </ul>
1345   *
1346   * <p>The overall principle is to continue to treat every checked exception as a
1347   * checked exception, every unchecked exception as an unchecked exception, and
1348   * every error as an error. In addition, the cause of any {@code
1349   * ExecutionException} is wrapped in order to ensure that the new stack trace
1350   * matches that of the current thread.
1351   *
1352   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1353   * public constructor that accepts zero or more arguments, all of type {@code
1354   * String} or {@code Throwable} (preferring constructors with at least one
1355   * {@code String}) and calling the constructor via reflection. If the
1356   * exception did not already have a cause, one is set by calling {@link
1357   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1358   * {@code IllegalArgumentException} is thrown.
1359   *
1360   * @throws X if {@code get} throws any checked exception except for an {@code
1361   *         ExecutionException} whose cause is not itself a checked exception
1362   * @throws UncheckedExecutionException if {@code get} throws an {@code
1363   *         ExecutionException} with a {@code RuntimeException} as its cause
1364   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1365   *         with an {@code Error} as its cause
1366   * @throws CancellationException if {@code get} throws a {@code
1367   *         CancellationException}
1368   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1369   *         RuntimeException} or does not have a suitable constructor
1370   * @since 10.0
1371   */
1372  public static <V, X extends Exception> V get(
1373      Future<V> future, Class<X> exceptionClass) throws X {
1374    checkNotNull(future);
1375    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1376        "Futures.get exception type (%s) must not be a RuntimeException",
1377        exceptionClass);
1378    try {
1379      return future.get();
1380    } catch (InterruptedException e) {
1381      currentThread().interrupt();
1382      throw newWithCause(exceptionClass, e);
1383    } catch (ExecutionException e) {
1384      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1385      throw new AssertionError();
1386    }
1387  }
1388
1389  /**
1390   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1391   * exceptions to a new instance of the given checked exception type. This
1392   * reduces boilerplate for a common use of {@code Future} in which it is
1393   * unnecessary to programmatically distinguish between exception types or to
1394   * extract other information from the exception instance.
1395   *
1396   * <p>Exceptions from {@code Future.get} are treated as follows:
1397   * <ul>
1398   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1399   *     {@code X} if the cause is a checked exception, an {@link
1400   *     UncheckedExecutionException} if the cause is a {@code
1401   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1402   *     {@code Error}.
1403   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1404   *     restoring the interrupt).
1405   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1406   * <li>Any {@link CancellationException} is propagated untouched, as is any
1407   *     other {@link RuntimeException} (though {@code get} implementations are
1408   *     discouraged from throwing such exceptions).
1409   * </ul>
1410   *
1411   * <p>The overall principle is to continue to treat every checked exception as a
1412   * checked exception, every unchecked exception as an unchecked exception, and
1413   * every error as an error. In addition, the cause of any {@code
1414   * ExecutionException} is wrapped in order to ensure that the new stack trace
1415   * matches that of the current thread.
1416   *
1417   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1418   * public constructor that accepts zero or more arguments, all of type {@code
1419   * String} or {@code Throwable} (preferring constructors with at least one
1420   * {@code String}) and calling the constructor via reflection. If the
1421   * exception did not already have a cause, one is set by calling {@link
1422   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1423   * {@code IllegalArgumentException} is thrown.
1424   *
1425   * @throws X if {@code get} throws any checked exception except for an {@code
1426   *         ExecutionException} whose cause is not itself a checked exception
1427   * @throws UncheckedExecutionException if {@code get} throws an {@code
1428   *         ExecutionException} with a {@code RuntimeException} as its cause
1429   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1430   *         with an {@code Error} as its cause
1431   * @throws CancellationException if {@code get} throws a {@code
1432   *         CancellationException}
1433   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1434   *         RuntimeException} or does not have a suitable constructor
1435   * @since 10.0
1436   */
1437  public static <V, X extends Exception> V get(
1438      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1439      throws X {
1440    checkNotNull(future);
1441    checkNotNull(unit);
1442    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1443        "Futures.get exception type (%s) must not be a RuntimeException",
1444        exceptionClass);
1445    try {
1446      return future.get(timeout, unit);
1447    } catch (InterruptedException e) {
1448      currentThread().interrupt();
1449      throw newWithCause(exceptionClass, e);
1450    } catch (TimeoutException e) {
1451      throw newWithCause(exceptionClass, e);
1452    } catch (ExecutionException e) {
1453      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1454      throw new AssertionError();
1455    }
1456  }
1457
1458  private static <X extends Exception> void wrapAndThrowExceptionOrError(
1459      Throwable cause, Class<X> exceptionClass) throws X {
1460    if (cause instanceof Error) {
1461      throw new ExecutionError((Error) cause);
1462    }
1463    if (cause instanceof RuntimeException) {
1464      throw new UncheckedExecutionException(cause);
1465    }
1466    throw newWithCause(exceptionClass, cause);
1467  }
1468
1469  /**
1470   * Returns the result of calling {@link Future#get()} uninterruptibly on a
1471   * task known not to throw a checked exception. This makes {@code Future} more
1472   * suitable for lightweight, fast-running tasks that, barring bugs in the
1473   * code, will not fail. This gives it exception-handling behavior similar to
1474   * that of {@code ForkJoinTask.join}.
1475   *
1476   * <p>Exceptions from {@code Future.get} are treated as follows:
1477   * <ul>
1478   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1479   *     {@link UncheckedExecutionException} (if the cause is an {@code
1480   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1481   *     Error}).
1482   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1483   *     call. The interrupt is restored before {@code getUnchecked} returns.
1484   * <li>Any {@link CancellationException} is propagated untouched. So is any
1485   *     other {@link RuntimeException} ({@code get} implementations are
1486   *     discouraged from throwing such exceptions).
1487   * </ul>
1488   *
1489   * <p>The overall principle is to eliminate all checked exceptions: to loop to
1490   * avoid {@code InterruptedException}, to pass through {@code
1491   * CancellationException}, and to wrap any exception from the underlying
1492   * computation in an {@code UncheckedExecutionException} or {@code
1493   * ExecutionError}.
1494   *
1495   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1496   * {@link Uninterruptibles#getUninterruptibly(Future)}.
1497   *
1498   * @throws UncheckedExecutionException if {@code get} throws an {@code
1499   *         ExecutionException} with an {@code Exception} as its cause
1500   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1501   *         with an {@code Error} as its cause
1502   * @throws CancellationException if {@code get} throws a {@code
1503   *         CancellationException}
1504   * @since 10.0
1505   */
1506  public static <V> V getUnchecked(Future<V> future) {
1507    checkNotNull(future);
1508    try {
1509      return getUninterruptibly(future);
1510    } catch (ExecutionException e) {
1511      wrapAndThrowUnchecked(e.getCause());
1512      throw new AssertionError();
1513    }
1514  }
1515
1516  private static void wrapAndThrowUnchecked(Throwable cause) {
1517    if (cause instanceof Error) {
1518      throw new ExecutionError((Error) cause);
1519    }
1520    /*
1521     * It's a non-Error, non-Exception Throwable. From my survey of such
1522     * classes, I believe that most users intended to extend Exception, so we'll
1523     * treat it like an Exception.
1524     */
1525    throw new UncheckedExecutionException(cause);
1526  }
1527
1528  /*
1529   * TODO(user): FutureChecker interface for these to be static methods on? If
1530   * so, refer to it in the (static-method) Futures.get documentation
1531   */
1532
1533  /*
1534   * Arguably we don't need a timed getUnchecked because any operation slow
1535   * enough to require a timeout is heavyweight enough to throw a checked
1536   * exception and therefore be inappropriate to use with getUnchecked. Further,
1537   * it's not clear that converting the checked TimeoutException to a
1538   * RuntimeException -- especially to an UncheckedExecutionException, since it
1539   * wasn't thrown by the computation -- makes sense, and if we don't convert
1540   * it, the user still has to write a try-catch block.
1541   *
1542   * If you think you would use this method, let us know.
1543   */
1544
1545  private static <X extends Exception> X newWithCause(
1546      Class<X> exceptionClass, Throwable cause) {
1547    // getConstructors() guarantees this as long as we don't modify the array.
1548    @SuppressWarnings("unchecked")
1549    List<Constructor<X>> constructors =
1550        (List) Arrays.asList(exceptionClass.getConstructors());
1551    for (Constructor<X> constructor : preferringStrings(constructors)) {
1552      @Nullable X instance = newFromConstructor(constructor, cause);
1553      if (instance != null) {
1554        if (instance.getCause() == null) {
1555          instance.initCause(cause);
1556        }
1557        return instance;
1558      }
1559    }
1560    throw new IllegalArgumentException(
1561        "No appropriate constructor for exception of type " + exceptionClass
1562            + " in response to chained exception", cause);
1563  }
1564
1565  private static <X extends Exception> List<Constructor<X>>
1566      preferringStrings(List<Constructor<X>> constructors) {
1567    return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1568  }
1569
1570  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1571      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1572        @Override public Boolean apply(Constructor<?> input) {
1573          return asList(input.getParameterTypes()).contains(String.class);
1574        }
1575      }).reverse();
1576
1577  @Nullable private static <X> X newFromConstructor(
1578      Constructor<X> constructor, Throwable cause) {
1579    Class<?>[] paramTypes = constructor.getParameterTypes();
1580    Object[] params = new Object[paramTypes.length];
1581    for (int i = 0; i < paramTypes.length; i++) {
1582      Class<?> paramType = paramTypes[i];
1583      if (paramType.equals(String.class)) {
1584        params[i] = cause.toString();
1585      } else if (paramType.equals(Throwable.class)) {
1586        params[i] = cause;
1587      } else {
1588        return null;
1589      }
1590    }
1591    try {
1592      return constructor.newInstance(params);
1593    } catch (IllegalArgumentException e) {
1594      return null;
1595    } catch (InstantiationException e) {
1596      return null;
1597    } catch (IllegalAccessException e) {
1598      return null;
1599    } catch (InvocationTargetException e) {
1600      return null;
1601    }
1602  }
1603
1604  private interface FutureCombiner<V, C> {
1605    C combine(List<Optional<V>> values);
1606  }
1607
1608  private static class CombinedFuture<V, C> extends AbstractFuture<C> {
1609    private static final Logger logger =
1610        Logger.getLogger(CombinedFuture.class.getName());
1611
1612    ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
1613    final boolean allMustSucceed;
1614    final AtomicInteger remaining;
1615    FutureCombiner<V, C> combiner;
1616    List<Optional<V>> values;
1617    final Object seenExceptionsLock = new Object();
1618    Set<Throwable> seenExceptions;
1619
1620    CombinedFuture(
1621        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
1622        boolean allMustSucceed, Executor listenerExecutor,
1623        FutureCombiner<V, C> combiner) {
1624      this.futures = futures;
1625      this.allMustSucceed = allMustSucceed;
1626      this.remaining = new AtomicInteger(futures.size());
1627      this.combiner = combiner;
1628      this.values = Lists.newArrayListWithCapacity(futures.size());
1629      init(listenerExecutor);
1630    }
1631
1632    /**
1633     * Must be called at the end of the constructor.
1634     */
1635    protected void init(final Executor listenerExecutor) {
1636      // First, schedule cleanup to execute when the Future is done.
1637      addListener(new Runnable() {
1638        @Override
1639        public void run() {
1640          // Cancel all the component futures.
1641          if (CombinedFuture.this.isCancelled()) {
1642            for (ListenableFuture<?> future : CombinedFuture.this.futures) {
1643              future.cancel(CombinedFuture.this.wasInterrupted());
1644            }
1645          }
1646
1647          // Let go of the memory held by other futures
1648          CombinedFuture.this.futures = null;
1649
1650          // By now the values array has either been set as the Future's value,
1651          // or (in case of failure) is no longer useful.
1652          CombinedFuture.this.values = null;
1653
1654          // The combiner may also hold state, so free that as well
1655          CombinedFuture.this.combiner = null;
1656        }
1657      }, directExecutor());
1658
1659      // Now begin the "real" initialization.
1660
1661      // Corner case: List is empty.
1662      if (futures.isEmpty()) {
1663        set(combiner.combine(ImmutableList.<Optional<V>>of()));
1664        return;
1665      }
1666
1667      // Populate the results list with null initially.
1668      for (int i = 0; i < futures.size(); ++i) {
1669        values.add(null);
1670      }
1671
1672      // Register a listener on each Future in the list to update
1673      // the state of this future.
1674      // Note that if all the futures on the list are done prior to completing
1675      // this loop, the last call to addListener() will callback to
1676      // setOneValue(), transitively call our cleanup listener, and set
1677      // this.futures to null.
1678      // This is not actually a problem, since the foreach only needs
1679      // this.futures to be non-null at the beginning of the loop.
1680      int i = 0;
1681      for (final ListenableFuture<? extends V> listenable : futures) {
1682        final int index = i++;
1683        listenable.addListener(new Runnable() {
1684          @Override
1685          public void run() {
1686            setOneValue(index, listenable);
1687          }
1688        }, listenerExecutor);
1689      }
1690    }
1691
1692    /**
1693     * Fails this future with the given Throwable if {@link #allMustSucceed} is
1694     * true. Also, logs the throwable if it is an {@link Error} or if
1695     * {@link #allMustSucceed} is {@code true}, the throwable did not cause
1696     * this future to fail, and it is the first time we've seen that particular Throwable.
1697     */
1698    private void setExceptionAndMaybeLog(Throwable throwable) {
1699      boolean visibleFromOutputFuture = false;
1700      boolean firstTimeSeeingThisException = true;
1701      if (allMustSucceed) {
1702        // As soon as the first one fails, throw the exception up.
1703        // The result of all other inputs is then ignored.
1704        visibleFromOutputFuture = super.setException(throwable);
1705
1706        synchronized (seenExceptionsLock) {
1707          if (seenExceptions == null) {
1708            seenExceptions = Sets.newHashSet();
1709          }
1710          firstTimeSeeingThisException = seenExceptions.add(throwable);
1711        }
1712      }
1713
1714      if (throwable instanceof Error
1715          || (allMustSucceed && !visibleFromOutputFuture && firstTimeSeeingThisException)) {
1716        logger.log(Level.SEVERE, "input future failed.", throwable);
1717      }
1718    }
1719
1720    /**
1721     * Sets the value at the given index to that of the given future.
1722     */
1723    private void setOneValue(int index, Future<? extends V> future) {
1724      List<Optional<V>> localValues = values;
1725      // TODO(user): This check appears to be redundant since values is
1726      // assigned null only after the future completes.  However, values
1727      // is not volatile so it may be possible for us to observe the changes
1728      // to these two values in a different order... which I think is why
1729      // we need to check both.  Clear up this craziness either by making
1730      // values volatile or proving that it doesn't need to be for some other
1731      // reason.
1732      if (isDone() || localValues == null) {
1733        // Some other future failed or has been cancelled, causing this one to
1734        // also be cancelled or have an exception set. This should only happen
1735        // if allMustSucceed is true or if the output itself has been
1736        // cancelled.
1737        checkState(allMustSucceed || isCancelled(),
1738            "Future was done before all dependencies completed");
1739      }
1740
1741      try {
1742        checkState(future.isDone(),
1743            "Tried to set value from future which is not done");
1744        V returnValue = getUninterruptibly(future);
1745        if (localValues != null) {
1746          localValues.set(index, Optional.fromNullable(returnValue));
1747        }
1748      } catch (CancellationException e) {
1749        if (allMustSucceed) {
1750          // Set ourselves as cancelled. Let the input futures keep running
1751          // as some of them may be used elsewhere.
1752          cancel(false);
1753        }
1754      } catch (ExecutionException e) {
1755        setExceptionAndMaybeLog(e.getCause());
1756      } catch (Throwable t) {
1757        setExceptionAndMaybeLog(t);
1758      } finally {
1759        int newRemaining = remaining.decrementAndGet();
1760        checkState(newRemaining >= 0, "Less than 0 remaining futures");
1761        if (newRemaining == 0) {
1762          FutureCombiner<V, C> localCombiner = combiner;
1763          if (localCombiner != null && localValues != null) {
1764            set(localCombiner.combine(localValues));
1765          } else {
1766            checkState(isDone());
1767          }
1768        }
1769      }
1770    }
1771  }
1772
1773  /** Used for {@link #allAsList} and {@link #successfulAsList}. */
1774  private static <V> ListenableFuture<List<V>> listFuture(
1775      ImmutableList<ListenableFuture<? extends V>> futures,
1776      boolean allMustSucceed, Executor listenerExecutor) {
1777    return new CombinedFuture<V, List<V>>(
1778        futures, allMustSucceed, listenerExecutor,
1779        new FutureCombiner<V, List<V>>() {
1780          @Override
1781          public List<V> combine(List<Optional<V>> values) {
1782            List<V> result = Lists.newArrayList();
1783            for (Optional<V> element : values) {
1784              result.add(element != null ? element.orNull() : null);
1785            }
1786            return Collections.unmodifiableList(result);
1787          }
1788        });
1789  }
1790
1791  /**
1792   * A checked future that uses a function to map from exceptions to the
1793   * appropriate checked type.
1794   */
1795  private static class MappingCheckedFuture<V, X extends Exception> extends
1796      AbstractCheckedFuture<V, X> {
1797
1798    final Function<? super Exception, X> mapper;
1799
1800    MappingCheckedFuture(ListenableFuture<V> delegate,
1801        Function<? super Exception, X> mapper) {
1802      super(delegate);
1803
1804      this.mapper = checkNotNull(mapper);
1805    }
1806
1807    @Override
1808    protected X mapException(Exception e) {
1809      return mapper.apply(e);
1810    }
1811  }
1812}
1813