/*
 * Copyright (C) 2014 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import com.google.common.util.concurrent.ListenerCallQueue.Callback;

import junit.framework.TestCase;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Tests for {@link ListenerCallQueue}.
 */
public class ListenerCallQueueTest extends TestCase {

  /**
   * Upper bound, in seconds, on how long the multithreaded tests wait for the final callback.
   * It is only reached when something went wrong, so a generous value does not slow down
   * passing runs.
   */
  private static final long LATCH_TIMEOUT_SECONDS = 10;

  /** A callback that always throws a {@link RuntimeException} when invoked. */
  private static final Callback<Object> THROWING_CALLBACK = new Callback<Object>("throwing()") {
    @Override public void call(Object object) {
      throw new RuntimeException();
    }
  };

  /**
   * Adds four incrementing callbacks, checks that none has run before {@code execute()} is
   * called, and checks that all four have run (in the order they were added) afterwards.
   */
  public void testAddAndExecute() {
    Object listenerInstance = new Object();
    ListenerCallQueue<Object> queue =
        new ListenerCallQueue<Object>(listenerInstance, MoreExecutors.sameThreadExecutor());

    AtomicInteger counter = new AtomicInteger();
    queue.add(incrementingCallback(counter, 1));
    queue.add(incrementingCallback(counter, 2));
    queue.add(incrementingCallback(counter, 3));
    queue.add(incrementingCallback(counter, 4));
    assertEquals(0, counter.get());
    queue.execute();
    assertEquals(4, counter.get());
  }

  /**
   * Same as {@link #testAddAndExecute}, but with {@link #THROWING_CALLBACK} queued after every
   * incrementing callback. The test expects {@code execute()} to return normally and all four
   * incrementing callbacks to have run despite the interleaved exceptions.
   */
  public void testAddAndExecute_withExceptions() {
    Object listenerInstance = new Object();
    ListenerCallQueue<Object> queue =
        new ListenerCallQueue<Object>(listenerInstance, MoreExecutors.sameThreadExecutor());

    AtomicInteger counter = new AtomicInteger();
    queue.add(incrementingCallback(counter, 1));
    queue.add(THROWING_CALLBACK);
    queue.add(incrementingCallback(counter, 2));
    queue.add(THROWING_CALLBACK);
    queue.add(incrementingCallback(counter, 3));
    queue.add(THROWING_CALLBACK);
    queue.add(incrementingCallback(counter, 4));
    queue.add(THROWING_CALLBACK);
    assertEquals(0, counter.get());
    queue.execute();
    assertEquals(4, counter.get());
  }

  /**
   * Runs the queue on a four-thread pool. A trailing count-down callback signals that the queue
   * has been drained, after which the counter must equal the number of incrementing callbacks.
   */
  public void testAddAndExecute_multithreaded() throws InterruptedException {
    ExecutorService service = Executors.newFixedThreadPool(4);
    try {
      ListenerCallQueue<Object> queue =
          new ListenerCallQueue<Object>(new Object(), service);

      final CountDownLatch latch = new CountDownLatch(1);
      AtomicInteger counter = new AtomicInteger();
      queue.add(incrementingCallback(counter, 1));
      queue.add(incrementingCallback(counter, 2));
      queue.add(incrementingCallback(counter, 3));
      queue.add(incrementingCallback(counter, 4));
      queue.add(countDownCallback(latch));
      assertEquals(0, counter.get());
      queue.execute();
      awaitCompletion(latch);
      assertEquals(4, counter.get());
    } finally {
      service.shutdown();
    }
  }

  /**
   * Multithreaded variant of {@link #testAddAndExecute_withExceptions}: throwing callbacks are
   * interleaved with the incrementing ones and the queue runs on a four-thread pool. The final
   * count-down callback must still be reached and the counter must still end at four.
   */
  public void testAddAndExecute_multithreaded_withThrowingRunnable() throws InterruptedException {
    ExecutorService service = Executors.newFixedThreadPool(4);
    try {
      ListenerCallQueue<Object> queue =
          new ListenerCallQueue<Object>(new Object(), service);

      final CountDownLatch latch = new CountDownLatch(1);
      AtomicInteger counter = new AtomicInteger();
      queue.add(incrementingCallback(counter, 1));
      queue.add(THROWING_CALLBACK);
      queue.add(incrementingCallback(counter, 2));
      queue.add(THROWING_CALLBACK);
      queue.add(incrementingCallback(counter, 3));
      queue.add(THROWING_CALLBACK);
      queue.add(incrementingCallback(counter, 4));
      queue.add(THROWING_CALLBACK);
      queue.add(countDownCallback(latch));
      assertEquals(0, counter.get());
      queue.execute();
      awaitCompletion(latch);
      assertEquals(4, counter.get());
    } finally {
      service.shutdown();
    }
  }

  /**
   * Waits for {@code latch} to reach zero, failing the test if that does not happen within
   * {@link #LATCH_TIMEOUT_SECONDS}.
   *
   * <p>A bounded wait is used so that a count-down callback that is never invoked produces a
   * test failure rather than a test run that hangs indefinitely.
   *
   * @param latch the latch counted down by the last queued callback
   * @throws InterruptedException if the test thread is interrupted while waiting
   */
  private static void awaitCompletion(CountDownLatch latch) throws InterruptedException {
    assertTrue(
        "final callback was not invoked within " + LATCH_TIMEOUT_SECONDS + " seconds",
        latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS));
  }

  /**
   * Returns a callback that increments {@code counter} and asserts that the incremented value
   * equals {@code expected}, which pins the position at which the callback is invoked.
   *
   * @param counter the shared counter to increment
   * @param expected the value the counter must hold immediately after this callback's increment
   */
  private Callback<Object> incrementingCallback(final AtomicInteger counter, final int expected) {
    return new Callback<Object>("incrementing") {
      @Override void call(Object listener) {
        assertEquals(expected, counter.incrementAndGet());
      }
    };
  }

  /**
   * Returns a callback that counts down {@code latch} once when invoked.
   *
   * @param latch the latch to count down
   */
  private Callback<Object> countDownCallback(final CountDownLatch latch) {
    return new Callback<Object>("countDown") {
      @Override void call(Object listener) {
        latch.countDown();
      }
    };
  }
}