1/*
2 * Copyright (C) 2011 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.collect;
18
19import com.google.common.util.concurrent.Uninterruptibles;
20
21import junit.framework.TestCase;
22
23import java.util.Collection;
24import java.util.List;
25import java.util.concurrent.ArrayBlockingQueue;
26import java.util.concurrent.BlockingQueue;
27import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
29import java.util.concurrent.Future;
30import java.util.concurrent.LinkedBlockingQueue;
31import java.util.concurrent.PriorityBlockingQueue;
32import java.util.concurrent.SynchronousQueue;
33import java.util.concurrent.TimeUnit;
34
35/**
36 * Tests for {@link Queues}.
37 *
38 * @author Dimitris Andreou
39 */
40
41public class QueuesTest extends TestCase {
42  /*
43   * All the following tests relate to BlockingQueue methods in Queues.
44   */
45
46  public static List<BlockingQueue<Object>> blockingQueues() {
47    return ImmutableList.<BlockingQueue<Object>>of(
48        new LinkedBlockingQueue<Object>(),
49        new LinkedBlockingQueue<Object>(10),
50        new SynchronousQueue<Object>(),
51        new ArrayBlockingQueue<Object>(10),
52        new PriorityBlockingQueue<Object>(10, Ordering.arbitrary()));
53  }
54
55  private ExecutorService threadPool;
56
57  @Override
58  public void setUp() {
59    threadPool = Executors.newCachedThreadPool();
60  }
61
62  @Override
63  public void tearDown() throws InterruptedException {
64    // notice that if a Producer is interrupted (a bug), the Producer will go into an infinite
65    // loop, which will be noticed here
66    threadPool.shutdown();
67    assertTrue("Some worker didn't finish in time",
68        threadPool.awaitTermination(1, TimeUnit.SECONDS));
69  }
70
71  private static <T> int drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements,
72      long timeout, TimeUnit unit, boolean interruptibly) throws InterruptedException {
73    return interruptibly
74        ? Queues.drain(q, buffer, maxElements, timeout, unit)
75        : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit);
76  }
77
78  public void testMultipleProducers() throws Exception {
79    for (BlockingQueue<Object> q : blockingQueues()) {
80      testMultipleProducers(q);
81    }
82  }
83
84  private void testMultipleProducers(BlockingQueue<Object> q)
85      throws InterruptedException {
86    for (boolean interruptibly : new boolean[] { true, false }) {
87      threadPool.submit(new Producer(q, 20));
88      threadPool.submit(new Producer(q, 20));
89      threadPool.submit(new Producer(q, 20));
90      threadPool.submit(new Producer(q, 20));
91      threadPool.submit(new Producer(q, 20));
92
93      List<Object> buf = Lists.newArrayList();
94      int elements = drain(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, interruptibly);
95      assertEquals(100, elements);
96      assertEquals(100, buf.size());
97      assertDrained(q);
98    }
99  }
100
101  public void testDrainTimesOut() throws Exception {
102    for (BlockingQueue<Object> q : blockingQueues()) {
103      testDrainTimesOut(q);
104    }
105  }
106
107  private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception {
108    for (boolean interruptibly : new boolean[] { true, false }) {
109      assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, TimeUnit.MILLISECONDS));
110
111      // producing one, will ask for two
112      Future<?> submitter = threadPool.submit(new Producer(q, 1));
113
114      // make sure we time out
115      long startTime = System.nanoTime();
116
117      int drained = drain(q, Lists.newArrayList(), 2, 10, TimeUnit.MILLISECONDS, interruptibly);
118      assertTrue(drained <= 1);
119
120      assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
121
122      // If even the first one wasn't there, clean up so that the next test doesn't see an element.
123      submitter.get();
124      if (drained == 0) {
125        assertNotNull(q.poll());
126      }
127    }
128  }
129
130  public void testZeroElements() throws Exception {
131    for (BlockingQueue<Object> q : blockingQueues()) {
132      testZeroElements(q);
133    }
134  }
135
136  private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException {
137    for (boolean interruptibly : new boolean[] { true, false }) {
138      // asking to drain zero elements
139      assertEquals(0, drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS, interruptibly));
140    }
141  }
142
143  public void testEmpty() throws Exception {
144    for (BlockingQueue<Object> q : blockingQueues()) {
145      testEmpty(q);
146    }
147  }
148
149  private void testEmpty(BlockingQueue<Object> q) {
150    assertDrained(q);
151  }
152
153  public void testNegativeMaxElements() throws Exception {
154    for (BlockingQueue<Object> q : blockingQueues()) {
155      testNegativeMaxElements(q);
156    }
157  }
158
159  private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException {
160    threadPool.submit(new Producer(q, 1));
161
162    List<Object> buf = Lists.newArrayList();
163    int elements = Queues.drain(q, buf, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
164    assertEquals(elements, 0);
165    assertTrue(buf.isEmpty());
166
167    // Clean up produced element to free the producer thread, otherwise it will complain
168    // when we shutdown the threadpool.
169    Queues.drain(q, buf, 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
170  }
171
172  public void testDrain_throws() throws Exception {
173    for (BlockingQueue<Object> q : blockingQueues()) {
174      testDrain_throws(q);
175    }
176  }
177
178  private void testDrain_throws(BlockingQueue<Object> q) {
179    threadPool.submit(new Interrupter(Thread.currentThread()));
180    try {
181      Queues.drain(q, ImmutableList.of(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
182      fail();
183    } catch (InterruptedException expected) {
184    }
185  }
186
187  public void testDrainUninterruptibly_doesNotThrow() throws Exception {
188    for (BlockingQueue<Object> q : blockingQueues()) {
189      testDrainUninterruptibly_doesNotThrow(q);
190    }
191  }
192
193  private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) {
194    final Thread mainThread = Thread.currentThread();
195    threadPool.submit(new Runnable() {
196      public void run() {
197        new Producer(q, 50).run();
198        new Interrupter(mainThread).run();
199        new Producer(q, 50).run();
200      }
201    });
202    List<Object> buf = Lists.newArrayList();
203    int elements =
204        Queues.drainUninterruptibly(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
205    // so when this drains all elements, we know the thread has also been interrupted in between
206    assertTrue(Thread.interrupted());
207    assertEquals(100, elements);
208    assertEquals(100, buf.size());
209  }
210
211  public void testNewLinkedBlockingQueueCapacity() {
212    try {
213      Queues.newLinkedBlockingQueue(0);
214      fail("Should have thrown IllegalArgumentException");
215    } catch (IllegalArgumentException expected) {
216      // any capacity less than 1 should throw IllegalArgumentException
217    }
218    assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity());
219    assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity());
220  }
221
222  /**
223   * Checks that #drain() invocations behave correctly for a drained (empty) queue.
224   */
225  private void assertDrained(BlockingQueue<Object> q) {
226    assertNull(q.peek());
227    assertInterruptibleDrained(q);
228    assertUninterruptibleDrained(q);
229  }
230
231  private void assertInterruptibleDrained(BlockingQueue<Object> q) {
232    // nothing to drain, thus this should wait doing nothing
233    try {
234      assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
235    } catch (InterruptedException e) {
236      throw new AssertionError();
237    }
238
239    // but does the wait actually occurs?
240    threadPool.submit(new Interrupter(Thread.currentThread()));
241    try {
242      // if waiting works, this should get stuck
243      Queues.drain(q, Lists.newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
244      fail();
245    } catch (InterruptedException expected) {
246      // we indeed waited; a slow thread had enough time to interrupt us
247    }
248  }
249
250  // same as above; uninterruptible version
251  private void assertUninterruptibleDrained(BlockingQueue<Object> q) {
252    assertEquals(0,
253        Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
254
255    // but does the wait actually occurs?
256    threadPool.submit(new Interrupter(Thread.currentThread()));
257
258    long startTime = System.nanoTime();
259    Queues.drainUninterruptibly(
260        q, Lists.newArrayList(), 1, 10, TimeUnit.MILLISECONDS);
261    assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
262    // wait for interrupted status and clear it
263    while (!Thread.interrupted()) { Thread.yield(); }
264  }
265
266  private static class Producer implements Runnable {
267    final BlockingQueue<Object> q;
268    final int elements;
269
270    Producer(BlockingQueue<Object> q, int elements) {
271      this.q = q;
272      this.elements = elements;
273    }
274
275    @Override public void run() {
276      try {
277        for (int i = 0; i < elements; i++) {
278          q.put(new Object());
279        }
280      } catch (InterruptedException e) {
281        // TODO(user): replace this when there is a better way to spawn threads in tests and
282        // have threads propagate their errors back to the test thread.
283        e.printStackTrace();
284        // never returns, so that #tearDown() notices that one worker isn't done
285        Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
286      }
287    }
288  }
289
290  private static class Interrupter implements Runnable {
291    final Thread threadToInterrupt;
292
293    Interrupter(Thread threadToInterrupt) {
294      this.threadToInterrupt = threadToInterrupt;
295    }
296
297    @Override public void run() {
298      try {
299        Thread.sleep(100);
300      } catch (InterruptedException e) {
301        throw new AssertionError();
302      } finally {
303        threadToInterrupt.interrupt();
304      }
305    }
306  }
307}
308