1/*
2 * Copyright (C) 2009 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import static com.google.common.base.Preconditions.checkState;
20import static com.google.common.util.concurrent.Futures.immediateFuture;
21import static com.google.common.util.concurrent.JdkFutureAdapters.listenInPoolThread;
22import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23import static java.util.concurrent.Executors.newCachedThreadPool;
24import static java.util.concurrent.TimeUnit.SECONDS;
25
26import com.google.common.testing.ClassSanityTester;
27import com.google.common.util.concurrent.FuturesTest.ExecutorSpy;
28import com.google.common.util.concurrent.FuturesTest.SingleCallListener;
29
30import junit.framework.AssertionFailedError;
31import junit.framework.TestCase;
32
33import java.util.concurrent.CountDownLatch;
34import java.util.concurrent.ExecutorService;
35import java.util.concurrent.Future;
36import java.util.concurrent.SynchronousQueue;
37import java.util.concurrent.ThreadPoolExecutor;
38import java.util.concurrent.TimeUnit;
39
40/**
41 * Unit tests for {@link JdkFutureAdapters}.
42 *
43 * @author Sven Mawson
44 * @author Kurt Alfred Kluever
45 */
46public class JdkFutureAdaptersTest extends TestCase {
47  private static final String DATA1 = "data";
48
49  public void testListenInPoolThreadReturnsSameFuture() throws Exception {
50    ListenableFuture<String> listenableFuture = immediateFuture(DATA1);
51    assertSame(listenableFuture, listenInPoolThread(listenableFuture));
52  }
53
54  public void testListenInPoolThreadIgnoresExecutorWhenDelegateIsDone()
55      throws Exception {
56    NonListenableSettableFuture<String> abstractFuture =
57        NonListenableSettableFuture.create();
58    abstractFuture.set(DATA1);
59    ExecutorSpy spy = new ExecutorSpy(directExecutor());
60    ListenableFuture<String> listenableFuture =
61        listenInPoolThread(abstractFuture, spy);
62
63    SingleCallListener singleCallListener = new SingleCallListener();
64    singleCallListener.expectCall();
65
66    assertFalse(spy.wasExecuted);
67    assertFalse(singleCallListener.wasCalled());
68    assertTrue(listenableFuture.isDone()); // We call AbstractFuture#set above.
69
70    // #addListener() will run the listener immediately because the Future is
71    // already finished (we explicitly set the result of it above).
72    listenableFuture.addListener(singleCallListener, directExecutor());
73    assertEquals(DATA1, listenableFuture.get());
74
75    // 'spy' should have been ignored since 'abstractFuture' was done before
76    // a listener was added.
77    assertFalse(spy.wasExecuted);
78    assertTrue(singleCallListener.wasCalled());
79    assertTrue(listenableFuture.isDone());
80  }
81
82  public void testListenInPoolThreadUsesGivenExecutor() throws Exception {
83    ExecutorService executorService = newCachedThreadPool(
84        new ThreadFactoryBuilder().setDaemon(true).build());
85    NonListenableSettableFuture<String> abstractFuture =
86        NonListenableSettableFuture.create();
87    ExecutorSpy spy = new ExecutorSpy(executorService);
88    ListenableFuture<String> listenableFuture =
89        listenInPoolThread(abstractFuture, spy);
90
91    SingleCallListener singleCallListener = new SingleCallListener();
92    singleCallListener.expectCall();
93
94    assertFalse(spy.wasExecuted);
95    assertFalse(singleCallListener.wasCalled());
96    assertFalse(listenableFuture.isDone());
97
98    listenableFuture.addListener(singleCallListener, executorService);
99    abstractFuture.set(DATA1);
100    assertEquals(DATA1, listenableFuture.get());
101    singleCallListener.waitForCall();
102
103    assertTrue(spy.wasExecuted);
104    assertTrue(singleCallListener.wasCalled());
105    assertTrue(listenableFuture.isDone());
106  }
107
108  public void testListenInPoolThreadCustomExecutorInterrupted()
109      throws Exception {
110    final CountDownLatch submitSuccessful = new CountDownLatch(1);
111    ExecutorService executorService = new ThreadPoolExecutor(
112        0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
113        new SynchronousQueue<Runnable>(),
114        new ThreadFactoryBuilder().setDaemon(true).build()) {
115      @Override
116      protected void beforeExecute(Thread t, Runnable r) {
117        submitSuccessful.countDown();
118      }
119    };
120    NonListenableSettableFuture<String> abstractFuture =
121        NonListenableSettableFuture.create();
122    ListenableFuture<String> listenableFuture =
123        listenInPoolThread(abstractFuture, executorService);
124
125    SingleCallListener singleCallListener = new SingleCallListener();
126    singleCallListener.expectCall();
127
128    assertFalse(singleCallListener.wasCalled());
129    assertFalse(listenableFuture.isDone());
130
131    listenableFuture.addListener(singleCallListener, directExecutor());
132    /*
133     * Don't shut down until the listenInPoolThread task has been accepted to
134     * run. We want to see what happens when it's interrupted, not when it's
135     * rejected.
136     */
137    submitSuccessful.await();
138    executorService.shutdownNow();
139    abstractFuture.set(DATA1);
140    assertEquals(DATA1, listenableFuture.get());
141    singleCallListener.waitForCall();
142
143    assertTrue(singleCallListener.wasCalled());
144    assertTrue(listenableFuture.isDone());
145  }
146
147  /**
148   * A Future that doesn't implement ListenableFuture, useful for testing
149   * listenInPoolThread.
150   */
151  private static final class NonListenableSettableFuture<V>
152      extends ForwardingFuture<V> {
153    static <V> NonListenableSettableFuture<V> create() {
154      return new NonListenableSettableFuture<V>();
155    }
156
157    final SettableFuture<V> delegate = SettableFuture.create();
158
159    @Override protected Future<V> delegate() {
160      return delegate;
161    }
162
163    void set(V value) {
164      delegate.set(value);
165    }
166  }
167
168  private static final class RuntimeExceptionThrowingFuture<V>
169      implements Future<V> {
170    final CountDownLatch allowGetToComplete = new CountDownLatch(1);
171
172    @Override
173    public boolean cancel(boolean mayInterruptIfRunning) {
174      throw new AssertionFailedError();
175    }
176
177    @Override
178    public V get() throws InterruptedException {
179      /*
180       * Wait a little to give us time to call addListener before the future's
181       * value is set in addition to the call we'll make after then.
182       */
183      allowGetToComplete.await(1, SECONDS);
184      throw new RuntimeException("expected, should be caught");
185    }
186
187    @Override
188    public V get(long timeout, TimeUnit unit) {
189      throw new AssertionFailedError();
190    }
191
192    @Override
193    public boolean isCancelled() {
194      throw new AssertionFailedError();
195    }
196
197    @Override
198    public boolean isDone() {
199      /*
200       * If isDone is true during the call to listenInPoolThread,
201       * listenInPoolThread doesn't start a thread. Make sure it's false the
202       * first time through (and forever after, since no one else cares about
203       * it).
204       */
205      return false;
206    }
207  }
208
209  private static final class RecordingRunnable implements Runnable {
210    final CountDownLatch wasRun = new CountDownLatch(1);
211
212    // synchronized so that checkState works as expected.
213    @Override
214    public synchronized void run() {
215      checkState(wasRun.getCount() > 0);
216      wasRun.countDown();
217    }
218  }
219
220  public void testListenInPoolThreadRunsListenerAfterRuntimeException()
221      throws Exception {
222    RuntimeExceptionThrowingFuture<String> input =
223        new RuntimeExceptionThrowingFuture<String>();
224    /*
225     * The compiler recognizes that "input instanceof ListenableFuture" is
226     * impossible. We want the test, though, in case that changes in the future,
227     * so we use isInstance instead.
228     */
229    assertFalse("Can't test the main listenInPoolThread path "
230        + "if the input is already a ListenableFuture",
231        ListenableFuture.class.isInstance(input));
232    ListenableFuture<String> listenable = listenInPoolThread(input);
233    /*
234     * This will occur before the waiting get() in the
235     * listenInPoolThread-spawned thread completes:
236     */
237    RecordingRunnable earlyListener = new RecordingRunnable();
238    listenable.addListener(earlyListener, directExecutor());
239
240    input.allowGetToComplete.countDown();
241    // Now give the get() thread time to finish:
242    assertTrue(earlyListener.wasRun.await(1, SECONDS));
243
244    // Now test an additional addListener call, which will be run in-thread:
245    RecordingRunnable lateListener = new RecordingRunnable();
246    listenable.addListener(lateListener, directExecutor());
247    assertTrue(lateListener.wasRun.await(1, SECONDS));
248  }
249
250  public void testAdapters_nullChecks() throws Exception {
251    new ClassSanityTester()
252        .forAllPublicStaticMethods(JdkFutureAdapters.class)
253        .thatReturn(Future.class)
254        .testNulls();
255  }
256}
257