1/* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.google.common.util.concurrent; 18 19import com.google.common.collect.ImmutableList; 20import com.google.common.collect.Lists; 21import com.google.common.collect.Queues; 22 23import junit.framework.TestCase; 24 25import java.util.List; 26import java.util.Queue; 27import java.util.concurrent.CyclicBarrier; 28import java.util.concurrent.Executor; 29import java.util.concurrent.ExecutorService; 30import java.util.concurrent.Executors; 31import java.util.concurrent.RejectedExecutionException; 32import java.util.concurrent.TimeUnit; 33import java.util.concurrent.atomic.AtomicBoolean; 34import java.util.concurrent.atomic.AtomicInteger; 35 36/** 37 * Tests {@link SerializingExecutor}. 38 * 39 * @author JJ Furman 40 */ 41public class SerializingExecutorTest extends TestCase { 42 private static class FakeExecutor implements Executor { 43 Queue<Runnable> tasks = Queues.newArrayDeque(); 44 @Override public void execute(Runnable command) { 45 tasks.add(command); 46 } 47 48 boolean hasNext() { 49 return !tasks.isEmpty(); 50 } 51 52 void runNext() { 53 assertTrue("expected at least one task to run", hasNext()); 54 tasks.remove().run(); 55 } 56 57 } 58 private FakeExecutor fakePool; 59 private SerializingExecutor e; 60 61 @Override 62 public void setUp() { 63 fakePool = new FakeExecutor(); 64 e = new SerializingExecutor(fakePool); 65 } 66 67 public void testSerializingNullExecutor_fails() { 68 try { 69 new SerializingExecutor(null); 70 fail("Should have failed with NullPointerException."); 71 } catch (NullPointerException expected) { 72 } 73 } 74 75 public void testBasics() { 76 final AtomicInteger totalCalls = new AtomicInteger(); 77 Runnable intCounter = new Runnable() { 78 @Override 79 public void run() { 80 totalCalls.incrementAndGet(); 81 } 82 }; 83 84 assertFalse(fakePool.hasNext()); 85 e.execute(intCounter); 86 assertTrue(fakePool.hasNext()); 87 e.execute(intCounter); 88 assertEquals(0, totalCalls.get()); 89 fakePool.runNext(); // run just 1 sub task... 90 assertEquals(2, totalCalls.get()); 91 assertFalse(fakePool.hasNext()); 92 93 // Check that execute can be safely repeated 94 e.execute(intCounter); 95 e.execute(intCounter); 96 e.execute(intCounter); 97 assertEquals(2, totalCalls.get()); 98 fakePool.runNext(); 99 assertEquals(5, totalCalls.get()); 100 assertFalse(fakePool.hasNext()); 101 } 102 103 public void testOrdering() { 104 final List<Integer> callOrder = Lists.newArrayList(); 105 106 class FakeOp implements Runnable { 107 final int op; 108 109 FakeOp(int op) { 110 this.op = op; 111 } 112 113 @Override 114 public void run() { 115 callOrder.add(op); 116 } 117 } 118 119 e.execute(new FakeOp(0)); 120 e.execute(new FakeOp(1)); 121 e.execute(new FakeOp(2)); 122 fakePool.runNext(); 123 124 assertEquals(ImmutableList.of(0, 1, 2), callOrder); 125 } 126 127 public void testExceptions() { 128 129 final AtomicInteger numCalls = new AtomicInteger(); 130 131 Runnable runMe = new Runnable() { 132 @Override 133 public void run() { 134 numCalls.incrementAndGet(); 135 throw new RuntimeException("FAKE EXCEPTION!"); 136 } 137 }; 138 139 e.execute(runMe); 140 e.execute(runMe); 141 fakePool.runNext(); 142 143 assertEquals(2, numCalls.get()); 144 } 145 146 public void testDelegateRejection() { 147 final AtomicInteger numCalls = new AtomicInteger(); 148 final AtomicBoolean reject = new AtomicBoolean(true); 149 final SerializingExecutor executor = new SerializingExecutor( 150 new Executor() { 151 @Override public void execute(Runnable r) { 152 if (reject.get()) { 153 throw new RejectedExecutionException(); 154 } 155 r.run(); 156 } 157 }); 158 Runnable task = new Runnable() { 159 @Override 160 public void run() { 161 numCalls.incrementAndGet(); 162 } 163 }; 164 try { 165 executor.execute(task); 166 fail(); 167 } catch (RejectedExecutionException expected) {} 168 assertEquals(0, numCalls.get()); 169 reject.set(false); 170 executor.execute(task); 171 assertEquals(2, numCalls.get()); 172 } 173 174 public void testTaskThrowsError() throws Exception { 175 class MyError extends Error {} 176 final CyclicBarrier barrier = new CyclicBarrier(2); 177 // we need to make sure the error gets thrown on a different thread. 178 ExecutorService service = Executors.newSingleThreadExecutor(); 179 try { 180 final SerializingExecutor executor = new SerializingExecutor(service); 181 Runnable errorTask = new Runnable() { 182 @Override 183 public void run() { 184 throw new MyError(); 185 } 186 }; 187 Runnable barrierTask = new Runnable() { 188 @Override 189 public void run() { 190 try { 191 barrier.await(); 192 } catch (Exception e) { 193 throw new RuntimeException(e); 194 } 195 } 196 }; 197 executor.execute(errorTask); 198 service.execute(barrierTask); // submit directly to the service 199 // the barrier task runs after the error task so we know that the error has been observed by 200 // SerializingExecutor by the time the barrier is satified 201 barrier.await(10, TimeUnit.SECONDS); 202 executor.execute(barrierTask); 203 // timeout means the second task wasn't even tried 204 barrier.await(10, TimeUnit.SECONDS); 205 } finally { 206 service.shutdown(); 207 } 208 } 209} 210