1/*
2 * Copyright (C) 2008 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import com.google.common.collect.ImmutableList;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Queues;
22
23import junit.framework.TestCase;
24
25import java.util.List;
26import java.util.Queue;
27import java.util.concurrent.CyclicBarrier;
28import java.util.concurrent.Executor;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.RejectedExecutionException;
32import java.util.concurrent.TimeUnit;
33import java.util.concurrent.atomic.AtomicBoolean;
34import java.util.concurrent.atomic.AtomicInteger;
35
36/**
37 * Tests {@link SerializingExecutor}.
38 *
39 * @author JJ Furman
40 */
41public class SerializingExecutorTest extends TestCase {
42  private static class FakeExecutor implements Executor {
43    Queue<Runnable> tasks = Queues.newArrayDeque();
44    @Override public void execute(Runnable command) {
45      tasks.add(command);
46    }
47
48    boolean hasNext() {
49      return !tasks.isEmpty();
50    }
51
52    void runNext() {
53      assertTrue("expected at least one task to run", hasNext());
54      tasks.remove().run();
55    }
56
57  }
58  private FakeExecutor fakePool;
59  private SerializingExecutor e;
60
61  @Override
62  public void setUp() {
63    fakePool = new FakeExecutor();
64    e = new SerializingExecutor(fakePool);
65  }
66
67  public void testSerializingNullExecutor_fails() {
68    try {
69      new SerializingExecutor(null);
70      fail("Should have failed with NullPointerException.");
71    } catch (NullPointerException expected) {
72    }
73  }
74
75  public void testBasics() {
76    final AtomicInteger totalCalls = new AtomicInteger();
77    Runnable intCounter = new Runnable() {
78      @Override
79      public void run() {
80        totalCalls.incrementAndGet();
81      }
82    };
83
84    assertFalse(fakePool.hasNext());
85    e.execute(intCounter);
86    assertTrue(fakePool.hasNext());
87    e.execute(intCounter);
88    assertEquals(0, totalCalls.get());
89    fakePool.runNext(); // run just 1 sub task...
90    assertEquals(2, totalCalls.get());
91    assertFalse(fakePool.hasNext());
92
93    // Check that execute can be safely repeated
94    e.execute(intCounter);
95    e.execute(intCounter);
96    e.execute(intCounter);
97    assertEquals(2, totalCalls.get());
98    fakePool.runNext();
99    assertEquals(5, totalCalls.get());
100    assertFalse(fakePool.hasNext());
101  }
102
103  public void testOrdering() {
104    final List<Integer> callOrder = Lists.newArrayList();
105
106    class FakeOp implements Runnable {
107      final int op;
108
109      FakeOp(int op) {
110        this.op = op;
111      }
112
113      @Override
114      public void run() {
115        callOrder.add(op);
116      }
117    }
118
119    e.execute(new FakeOp(0));
120    e.execute(new FakeOp(1));
121    e.execute(new FakeOp(2));
122    fakePool.runNext();
123
124    assertEquals(ImmutableList.of(0, 1, 2), callOrder);
125  }
126
127  public void testExceptions() {
128
129    final AtomicInteger numCalls = new AtomicInteger();
130
131    Runnable runMe = new Runnable() {
132      @Override
133      public void run() {
134        numCalls.incrementAndGet();
135        throw new RuntimeException("FAKE EXCEPTION!");
136      }
137    };
138
139    e.execute(runMe);
140    e.execute(runMe);
141    fakePool.runNext();
142
143    assertEquals(2, numCalls.get());
144  }
145
146  public void testDelegateRejection() {
147    final AtomicInteger numCalls = new AtomicInteger();
148    final AtomicBoolean reject = new AtomicBoolean(true);
149    final SerializingExecutor executor = new SerializingExecutor(
150        new Executor() {
151          @Override public void execute(Runnable r) {
152            if (reject.get()) {
153              throw new RejectedExecutionException();
154            }
155            r.run();
156          }
157        });
158    Runnable task = new Runnable() {
159      @Override
160      public void run() {
161        numCalls.incrementAndGet();
162      }
163    };
164    try {
165      executor.execute(task);
166      fail();
167    } catch (RejectedExecutionException expected) {}
168    assertEquals(0, numCalls.get());
169    reject.set(false);
170    executor.execute(task);
171    assertEquals(2, numCalls.get());
172  }
173
174  public void testTaskThrowsError() throws Exception {
175    class MyError extends Error {}
176    final CyclicBarrier barrier = new CyclicBarrier(2);
177    // we need to make sure the error gets thrown on a different thread.
178    ExecutorService service = Executors.newSingleThreadExecutor();
179    try {
180      final SerializingExecutor executor = new SerializingExecutor(service);
181      Runnable errorTask = new Runnable() {
182        @Override
183        public void run() {
184          throw new MyError();
185        }
186      };
187      Runnable barrierTask = new Runnable() {
188        @Override
189        public void run() {
190          try {
191            barrier.await();
192          } catch (Exception e) {
193            throw new RuntimeException(e);
194          }
195        }
196      };
197      executor.execute(errorTask);
198      service.execute(barrierTask);  // submit directly to the service
199      // the barrier task runs after the error task so we know that the error has been observed by
200      // SerializingExecutor by the time the barrier is satified
201      barrier.await(10, TimeUnit.SECONDS);
202      executor.execute(barrierTask);
203      // timeout means the second task wasn't even tried
204      barrier.await(10, TimeUnit.SECONDS);
205    } finally {
206      service.shutdown();
207    }
208  }
209}
210