SynchronousQueueTest.java revision 008167dfe7530e6a80066006633f2301d3eae012
1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9package jsr166;
10
11import junit.framework.*;
12import java.util.Arrays;
13import java.util.ArrayList;
14import java.util.Collection;
15import java.util.Iterator;
16import java.util.NoSuchElementException;
17import java.util.Queue;
18import java.util.concurrent.BlockingQueue;
19import java.util.concurrent.CountDownLatch;
20import java.util.concurrent.Executors;
21import java.util.concurrent.ExecutorService;
22import java.util.concurrent.SynchronousQueue;
23import static java.util.concurrent.TimeUnit.MILLISECONDS;
24
25public class SynchronousQueueTest extends JSR166TestCase {
26
27    /**
28     * Any SynchronousQueue is both empty and full
29     */
30    public void testEmptyFull()      { testEmptyFull(false); }
31    public void testEmptyFull_fair() { testEmptyFull(true); }
32    public void testEmptyFull(boolean fair) {
33        final SynchronousQueue q = new SynchronousQueue(fair);
34        assertTrue(q.isEmpty());
35        assertEquals(0, q.size());
36        assertEquals(0, q.remainingCapacity());
37        assertFalse(q.offer(zero));
38    }
39
40    /**
41     * offer fails if no active taker
42     */
43    public void testOffer()      { testOffer(false); }
44    public void testOffer_fair() { testOffer(true); }
45    public void testOffer(boolean fair) {
46        SynchronousQueue q = new SynchronousQueue(fair);
47        assertFalse(q.offer(one));
48    }
49
50    /**
51     * add throws IllegalStateException if no active taker
52     */
53    public void testAdd()      { testAdd(false); }
54    public void testAdd_fair() { testAdd(true); }
55    public void testAdd(boolean fair) {
56        SynchronousQueue q = new SynchronousQueue(fair);
57        assertEquals(0, q.remainingCapacity());
58        try {
59            q.add(one);
60            shouldThrow();
61        } catch (IllegalStateException success) {}
62    }
63
64    /**
65     * addAll(this) throws IllegalArgumentException
66     */
67    public void testAddAll_self()      { testAddAll_self(false); }
68    public void testAddAll_self_fair() { testAddAll_self(true); }
69    public void testAddAll_self(boolean fair) {
70        SynchronousQueue q = new SynchronousQueue(fair);
71        try {
72            q.addAll(q);
73            shouldThrow();
74        } catch (IllegalArgumentException success) {}
75    }
76
77    /**
78     * addAll throws ISE if no active taker
79     */
80    public void testAddAll_ISE()      { testAddAll_ISE(false); }
81    public void testAddAll_ISE_fair() { testAddAll_ISE(true); }
82    public void testAddAll_ISE(boolean fair) {
83        SynchronousQueue q = new SynchronousQueue(fair);
84        Integer[] ints = new Integer[1];
85        for (int i = 0; i < ints.length; i++)
86            ints[i] = i;
87        Collection<Integer> coll = Arrays.asList(ints);
88        try {
89            q.addAll(coll);
90            shouldThrow();
91        } catch (IllegalStateException success) {}
92    }
93
94    /**
95     * put blocks interruptibly if no active taker
96     */
97    public void testBlockingPut()      { testBlockingPut(false); }
98    public void testBlockingPut_fair() { testBlockingPut(true); }
99    public void testBlockingPut(boolean fair) {
100        final SynchronousQueue q = new SynchronousQueue(fair);
101        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
102        Thread t = newStartedThread(new CheckedRunnable() {
103            public void realRun() throws InterruptedException {
104                Thread.currentThread().interrupt();
105                try {
106                    q.put(99);
107                    shouldThrow();
108                } catch (InterruptedException success) {}
109                assertFalse(Thread.interrupted());
110
111                pleaseInterrupt.countDown();
112                try {
113                    q.put(99);
114                    shouldThrow();
115                } catch (InterruptedException success) {}
116                assertFalse(Thread.interrupted());
117            }});
118
119        await(pleaseInterrupt);
120        assertThreadStaysAlive(t);
121        t.interrupt();
122        awaitTermination(t);
123        assertEquals(0, q.remainingCapacity());
124    }
125
126    /**
127     * put blocks interruptibly waiting for take
128     */
129    public void testPutWithTake()      { testPutWithTake(false); }
130    public void testPutWithTake_fair() { testPutWithTake(true); }
131    public void testPutWithTake(boolean fair) {
132        final SynchronousQueue q = new SynchronousQueue(fair);
133        final CountDownLatch pleaseTake = new CountDownLatch(1);
134        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
135        Thread t = newStartedThread(new CheckedRunnable() {
136            public void realRun() throws InterruptedException {
137                pleaseTake.countDown();
138                q.put(one);
139
140                pleaseInterrupt.countDown();
141                try {
142                    q.put(99);
143                    shouldThrow();
144                } catch (InterruptedException success) {}
145                assertFalse(Thread.interrupted());
146            }});
147
148        await(pleaseTake);
149        assertEquals(0, q.remainingCapacity());
150        try { assertSame(one, q.take()); }
151        catch (InterruptedException e) { threadUnexpectedException(e); }
152
153        await(pleaseInterrupt);
154        assertThreadStaysAlive(t);
155        t.interrupt();
156        awaitTermination(t);
157        assertEquals(0, q.remainingCapacity());
158    }
159
160    /**
161     * timed offer times out if elements not taken
162     */
163    public void testTimedOffer()      { testTimedOffer(false); }
164    public void testTimedOffer_fair() { testTimedOffer(true); }
165    public void testTimedOffer(boolean fair) {
166        final SynchronousQueue q = new SynchronousQueue(fair);
167        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
168        Thread t = newStartedThread(new CheckedRunnable() {
169            public void realRun() throws InterruptedException {
170                long startTime = System.nanoTime();
171                assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
172                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
173                pleaseInterrupt.countDown();
174                try {
175                    q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
176                    shouldThrow();
177                } catch (InterruptedException success) {}
178            }});
179
180        await(pleaseInterrupt);
181        assertThreadStaysAlive(t);
182        t.interrupt();
183        awaitTermination(t);
184    }
185
186    /**
187     * poll return null if no active putter
188     */
189    public void testPoll()      { testPoll(false); }
190    public void testPoll_fair() { testPoll(true); }
191    public void testPoll(boolean fair) {
192        final SynchronousQueue q = new SynchronousQueue(fair);
193        assertNull(q.poll());
194    }
195
196    /**
197     * timed poll with zero timeout times out if no active putter
198     */
199    public void testTimedPoll0()      { testTimedPoll0(false); }
200    public void testTimedPoll0_fair() { testTimedPoll0(true); }
201    public void testTimedPoll0(boolean fair) {
202        final SynchronousQueue q = new SynchronousQueue(fair);
203        try { assertNull(q.poll(0, MILLISECONDS)); }
204        catch (InterruptedException e) { threadUnexpectedException(e); }
205    }
206
207    /**
208     * timed poll with nonzero timeout times out if no active putter
209     */
210    public void testTimedPoll()      { testTimedPoll(false); }
211    public void testTimedPoll_fair() { testTimedPoll(true); }
212    public void testTimedPoll(boolean fair) {
213        final SynchronousQueue q = new SynchronousQueue(fair);
214        long startTime = System.nanoTime();
215        try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
216        catch (InterruptedException e) { threadUnexpectedException(e); }
217        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
218    }
219
220    /**
221     * timed poll before a delayed offer times out, returning null;
222     * after offer succeeds; on interruption throws
223     */
224    public void testTimedPollWithOffer()      { testTimedPollWithOffer(false); }
225    public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); }
226    public void testTimedPollWithOffer(boolean fair) {
227        final SynchronousQueue q = new SynchronousQueue(fair);
228        final CountDownLatch pleaseOffer = new CountDownLatch(1);
229        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
230        Thread t = newStartedThread(new CheckedRunnable() {
231            public void realRun() throws InterruptedException {
232                long startTime = System.nanoTime();
233                assertNull(q.poll(timeoutMillis(), MILLISECONDS));
234                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
235
236                pleaseOffer.countDown();
237                startTime = System.nanoTime();
238                assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
239                assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
240
241                Thread.currentThread().interrupt();
242                try {
243                    q.poll(LONG_DELAY_MS, MILLISECONDS);
244                    shouldThrow();
245                } catch (InterruptedException success) {}
246                assertFalse(Thread.interrupted());
247
248                pleaseInterrupt.countDown();
249                try {
250                    q.poll(LONG_DELAY_MS, MILLISECONDS);
251                    shouldThrow();
252                } catch (InterruptedException success) {}
253                assertFalse(Thread.interrupted());
254            }});
255
256        await(pleaseOffer);
257        long startTime = System.nanoTime();
258        try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
259        catch (InterruptedException e) { threadUnexpectedException(e); }
260        assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
261
262        await(pleaseInterrupt);
263        assertThreadStaysAlive(t);
264        t.interrupt();
265        awaitTermination(t);
266    }
267
268    /**
269     * peek() returns null if no active putter
270     */
271    public void testPeek()      { testPeek(false); }
272    public void testPeek_fair() { testPeek(true); }
273    public void testPeek(boolean fair) {
274        final SynchronousQueue q = new SynchronousQueue(fair);
275        assertNull(q.peek());
276    }
277
278    /**
279     * element() throws NoSuchElementException if no active putter
280     */
281    public void testElement()      { testElement(false); }
282    public void testElement_fair() { testElement(true); }
283    public void testElement(boolean fair) {
284        final SynchronousQueue q = new SynchronousQueue(fair);
285        try {
286            q.element();
287            shouldThrow();
288        } catch (NoSuchElementException success) {}
289    }
290
291    /**
292     * remove() throws NoSuchElementException if no active putter
293     */
294    public void testRemove()      { testRemove(false); }
295    public void testRemove_fair() { testRemove(true); }
296    public void testRemove(boolean fair) {
297        final SynchronousQueue q = new SynchronousQueue(fair);
298        try {
299            q.remove();
300            shouldThrow();
301        } catch (NoSuchElementException success) {}
302    }
303
304    /**
305     * contains returns false
306     */
307    public void testContains()      { testContains(false); }
308    public void testContains_fair() { testContains(true); }
309    public void testContains(boolean fair) {
310        final SynchronousQueue q = new SynchronousQueue(fair);
311        assertFalse(q.contains(zero));
312    }
313
314    /**
315     * clear ensures isEmpty
316     */
317    public void testClear()      { testClear(false); }
318    public void testClear_fair() { testClear(true); }
319    public void testClear(boolean fair) {
320        final SynchronousQueue q = new SynchronousQueue(fair);
321        q.clear();
322        assertTrue(q.isEmpty());
323    }
324
325    /**
326     * containsAll returns false unless empty
327     */
328    public void testContainsAll()      { testContainsAll(false); }
329    public void testContainsAll_fair() { testContainsAll(true); }
330    public void testContainsAll(boolean fair) {
331        final SynchronousQueue q = new SynchronousQueue(fair);
332        Integer[] empty = new Integer[0];
333        assertTrue(q.containsAll(Arrays.asList(empty)));
334        Integer[] ints = new Integer[1]; ints[0] = zero;
335        assertFalse(q.containsAll(Arrays.asList(ints)));
336    }
337
338    /**
339     * retainAll returns false
340     */
341    public void testRetainAll()      { testRetainAll(false); }
342    public void testRetainAll_fair() { testRetainAll(true); }
343    public void testRetainAll(boolean fair) {
344        final SynchronousQueue q = new SynchronousQueue(fair);
345        Integer[] empty = new Integer[0];
346        assertFalse(q.retainAll(Arrays.asList(empty)));
347        Integer[] ints = new Integer[1]; ints[0] = zero;
348        assertFalse(q.retainAll(Arrays.asList(ints)));
349    }
350
351    /**
352     * removeAll returns false
353     */
354    public void testRemoveAll()      { testRemoveAll(false); }
355    public void testRemoveAll_fair() { testRemoveAll(true); }
356    public void testRemoveAll(boolean fair) {
357        final SynchronousQueue q = new SynchronousQueue(fair);
358        Integer[] empty = new Integer[0];
359        assertFalse(q.removeAll(Arrays.asList(empty)));
360        Integer[] ints = new Integer[1]; ints[0] = zero;
361        assertFalse(q.containsAll(Arrays.asList(ints)));
362    }
363
364    /**
365     * toArray is empty
366     */
367    public void testToArray()      { testToArray(false); }
368    public void testToArray_fair() { testToArray(true); }
369    public void testToArray(boolean fair) {
370        final SynchronousQueue q = new SynchronousQueue(fair);
371        Object[] o = q.toArray();
372        assertEquals(0, o.length);
373    }
374
375    /**
376     * toArray(Integer array) returns its argument with the first
377     * element (if present) nulled out
378     */
379    public void testToArray2()      { testToArray2(false); }
380    public void testToArray2_fair() { testToArray2(true); }
381    public void testToArray2(boolean fair) {
382        final SynchronousQueue<Integer> q
383            = new SynchronousQueue<Integer>(fair);
384        Integer[] a;
385
386        a = new Integer[0];
387        assertSame(a, q.toArray(a));
388
389        a = new Integer[3];
390        Arrays.fill(a, 42);
391        assertSame(a, q.toArray(a));
392        assertNull(a[0]);
393        for (int i = 1; i < a.length; i++)
394            assertEquals(42, (int) a[i]);
395    }
396
397    /**
398     * toArray(null) throws NPE
399     */
400    public void testToArray_null()      { testToArray_null(false); }
401    public void testToArray_null_fair() { testToArray_null(true); }
402    public void testToArray_null(boolean fair) {
403        final SynchronousQueue q = new SynchronousQueue(fair);
404        try {
405            Object o[] = q.toArray(null);
406            shouldThrow();
407        } catch (NullPointerException success) {}
408    }
409
410    /**
411     * iterator does not traverse any elements
412     */
413    public void testIterator()      { testIterator(false); }
414    public void testIterator_fair() { testIterator(true); }
415    public void testIterator(boolean fair) {
416        final SynchronousQueue q = new SynchronousQueue(fair);
417        Iterator it = q.iterator();
418        assertFalse(it.hasNext());
419        try {
420            Object x = it.next();
421            shouldThrow();
422        } catch (NoSuchElementException success) {}
423    }
424
425    /**
426     * iterator remove throws ISE
427     */
428    public void testIteratorRemove()      { testIteratorRemove(false); }
429    public void testIteratorRemove_fair() { testIteratorRemove(true); }
430    public void testIteratorRemove(boolean fair) {
431        final SynchronousQueue q = new SynchronousQueue(fair);
432        Iterator it = q.iterator();
433        try {
434            it.remove();
435            shouldThrow();
436        } catch (IllegalStateException success) {}
437    }
438
439    /**
440     * toString returns a non-null string
441     */
442    public void testToString()      { testToString(false); }
443    public void testToString_fair() { testToString(true); }
444    public void testToString(boolean fair) {
445        final SynchronousQueue q = new SynchronousQueue(fair);
446        String s = q.toString();
447        assertNotNull(s);
448    }
449
450    /**
451     * offer transfers elements across Executor tasks
452     */
453    public void testOfferInExecutor()      { testOfferInExecutor(false); }
454    public void testOfferInExecutor_fair() { testOfferInExecutor(true); }
455    public void testOfferInExecutor(boolean fair) {
456        final SynchronousQueue q = new SynchronousQueue(fair);
457        ExecutorService executor = Executors.newFixedThreadPool(2);
458        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
459
460        executor.execute(new CheckedRunnable() {
461            public void realRun() throws InterruptedException {
462                assertFalse(q.offer(one));
463                threadsStarted.await();
464                assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
465                assertEquals(0, q.remainingCapacity());
466            }});
467
468        executor.execute(new CheckedRunnable() {
469            public void realRun() throws InterruptedException {
470                threadsStarted.await();
471                assertSame(one, q.take());
472            }});
473
474        joinPool(executor);
475    }
476
477    /**
478     * timed poll retrieves elements across Executor threads
479     */
480    public void testPollInExecutor()      { testPollInExecutor(false); }
481    public void testPollInExecutor_fair() { testPollInExecutor(true); }
482    public void testPollInExecutor(boolean fair) {
483        final SynchronousQueue q = new SynchronousQueue(fair);
484        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
485        ExecutorService executor = Executors.newFixedThreadPool(2);
486        executor.execute(new CheckedRunnable() {
487            public void realRun() throws InterruptedException {
488                assertNull(q.poll());
489                threadsStarted.await();
490                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
491                assertTrue(q.isEmpty());
492            }});
493
494        executor.execute(new CheckedRunnable() {
495            public void realRun() throws InterruptedException {
496                threadsStarted.await();
497                q.put(one);
498            }});
499
500        joinPool(executor);
501    }
502
503    /**
504     * a deserialized serialized queue is usable
505     */
506    public void testSerialization() {
507        final SynchronousQueue x = new SynchronousQueue();
508        final SynchronousQueue y = new SynchronousQueue(false);
509        final SynchronousQueue z = new SynchronousQueue(true);
510        assertSerialEquals(x, y);
511        assertNotSerialEquals(x, z);
512        SynchronousQueue[] qs = { x, y, z };
513        for (SynchronousQueue q : qs) {
514            SynchronousQueue clone = serialClone(q);
515            assertNotSame(q, clone);
516            assertSerialEquals(q, clone);
517            assertTrue(clone.isEmpty());
518            assertEquals(0, clone.size());
519            assertEquals(0, clone.remainingCapacity());
520            assertFalse(clone.offer(zero));
521        }
522    }
523
524    /**
525     * drainTo(c) of empty queue doesn't transfer elements
526     */
527    public void testDrainTo()      { testDrainTo(false); }
528    public void testDrainTo_fair() { testDrainTo(true); }
529    public void testDrainTo(boolean fair) {
530        final SynchronousQueue q = new SynchronousQueue(fair);
531        ArrayList l = new ArrayList();
532        q.drainTo(l);
533        assertEquals(0, q.size());
534        assertEquals(0, l.size());
535    }
536
537    /**
538     * drainTo empties queue, unblocking a waiting put.
539     */
540    public void testDrainToWithActivePut()      { testDrainToWithActivePut(false); }
541    public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); }
542    public void testDrainToWithActivePut(boolean fair) {
543        final SynchronousQueue q = new SynchronousQueue(fair);
544        Thread t = newStartedThread(new CheckedRunnable() {
545            public void realRun() throws InterruptedException {
546                q.put(one);
547            }});
548
549        ArrayList l = new ArrayList();
550        long startTime = System.nanoTime();
551        while (l.isEmpty()) {
552            q.drainTo(l);
553            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
554                fail("timed out");
555            Thread.yield();
556        }
557        assertTrue(l.size() == 1);
558        assertSame(one, l.get(0));
559        awaitTermination(t);
560    }
561
562    /**
563     * drainTo(c, n) empties up to n elements of queue into c
564     */
565    public void testDrainToN() throws InterruptedException {
566        final SynchronousQueue q = new SynchronousQueue();
567        Thread t1 = newStartedThread(new CheckedRunnable() {
568            public void realRun() throws InterruptedException {
569                q.put(one);
570            }});
571
572        Thread t2 = newStartedThread(new CheckedRunnable() {
573            public void realRun() throws InterruptedException {
574                q.put(two);
575            }});
576
577        ArrayList l = new ArrayList();
578        delay(SHORT_DELAY_MS);
579        q.drainTo(l, 1);
580        assertEquals(1, l.size());
581        q.drainTo(l, 1);
582        assertEquals(2, l.size());
583        assertTrue(l.contains(one));
584        assertTrue(l.contains(two));
585        awaitTermination(t1);
586        awaitTermination(t2);
587    }
588
589}
590