1/*
2 * Copyright (C) 2014 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.util.concurrent.ListenerCallQueue.Callback;

import junit.framework.TestCase;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
29
30/**
31 * Tests for {@link ListenerCallQueue}.
32 */
33public class ListenerCallQueueTest extends TestCase {
34
35  private static final Callback<Object> THROWING_CALLBACK = new Callback<Object>("throwing()") {
36    @Override public void call(Object object) {
37      throw new RuntimeException();
38    }
39  };
40
41  public void testAddAndExecute() {
42    Object listenerInstance = new Object();
43    ListenerCallQueue<Object> queue =
44        new ListenerCallQueue<Object>(listenerInstance, directExecutor());
45
46    AtomicInteger counter = new AtomicInteger();
47    queue.add(incrementingCallback(counter, 1));
48    queue.add(incrementingCallback(counter, 2));
49    queue.add(incrementingCallback(counter, 3));
50    queue.add(incrementingCallback(counter, 4));
51    assertEquals(0, counter.get());
52    queue.execute();
53    assertEquals(4, counter.get());
54  }
55
56  public void testAddAndExecute_withExceptions() {
57    Object listenerInstance = new Object();
58    ListenerCallQueue<Object> queue =
59        new ListenerCallQueue<Object>(listenerInstance, directExecutor());
60
61    AtomicInteger counter = new AtomicInteger();
62    queue.add(incrementingCallback(counter, 1));
63    queue.add(THROWING_CALLBACK);
64    queue.add(incrementingCallback(counter, 2));
65    queue.add(THROWING_CALLBACK);
66    queue.add(incrementingCallback(counter, 3));
67    queue.add(THROWING_CALLBACK);
68    queue.add(incrementingCallback(counter, 4));
69    queue.add(THROWING_CALLBACK);
70    assertEquals(0, counter.get());
71    queue.execute();
72    assertEquals(4, counter.get());
73  }
74
75  public void testAddAndExecute_multithreaded() throws InterruptedException {
76    ExecutorService service = Executors.newFixedThreadPool(4);
77    try {
78      ListenerCallQueue<Object> queue =
79          new ListenerCallQueue<Object>(new Object(), service);
80
81      final CountDownLatch latch = new CountDownLatch(1);
82      AtomicInteger counter = new AtomicInteger();
83      queue.add(incrementingCallback(counter, 1));
84      queue.add(incrementingCallback(counter, 2));
85      queue.add(incrementingCallback(counter, 3));
86      queue.add(incrementingCallback(counter, 4));
87      queue.add(countDownCallback(latch));
88      assertEquals(0, counter.get());
89      queue.execute();
90      latch.await();
91      assertEquals(4, counter.get());
92    } finally {
93      service.shutdown();
94    }
95  }
96
97  public void testAddAndExecute_multithreaded_withThrowingRunnable() throws InterruptedException {
98    ExecutorService service = Executors.newFixedThreadPool(4);
99    try {
100      ListenerCallQueue<Object> queue =
101          new ListenerCallQueue<Object>(new Object(), service);
102
103      final CountDownLatch latch = new CountDownLatch(1);
104      AtomicInteger counter = new AtomicInteger();
105      queue.add(incrementingCallback(counter, 1));
106      queue.add(THROWING_CALLBACK);
107      queue.add(incrementingCallback(counter, 2));
108      queue.add(THROWING_CALLBACK);
109      queue.add(incrementingCallback(counter, 3));
110      queue.add(THROWING_CALLBACK);
111      queue.add(incrementingCallback(counter, 4));
112      queue.add(THROWING_CALLBACK);
113      queue.add(countDownCallback(latch));
114      assertEquals(0, counter.get());
115      queue.execute();
116      latch.await();
117      assertEquals(4, counter.get());
118    } finally {
119      service.shutdown();
120    }
121  }
122
123  private Callback<Object> incrementingCallback(final AtomicInteger counter, final int expected) {
124    return new Callback<Object>("incrementing") {
125      @Override void call(Object listener) {
126        assertEquals(expected, counter.incrementAndGet());
127      }
128    };
129  }
130
131  private Callback<Object> countDownCallback(final CountDownLatch latch) {
132    return new Callback<Object>("countDown") {
133      @Override void call(Object listener) {
134        latch.countDown();
135      }
136    };
137  }
138}
139