1/*
2 * Copyright (C) 2014 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import com.google.common.util.concurrent.ListenerCallQueue.Callback;
20
21import junit.framework.TestCase;
22
23import java.util.concurrent.CountDownLatch;
24import java.util.concurrent.ExecutorService;
25import java.util.concurrent.Executors;
26import java.util.concurrent.atomic.AtomicInteger;
27
28/**
29 * Tests for {@link ListenerCallQueue}.
30 */
31public class ListenerCallQueueTest extends TestCase {
32
33  private static final Callback<Object> THROWING_CALLBACK = new Callback<Object>("throwing()") {
34    @Override public void call(Object object) {
35      throw new RuntimeException();
36    }
37  };
38
39  public void testAddAndExecute() {
40    Object listenerInstance = new Object();
41    ListenerCallQueue<Object> queue =
42        new ListenerCallQueue<Object>(listenerInstance, MoreExecutors.sameThreadExecutor());
43
44    AtomicInteger counter = new AtomicInteger();
45    queue.add(incrementingCallback(counter, 1));
46    queue.add(incrementingCallback(counter, 2));
47    queue.add(incrementingCallback(counter, 3));
48    queue.add(incrementingCallback(counter, 4));
49    assertEquals(0, counter.get());
50    queue.execute();
51    assertEquals(4, counter.get());
52  }
53
54  public void testAddAndExecute_withExceptions() {
55    Object listenerInstance = new Object();
56    ListenerCallQueue<Object> queue =
57        new ListenerCallQueue<Object>(listenerInstance, MoreExecutors.sameThreadExecutor());
58
59    AtomicInteger counter = new AtomicInteger();
60    queue.add(incrementingCallback(counter, 1));
61    queue.add(THROWING_CALLBACK);
62    queue.add(incrementingCallback(counter, 2));
63    queue.add(THROWING_CALLBACK);
64    queue.add(incrementingCallback(counter, 3));
65    queue.add(THROWING_CALLBACK);
66    queue.add(incrementingCallback(counter, 4));
67    queue.add(THROWING_CALLBACK);
68    assertEquals(0, counter.get());
69    queue.execute();
70    assertEquals(4, counter.get());
71  }
72
73  public void testAddAndExecute_multithreaded() throws InterruptedException {
74    ExecutorService service = Executors.newFixedThreadPool(4);
75    try {
76      ListenerCallQueue<Object> queue =
77          new ListenerCallQueue<Object>(new Object(), service);
78
79      final CountDownLatch latch = new CountDownLatch(1);
80      AtomicInteger counter = new AtomicInteger();
81      queue.add(incrementingCallback(counter, 1));
82      queue.add(incrementingCallback(counter, 2));
83      queue.add(incrementingCallback(counter, 3));
84      queue.add(incrementingCallback(counter, 4));
85      queue.add(countDownCallback(latch));
86      assertEquals(0, counter.get());
87      queue.execute();
88      latch.await();
89      assertEquals(4, counter.get());
90    } finally {
91      service.shutdown();
92    }
93  }
94
95  public void testAddAndExecute_multithreaded_withThrowingRunnable() throws InterruptedException {
96    ExecutorService service = Executors.newFixedThreadPool(4);
97    try {
98      ListenerCallQueue<Object> queue =
99          new ListenerCallQueue<Object>(new Object(), service);
100
101      final CountDownLatch latch = new CountDownLatch(1);
102      AtomicInteger counter = new AtomicInteger();
103      queue.add(incrementingCallback(counter, 1));
104      queue.add(THROWING_CALLBACK);
105      queue.add(incrementingCallback(counter, 2));
106      queue.add(THROWING_CALLBACK);
107      queue.add(incrementingCallback(counter, 3));
108      queue.add(THROWING_CALLBACK);
109      queue.add(incrementingCallback(counter, 4));
110      queue.add(THROWING_CALLBACK);
111      queue.add(countDownCallback(latch));
112      assertEquals(0, counter.get());
113      queue.execute();
114      latch.await();
115      assertEquals(4, counter.get());
116    } finally {
117      service.shutdown();
118    }
119  }
120
121  private Callback<Object> incrementingCallback(final AtomicInteger counter, final int expected) {
122    return new Callback<Object>("incrementing") {
123      @Override void call(Object listener) {
124        assertEquals(expected, counter.incrementAndGet());
125      }
126    };
127  }
128
129  private Callback<Object> countDownCallback(final CountDownLatch latch) {
130    return new Callback<Object>("countDown") {
131      @Override void call(Object listener) {
132        latch.countDown();
133      }
134    };
135  }
136}
137