1/*
2 * Copyright (C) 2011 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
20import com.google.common.util.concurrent.Service.State;
21
22import junit.framework.TestCase;
23
24import java.util.concurrent.CyclicBarrier;
25import java.util.concurrent.ExecutionException;
26import java.util.concurrent.Executors;
27import java.util.concurrent.Future;
28import java.util.concurrent.ScheduledExecutorService;
29import java.util.concurrent.ScheduledFuture;
30import java.util.concurrent.ScheduledThreadPoolExecutor;
31
32import java.util.concurrent.TimeUnit;
33import java.util.concurrent.atomic.AtomicBoolean;
34import java.util.concurrent.atomic.AtomicInteger;
35
36/**
37 * Unit test for {@link AbstractScheduledService}.
38 *
39 * @author Luke Sandberg
40 */
41
42public class AbstractScheduledServiceTest extends TestCase {
43
44  volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
45  volatile ScheduledFuture<?> future = null;
46
47  volatile boolean atFixedRateCalled = false;
48  volatile boolean withFixedDelayCalled = false;
49  volatile boolean scheduleCalled = false;
50
51  final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
52    @Override
53    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
54        long delay, TimeUnit unit) {
55      return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
56    }
57  };
58
59  public void testServiceStartStop() throws Exception {
60    NullService service = new NullService();
61    service.startAndWait();
62    assertFalse(future.isDone());
63    service.stopAndWait();
64    assertTrue(future.isCancelled());
65  }
66
67  private class NullService extends AbstractScheduledService {
68    @Override protected void runOneIteration() throws Exception { }
69    @Override protected void startUp() throws Exception { }
70    @Override protected void shutDown() throws Exception { }
71    @Override protected Scheduler scheduler() { return configuration; }
72    @Override protected ScheduledExecutorService executor() { return executor; }
73  }
74
75  public void testFailOnExceptionFromRun() throws Exception {
76    TestService service = new TestService();
77    service.runException = new Exception();
78    service.startAndWait();
79    service.runFirstBarrier.await();
80    service.runSecondBarrier.await();
81    try {
82      future.get();
83      fail();
84    } catch (ExecutionException e) {
85      // An execution exception holds a runtime exception (from throwables.propogate) that holds our
86      // original exception.
87      assertEquals(service.runException, e.getCause().getCause());
88    }
89    assertEquals(service.state(), Service.State.FAILED);
90  }
91
92  public void testFailOnExceptionFromStartUp() {
93    TestService service = new TestService();
94    service.startUpException = new Exception();
95    try {
96      service.startAndWait();
97      fail();
98    } catch (UncheckedExecutionException e) {
99      assertEquals(service.startUpException, e.getCause());
100    }
101    assertEquals(0, service.numberOfTimesRunCalled.get());
102    assertEquals(Service.State.FAILED, service.state());
103  }
104
105  public void testFailOnExceptionFromShutDown() throws Exception {
106    TestService service = new TestService();
107    service.shutDownException = new Exception();
108    service.startAndWait();
109    service.runFirstBarrier.await();
110    ListenableFuture<Service.State> stopHandle = service.stop();
111    service.runSecondBarrier.await();
112    try {
113      stopHandle.get();
114      fail();
115    } catch (ExecutionException e) {
116      assertEquals(service.shutDownException, e.getCause());
117    }
118    assertEquals(Service.State.FAILED, service.state());
119  }
120
121  public void testRunOneIterationCalledMultipleTimes() throws Exception {
122    TestService service = new TestService();
123    service.startAndWait();
124    for (int i = 1; i < 10; i++) {
125      service.runFirstBarrier.await();
126      assertEquals(i, service.numberOfTimesRunCalled.get());
127      service.runSecondBarrier.await();
128    }
129    service.runFirstBarrier.await();
130    service.stop();
131    service.runSecondBarrier.await();
132    service.stopAndWait();
133  }
134
135  public void testExecutorOnlyCalledOnce() throws Exception {
136    TestService service = new TestService();
137    service.startAndWait();
138    // It should be called once during startup.
139    assertEquals(1, service.numberOfTimesExecutorCalled.get());
140    for (int i = 1; i < 10; i++) {
141      service.runFirstBarrier.await();
142      assertEquals(i, service.numberOfTimesRunCalled.get());
143      service.runSecondBarrier.await();
144    }
145    service.runFirstBarrier.await();
146    service.stop();
147    service.runSecondBarrier.await();
148    service.stopAndWait();
149    // Only called once overall.
150    assertEquals(1, service.numberOfTimesExecutorCalled.get());
151  }
152
153  public void testSchedulerOnlyCalledOnce() throws Exception {
154    TestService service = new TestService();
155    service.startAndWait();
156    // It should be called once during startup.
157    assertEquals(1, service.numberOfTimesSchedulerCalled.get());
158    for (int i = 1; i < 10; i++) {
159      service.runFirstBarrier.await();
160      assertEquals(i, service.numberOfTimesRunCalled.get());
161      service.runSecondBarrier.await();
162    }
163    service.runFirstBarrier.await();
164    service.stop();
165    service.runSecondBarrier.await();
166    service.stopAndWait();
167    // Only called once overall.
168    assertEquals(1, service.numberOfTimesSchedulerCalled.get());
169  }
170
171  private class TestService extends AbstractScheduledService {
172    CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
173    CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
174
175    volatile boolean startUpCalled = false;
176    volatile boolean shutDownCalled = false;
177    AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
178    AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
179    AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
180    volatile Exception runException = null;
181    volatile Exception startUpException = null;
182    volatile Exception shutDownException = null;
183
184    @Override
185    protected void runOneIteration() throws Exception {
186      assertTrue(startUpCalled);
187      assertFalse(shutDownCalled);
188      numberOfTimesRunCalled.incrementAndGet();
189      assertEquals(State.RUNNING, state());
190      runFirstBarrier.await();
191      runSecondBarrier.await();
192      if (runException != null) {
193        throw runException;
194      }
195    }
196
197    @Override
198    protected void startUp() throws Exception {
199      assertFalse(startUpCalled);
200      assertFalse(shutDownCalled);
201      startUpCalled = true;
202      assertEquals(State.STARTING, state());
203      if (startUpException != null) {
204        throw startUpException;
205      }
206    }
207
208    @Override
209    protected void shutDown() throws Exception {
210      assertTrue(startUpCalled);
211      assertFalse(shutDownCalled);
212      shutDownCalled = true;
213      if (shutDownException != null) {
214        throw shutDownException;
215      }
216    }
217
218    @Override
219    protected ScheduledExecutorService executor() {
220      numberOfTimesExecutorCalled.incrementAndGet();
221      return executor;
222    }
223
224    @Override
225    protected Scheduler scheduler() {
226      numberOfTimesSchedulerCalled.incrementAndGet();
227      return configuration;
228    }
229  }
230
231  public static class SchedulerTest extends TestCase {
232    // These constants are arbitrary and just used to make sure that the correct method is called
233    // with the correct parameters.
234    private static final int initialDelay = 10;
235    private static final int delay = 20;
236    private static final TimeUnit unit = TimeUnit.MILLISECONDS;
237
238    // Unique runnable object used for comparison.
239    final Runnable testRunnable = new Runnable() {@Override public void run() {}};
240    boolean called = false;
241
242    private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
243        long delay, TimeUnit unit) {
244      assertFalse(called);  // only called once.
245      called = true;
246      assertEquals(SchedulerTest.initialDelay, initialDelay);
247      assertEquals(SchedulerTest.delay, delay);
248      assertEquals(SchedulerTest.unit, unit);
249      assertEquals(testRunnable, command);
250    }
251
252    public void testFixedRateSchedule() {
253      Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
254      schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
255        @Override
256        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
257            long period, TimeUnit unit) {
258          assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
259          return null;
260        }
261      }, testRunnable);
262      assertTrue(called);
263    }
264
265    public void testFixedDelaySchedule() {
266      Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
267      schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
268        @Override
269        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
270            long delay, TimeUnit unit) {
271          assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
272          return null;
273        }
274      }, testRunnable);
275      assertTrue(called);
276    }
277
278    private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
279      public AtomicInteger scheduleCounter = new AtomicInteger(0);
280      @Override
281      protected Schedule getNextSchedule() throws Exception {
282        scheduleCounter.incrementAndGet();
283        return new Schedule(0, TimeUnit.SECONDS);
284      }
285    }
286
287    public void testCustomSchedule_startStop() throws Exception {
288      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
289      final CyclicBarrier secondBarrier = new CyclicBarrier(2);
290      final AtomicBoolean shouldWait = new AtomicBoolean(true);
291      Runnable task = new Runnable() {
292        @Override public void run() {
293          try {
294            if (shouldWait.get()) {
295              firstBarrier.await();
296              secondBarrier.await();
297            }
298          } catch (Exception e) {
299            throw new RuntimeException(e);
300          }
301        }
302      };
303      TestCustomScheduler scheduler = new TestCustomScheduler();
304      Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
305      firstBarrier.await();
306      assertEquals(1, scheduler.scheduleCounter.get());
307      secondBarrier.await();
308      firstBarrier.await();
309      assertEquals(2, scheduler.scheduleCounter.get());
310      shouldWait.set(false);
311      secondBarrier.await();
312      future.cancel(false);
313    }
314
315    public void testCustomSchedulerServiceStop() throws Exception {
316      TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
317      service.startAndWait();
318      service.firstBarrier.await();
319      assertEquals(1, service.numIterations.get());
320      service.stop();
321      service.secondBarrier.await();
322      service.stopAndWait();
323      // Sleep for a while just to ensure that our task wasn't called again.
324      Thread.sleep(unit.toMillis(3 * delay));
325      assertEquals(1, service.numIterations.get());
326    }
327
328    public void testBig() throws Exception {
329      TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
330        @Override protected Scheduler scheduler() {
331          return new AbstractScheduledService.CustomScheduler(){
332            @Override
333            protected Schedule getNextSchedule() throws Exception {
334              // Explicitly yield to increase the probability of a pathological scheduling.
335              Thread.yield();
336              return new Schedule(0, TimeUnit.SECONDS);
337            }
338          };
339        }
340      };
341      service.useBarriers = false;
342      service.startAndWait();
343      Thread.sleep(50);
344      service.useBarriers = true;
345      service.firstBarrier.await();
346      int numIterations = service.numIterations.get();
347      service.stop();
348      service.secondBarrier.await();
349      service.stopAndWait();
350      assertEquals(numIterations, service.numIterations.get());
351    }
352
353    private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
354      final AtomicInteger numIterations = new AtomicInteger(0);
355      volatile boolean useBarriers = true;
356      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
357      final CyclicBarrier secondBarrier = new CyclicBarrier(2);
358
359      @Override protected void runOneIteration() throws Exception {
360        numIterations.incrementAndGet();
361        if (useBarriers) {
362          firstBarrier.await();
363          secondBarrier.await();
364        }
365      }
366
367      @Override protected ScheduledExecutorService executor() {
368        // use a bunch of threads so that weird overlapping schedules are more likely to happen.
369        return Executors.newScheduledThreadPool(10);
370      }
371
372      @Override protected void startUp() throws Exception { }
373
374      @Override protected void shutDown() throws Exception { }
375
376      @Override protected Scheduler scheduler() {
377        return new CustomScheduler() {
378          @Override
379          protected Schedule getNextSchedule() throws Exception {
380            return new Schedule(delay, unit);
381          }};
382      }
383    }
384
385    public void testCustomSchedulerFailure() throws Exception {
386      TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
387      service.startAndWait();
388      for (int i = 1; i < 4; i++) {
389        service.firstBarrier.await();
390        assertEquals(i, service.numIterations.get());
391        service.secondBarrier.await();
392      }
393      Thread.sleep(1000);
394      try {
395        service.stop().get(100, TimeUnit.SECONDS);
396        fail();
397      } catch (ExecutionException e) {
398        assertEquals(State.FAILED, service.state());
399      }
400    }
401
402    private static class TestFailingCustomScheduledService extends AbstractScheduledService {
403      final AtomicInteger numIterations = new AtomicInteger(0);
404      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
405      final CyclicBarrier secondBarrier = new CyclicBarrier(2);
406
407      @Override protected void runOneIteration() throws Exception {
408        numIterations.incrementAndGet();
409        firstBarrier.await();
410        secondBarrier.await();
411      }
412
413      @Override protected ScheduledExecutorService executor() {
414        // use a bunch of threads so that weird overlapping schedules are more likely to happen.
415        return Executors.newScheduledThreadPool(10);
416      }
417
418      @Override protected void startUp() throws Exception { }
419
420      @Override protected void shutDown() throws Exception { }
421
422      @Override protected Scheduler scheduler() {
423        return new CustomScheduler() {
424          @Override
425          protected Schedule getNextSchedule() throws Exception {
426            if (numIterations.get() > 2) {
427              throw new IllegalStateException("Failed");
428            }
429            return new Schedule(delay, unit);
430          }};
431      }
432    }
433  }
434}
435