1/*
2 * Copyright (C) 2008 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import static com.google.common.collect.Iterables.getOnlyElement;
20import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
21import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
22import static java.util.concurrent.TimeUnit.SECONDS;
23import static org.junit.contrib.truth.Truth.ASSERT;
24
25import com.google.common.base.Throwables;
26import com.google.common.collect.ImmutableList;
27
28import junit.framework.TestCase;
29
30import java.util.Collections;
31import java.util.List;
32import java.util.concurrent.Callable;
33import java.util.concurrent.CyclicBarrier;
34import java.util.concurrent.ExecutorService;
35import java.util.concurrent.Future;
36import java.util.concurrent.RejectedExecutionException;
37import java.util.concurrent.TimeUnit;
38import java.util.concurrent.atomic.AtomicReference;
39
40/**
41 * Tests for MoreExecutors.
42 *
43 * @author Kyle Littlefield (klittle)
44 */
45public class MoreExecutorsTest extends TestCase {
46
47  public void testSameThreadExecutorServiceInThreadExecution()
48      throws Exception {
49    final ListeningExecutorService executor =
50        MoreExecutors.sameThreadExecutor();
51    final ThreadLocal<Integer> threadLocalCount = new ThreadLocal<Integer>() {
52      @Override
53      protected Integer initialValue() {
54        return 0;
55      }
56    };
57    final AtomicReference<Throwable> throwableFromOtherThread =
58        new AtomicReference<Throwable>(null);
59    final Runnable incrementTask =
60        new Runnable() {
61          @Override
62          public void run() {
63            threadLocalCount.set(threadLocalCount.get() + 1);
64          }
65        };
66
67    Thread otherThread = new Thread(
68        new Runnable() {
69          @Override
70          public void run() {
71            try {
72              Future<?> future = executor.submit(incrementTask);
73              assertTrue(future.isDone());
74              assertEquals(1, threadLocalCount.get().intValue());
75            } catch (Throwable Throwable) {
76              throwableFromOtherThread.set(Throwable);
77            }
78          }
79        });
80
81    otherThread.start();
82
83    ListenableFuture<?> future = executor.submit(incrementTask);
84    assertTrue(future.isDone());
85    assertListenerRunImmediately(future);
86    assertEquals(1, threadLocalCount.get().intValue());
87    otherThread.join(1000);
88    assertEquals(Thread.State.TERMINATED, otherThread.getState());
89    Throwable throwable = throwableFromOtherThread.get();
90    assertNull("Throwable from other thread: "
91        + (throwable == null ? null : Throwables.getStackTraceAsString(throwable)),
92        throwableFromOtherThread.get());
93  }
94
95  public void testSameThreadExecutorInvokeAll() throws Exception {
96    final ExecutorService executor = MoreExecutors.sameThreadExecutor();
97    final ThreadLocal<Integer> threadLocalCount = new ThreadLocal<Integer>() {
98      @Override
99      protected Integer initialValue() {
100        return 0;
101      }
102    };
103
104    final Callable<Integer> incrementTask = new Callable<Integer>() {
105      @Override
106      public Integer call() {
107        int i = threadLocalCount.get();
108        threadLocalCount.set(i + 1);
109        return i;
110      }
111    };
112
113    List<Future<Integer>> futures =
114        executor.invokeAll(Collections.nCopies(10, incrementTask));
115
116    for (int i = 0; i < 10; i++) {
117      Future<Integer> future = futures.get(i);
118      assertTrue("Task should have been run before being returned", future.isDone());
119      assertEquals(i, future.get().intValue());
120    }
121
122    assertEquals(10, threadLocalCount.get().intValue());
123  }
124
125  public void testSameThreadExecutorServiceTermination()
126      throws Exception {
127    final ExecutorService executor = MoreExecutors.sameThreadExecutor();
128    final CyclicBarrier barrier = new CyclicBarrier(2);
129    final AtomicReference<Throwable> throwableFromOtherThread =
130        new AtomicReference<Throwable>(null);
131    final Runnable doNothingRunnable = new Runnable() {
132        @Override public void run() {
133        }};
134
135    Thread otherThread = new Thread(new Runnable() {
136      @Override
137      public void run() {
138        try {
139          Future<?> future = executor.submit(new Callable<Void>() {
140            @Override
141            public Void call() throws Exception {
142              // WAIT #1
143              barrier.await(1, TimeUnit.SECONDS);
144
145              // WAIT #2
146              barrier.await(1, TimeUnit.SECONDS);
147              assertTrue(executor.isShutdown());
148              assertFalse(executor.isTerminated());
149
150              // WAIT #3
151              barrier.await(1, TimeUnit.SECONDS);
152              return null;
153            }
154          });
155          assertTrue(future.isDone());
156          assertTrue(executor.isShutdown());
157          assertTrue(executor.isTerminated());
158        } catch (Throwable Throwable) {
159          throwableFromOtherThread.set(Throwable);
160        }
161      }});
162
163    otherThread.start();
164
165    // WAIT #1
166    barrier.await(1, TimeUnit.SECONDS);
167    assertFalse(executor.isShutdown());
168    assertFalse(executor.isTerminated());
169
170    executor.shutdown();
171    assertTrue(executor.isShutdown());
172    try {
173      executor.submit(doNothingRunnable);
174      fail("Should have encountered RejectedExecutionException");
175    } catch (RejectedExecutionException ex) {
176      // good to go
177    }
178    assertFalse(executor.isTerminated());
179
180    // WAIT #2
181    barrier.await(1, TimeUnit.SECONDS);
182    assertFalse(executor.awaitTermination(20, TimeUnit.MILLISECONDS));
183
184    // WAIT #3
185    barrier.await(1, TimeUnit.SECONDS);
186    assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
187    assertTrue(executor.awaitTermination(0, TimeUnit.SECONDS));
188    assertTrue(executor.isShutdown());
189    try {
190      executor.submit(doNothingRunnable);
191      fail("Should have encountered RejectedExecutionException");
192    } catch (RejectedExecutionException ex) {
193      // good to go
194    }
195    assertTrue(executor.isTerminated());
196
197    otherThread.join(1000);
198    assertEquals(Thread.State.TERMINATED, otherThread.getState());
199    Throwable throwable = throwableFromOtherThread.get();
200    assertNull("Throwable from other thread: "
201        + (throwable == null ? null : Throwables.getStackTraceAsString(throwable)),
202        throwableFromOtherThread.get());
203  }
204
205  public void testListeningDecorator() throws Exception {
206    ListeningExecutorService service =
207        listeningDecorator(MoreExecutors.sameThreadExecutor());
208    assertSame(service, listeningDecorator(service));
209    List<Callable<String>> callables =
210        ImmutableList.of(Callables.returning("x"));
211    List<Future<String>> results;
212
213    results = service.invokeAll(callables);
214    ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class);
215
216    results = service.invokeAll(callables, 1, SECONDS);
217    ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class);
218
219    /*
220     * TODO(cpovirk): move ForwardingTestCase somewhere common, and use it to
221     * test the forwarded methods
222     */
223  }
224
225  private static void assertListenerRunImmediately(ListenableFuture<?> future) {
226    CountingRunnable listener = new CountingRunnable();
227    future.addListener(listener, sameThreadExecutor());
228    assertEquals(1, listener.count);
229  }
230
231  private static final class CountingRunnable implements Runnable {
232    int count;
233
234    @Override
235    public void run() {
236      count++;
237    }
238  }
239}
240