QueuesTest.java revision 3ecfa412eddc4b084663f38d562537b86b9734d5
1/*
2 * Copyright (C) 2011 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.collect;
18
19import com.google.common.util.concurrent.Uninterruptibles;
20
21import junit.framework.TestCase;
22
23import java.util.Collection;
24import java.util.List;
25import java.util.concurrent.ArrayBlockingQueue;
26import java.util.concurrent.BlockingQueue;
27import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
29import java.util.concurrent.Future;
30import java.util.concurrent.LinkedBlockingQueue;
31import java.util.concurrent.PriorityBlockingQueue;
32import java.util.concurrent.SynchronousQueue;
33import java.util.concurrent.TimeUnit;
34
35/**
36 * Tests for {@link Queues}.
37 *
38 * @author Dimitris Andreou
39 */
40
41public class QueuesTest extends TestCase {
42  /*
43   * All the following tests relate to BlockingQueue methods in Queues.
44   */
45
46  public static List<BlockingQueue<Object>> blockingQueues() {
47    return ImmutableList.<BlockingQueue<Object>>of(
48        new LinkedBlockingQueue<Object>(),
49        new LinkedBlockingQueue<Object>(10),
50        new SynchronousQueue<Object>(),
51        new ArrayBlockingQueue<Object>(10),
52        new PriorityBlockingQueue<Object>(10, Ordering.arbitrary()));
53  }
54
55  private ExecutorService threadPool;
56
57  @Override
58  public void setUp() {
59    threadPool = Executors.newCachedThreadPool();
60  }
61
62  @Override
63  public void tearDown() throws InterruptedException {
64    // notice that if a Producer is interrupted (a bug), the Producer will go into an infinite
65    // loop, which will be noticed here
66    threadPool.shutdown();
67    assertTrue("Some worker didn't finish in time",
68        threadPool.awaitTermination(1, TimeUnit.SECONDS));
69  }
70
71  private static <T> int drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements,
72      long timeout, TimeUnit unit, boolean interruptibly) throws InterruptedException {
73    return interruptibly
74        ? Queues.drain(q, buffer, maxElements, timeout, unit)
75        : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit);
76  }
77
78  public void testMultipleProducers() throws Exception {
79    for (BlockingQueue<Object> q : blockingQueues()) {
80      testMultipleProducers(q);
81    }
82  }
83
84  private void testMultipleProducers(BlockingQueue<Object> q)
85      throws InterruptedException {
86    for (boolean interruptibly : new boolean[] { true, false }) {
87      threadPool.submit(new Producer(q, 20));
88      threadPool.submit(new Producer(q, 20));
89      threadPool.submit(new Producer(q, 20));
90      threadPool.submit(new Producer(q, 20));
91      threadPool.submit(new Producer(q, 20));
92
93      List<Object> buf = Lists.newArrayList();
94      int elements = drain(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, interruptibly);
95      assertEquals(100, elements);
96      assertEquals(100, buf.size());
97      assertDrained(q);
98    }
99  }
100
101  public void testDrainTimesOut() throws Exception {
102    for (BlockingQueue<Object> q : blockingQueues()) {
103      testDrainTimesOut(q);
104    }
105  }
106
107  private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception {
108    for (boolean interruptibly : new boolean[] { true, false }) {
109      assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, TimeUnit.MILLISECONDS));
110
111      // producing one, will ask for two
112      Future<?> submitter = threadPool.submit(new Producer(q, 1));
113
114      // make sure we time out
115      long startTime = System.nanoTime();
116
117      int drained = drain(q, Lists.newArrayList(), 2, 10, TimeUnit.MILLISECONDS, interruptibly);
118      assertTrue(drained <= 1);
119
120      assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
121
122      // If even the first one wasn't there, clean up so that the next test doesn't see an element.
123      submitter.get();
124      if (drained == 0) {
125        assertNotNull(q.poll());
126      }
127    }
128  }
129
130  public void testZeroElements() throws Exception {
131    for (BlockingQueue<Object> q : blockingQueues()) {
132      testZeroElements(q);
133    }
134  }
135
136  private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException {
137    for (boolean interruptibly : new boolean[] { true, false }) {
138      // asking to drain zero elements
139      assertEquals(0, drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS, interruptibly));
140    }
141  }
142
143  public void testEmpty() throws Exception {
144    for (BlockingQueue<Object> q : blockingQueues()) {
145      testEmpty(q);
146    }
147  }
148
149  private void testEmpty(BlockingQueue<Object> q) {
150    assertDrained(q);
151  }
152
153  public void testNegativeMaxElements() throws Exception {
154    for (BlockingQueue<Object> q : blockingQueues()) {
155      testNegativeMaxElements(q);
156    }
157  }
158
159  private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException {
160    threadPool.submit(new Producer(q, 1));
161
162    List<Object> buf = Lists.newArrayList();
163    int elements = Queues.drain(q, buf, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
164    assertEquals(elements, 0);
165    assertTrue(buf.isEmpty());
166
167    // Clean up produced element to free the producer thread, otherwise it will complain
168    // when we shutdown the threadpool.
169    Queues.drain(q, buf, 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
170  }
171
172  public void testDrain_throws() throws Exception {
173    for (BlockingQueue<Object> q : blockingQueues()) {
174      testDrain_throws(q);
175    }
176  }
177
178  private void testDrain_throws(BlockingQueue<Object> q) {
179    threadPool.submit(new Interrupter(Thread.currentThread()));
180    try {
181      Queues.drain(q, ImmutableList.of(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
182      fail();
183    } catch (InterruptedException expected) {
184    }
185  }
186
187  public void testDrainUninterruptibly_doesNotThrow() throws Exception {
188    for (BlockingQueue<Object> q : blockingQueues()) {
189      testDrainUninterruptibly_doesNotThrow(q);
190    }
191  }
192
193  private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) {
194    final Thread mainThread = Thread.currentThread();
195    threadPool.submit(new Runnable() {
196      public void run() {
197        new Producer(q, 50).run();
198        new Interrupter(mainThread).run();
199        new Producer(q, 50).run();
200      }
201    });
202    List<Object> buf = Lists.newArrayList();
203    int elements =
204        Queues.drainUninterruptibly(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
205    // so when this drains all elements, we know the thread has also been interrupted in between
206    assertTrue(Thread.interrupted());
207    assertEquals(100, elements);
208    assertEquals(100, buf.size());
209  }
210
211  public void testNewLinkedBlockingDequeCapacity() {
212    try {
213      Queues.newLinkedBlockingDeque(0);
214      fail("Should have thrown IllegalArgumentException");
215    } catch (IllegalArgumentException expected) {
216      // any capacity less than 1 should throw IllegalArgumentException
217    }
218    assertEquals(1, Queues.newLinkedBlockingDeque(1).remainingCapacity());
219    assertEquals(11, Queues.newLinkedBlockingDeque(11).remainingCapacity());
220  }
221
222  public void testNewLinkedBlockingQueueCapacity() {
223    try {
224      Queues.newLinkedBlockingQueue(0);
225      fail("Should have thrown IllegalArgumentException");
226    } catch (IllegalArgumentException expected) {
227      // any capacity less than 1 should throw IllegalArgumentException
228    }
229    assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity());
230    assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity());
231  }
232
233  /**
234   * Checks that #drain() invocations behave correctly for a drained (empty) queue.
235   */
236  private void assertDrained(BlockingQueue<Object> q) {
237    assertNull(q.peek());
238    assertInterruptibleDrained(q);
239    assertUninterruptibleDrained(q);
240  }
241
242  private void assertInterruptibleDrained(BlockingQueue<Object> q) {
243    // nothing to drain, thus this should wait doing nothing
244    try {
245      assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
246    } catch (InterruptedException e) {
247      throw new AssertionError();
248    }
249
250    // but does the wait actually occurs?
251    threadPool.submit(new Interrupter(Thread.currentThread()));
252    try {
253      // if waiting works, this should get stuck
254      Queues.drain(q, Lists.newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
255      fail();
256    } catch (InterruptedException expected) {
257      // we indeed waited; a slow thread had enough time to interrupt us
258    }
259  }
260
261  // same as above; uninterruptible version
262  private void assertUninterruptibleDrained(BlockingQueue<Object> q) {
263    assertEquals(0,
264        Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
265
266    // but does the wait actually occurs?
267    threadPool.submit(new Interrupter(Thread.currentThread()));
268
269    long startTime = System.nanoTime();
270    Queues.drainUninterruptibly(
271        q, Lists.newArrayList(), 1, 10, TimeUnit.MILLISECONDS);
272    assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
273    // wait for interrupted status and clear it
274    while (!Thread.interrupted()) { Thread.yield(); }
275  }
276
277  private static class Producer implements Runnable {
278    final BlockingQueue<Object> q;
279    final int elements;
280
281    Producer(BlockingQueue<Object> q, int elements) {
282      this.q = q;
283      this.elements = elements;
284    }
285
286    @Override public void run() {
287      try {
288        for (int i = 0; i < elements; i++) {
289          q.put(new Object());
290        }
291      } catch (InterruptedException e) {
292        // TODO(user): replace this when there is a better way to spawn threads in tests and
293        // have threads propagate their errors back to the test thread.
294        e.printStackTrace();
295        // never returns, so that #tearDown() notices that one worker isn't done
296        Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
297      }
298    }
299  }
300
301  private static class Interrupter implements Runnable {
302    final Thread threadToInterrupt;
303
304    Interrupter(Thread threadToInterrupt) {
305      this.threadToInterrupt = threadToInterrupt;
306    }
307
308    @Override public void run() {
309      try {
310        Thread.sleep(100);
311      } catch (InterruptedException e) {
312        throw new AssertionError();
313      } finally {
314        threadToInterrupt.interrupt();
315      }
316    }
317  }
318}
319