1/*
2 * Copyright (C) 2009 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import static com.google.common.base.Preconditions.checkNotNull;
20
21import com.google.common.annotations.Beta;
22import com.google.common.annotations.VisibleForTesting;
23
24import java.util.concurrent.Executor;
25import java.util.concurrent.Executors;
26import java.util.concurrent.Future;
27import java.util.concurrent.ThreadFactory;
28import java.util.concurrent.atomic.AtomicBoolean;
29
30/**
31 * Utilities necessary for working with libraries that supply plain {@link
32 * Future} instances. Note that, whenver possible, it is strongly preferred to
33 * modify those libraries to return {@code ListenableFuture} directly.
34 *
35 * @author Sven Mawson
36 * @since 10.0 (replacing {@code Futures.makeListenable}, which
37 *     existed in 1.0)
38 */
39@Beta
40public final class JdkFutureAdapters {
41  /**
42   * Assigns a thread to the given {@link Future} to provide {@link
43   * ListenableFuture} functionality.
44   *
45   * <p><b>Warning:</b> If the input future does not already implement {@link
46   * ListenableFuture}, the returned future will emulate {@link
47   * ListenableFuture#addListener} by taking a thread from an internal,
48   * unbounded pool at the first call to {@code addListener} and holding it
49   * until the future is {@linkplain Future#isDone() done}.
50   *
51   * <p>Prefer to create {@code ListenableFuture} instances with {@link
52   * SettableFuture}, {@link MoreExecutors#listeningDecorator(
53   * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
54   * {@link AbstractFuture}, and other utilities over creating plain {@code
55   * Future} instances to be upgraded to {@code ListenableFuture} after the
56   * fact.
57   */
58  public static <V> ListenableFuture<V> listenInPoolThread(
59      Future<V> future) {
60    if (future instanceof ListenableFuture<?>) {
61      return (ListenableFuture<V>) future;
62    }
63    return new ListenableFutureAdapter<V>(future);
64  }
65
66  @VisibleForTesting
67  static <V> ListenableFuture<V> listenInPoolThread(
68      Future<V> future, Executor executor) {
69    checkNotNull(executor);
70    if (future instanceof ListenableFuture<?>) {
71      return (ListenableFuture<V>) future;
72    }
73    return new ListenableFutureAdapter<V>(future, executor);
74  }
75
76  /**
77   * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
78   * will wait on the future to finish, and when it completes, run the
79   * listeners.  This implementation will wait on the source future
80   * indefinitely, so if the source future never completes, the adapter will
81   * never complete either.
82   *
83   * <p>If the delegate future is interrupted or throws an unexpected unchecked
84   * exception, the listeners will not be invoked.
85   */
86  private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
87      implements ListenableFuture<V> {
88
89    private static final ThreadFactory threadFactory =
90        new ThreadFactoryBuilder()
91            .setDaemon(true)
92            .setNameFormat("ListenableFutureAdapter-thread-%d")
93            .build();
94    private static final Executor defaultAdapterExecutor =
95        Executors.newCachedThreadPool(threadFactory);
96
97    private final Executor adapterExecutor;
98
99    // The execution list to hold our listeners.
100    private final ExecutionList executionList = new ExecutionList();
101
102    // This allows us to only start up a thread waiting on the delegate future
103    // when the first listener is added.
104    private final AtomicBoolean hasListeners = new AtomicBoolean(false);
105
106    // The delegate future.
107    private final Future<V> delegate;
108
109    ListenableFutureAdapter(Future<V> delegate) {
110      this(delegate, defaultAdapterExecutor);
111    }
112
113    ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
114      this.delegate = checkNotNull(delegate);
115      this.adapterExecutor = checkNotNull(adapterExecutor);
116    }
117
118    @Override
119    protected Future<V> delegate() {
120      return delegate;
121    }
122
123    @Override
124    public void addListener(Runnable listener, Executor exec) {
125      executionList.add(listener, exec);
126
127      // When a listener is first added, we run a task that will wait for
128      // the delegate to finish, and when it is done will run the listeners.
129      if (hasListeners.compareAndSet(false, true)) {
130        if (delegate.isDone()) {
131          // If the delegate is already done, run the execution list
132          // immediately on the current thread.
133          executionList.execute();
134          return;
135        }
136
137        adapterExecutor.execute(new Runnable() {
138          @Override
139          public void run() {
140            try {
141              delegate.get();
142            } catch (Error e) {
143              throw e;
144            } catch (InterruptedException e) {
145              Thread.currentThread().interrupt();
146              // Threads from our private pool are never interrupted.
147              throw new AssertionError(e);
148            } catch (Throwable e) {
149              // ExecutionException / CancellationException / RuntimeException
150              // The task is done, run the listeners.
151            }
152            executionList.execute();
153          }
154        });
155      }
156    }
157  }
158
159  private JdkFutureAdapters() {}
160}
161