1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8package jsr166;
9
10import static java.util.concurrent.TimeUnit.MILLISECONDS;
11
12import java.util.ArrayList;
13import java.util.Arrays;
14import java.util.Collection;
15import java.util.Iterator;
16import java.util.List;
17import java.util.NoSuchElementException;
18import java.util.Queue;
19import java.util.concurrent.BlockingQueue;
20import java.util.concurrent.Callable;
21import java.util.concurrent.CountDownLatch;
22import java.util.concurrent.Executors;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.LinkedTransferQueue;
25
26import junit.framework.Test;
27
28@SuppressWarnings({"unchecked", "rawtypes"})
29public class LinkedTransferQueueTest extends JSR166TestCase {
30    static class Implementation implements CollectionImplementation {
31        public Class<?> klazz() { return LinkedTransferQueue.class; }
32        public Collection emptyCollection() { return new LinkedTransferQueue(); }
33        public Object makeElement(int i) { return i; }
34        public boolean isConcurrent() { return true; }
35        public boolean permitsNulls() { return false; }
36    }
37
38    // android-note: These tests have been moved into their own separate
39    // classes to work around CTS issues:
40    // LinkedTransferQueueBlockingQueueTest.java
41    // LinkedTransferQueueCollectionTest.java
42    //
43    // public static class Generic extends BlockingQueueTest {
44    //     protected BlockingQueue emptyCollection() {
45    //         return new LinkedTransferQueue();
46    //     }
47    // }
48
49    // android-note: Removed because the CTS runner does a bad job of
50    // retrying tests that have suite() declarations.
51    //
52    // public static void main(String[] args) {
53    //     main(suite(), args);
54    // }
55    // public static Test suite() {
56    //     return newTestSuite(LinkedTransferQueueTest.class,
57    //                         new Generic().testSuite(),
58    //                         CollectionTest.testSuite(new Implementation()));
59    // }
60
61    /**
62     * Constructor builds new queue with size being zero and empty
63     * being true
64     */
65    public void testConstructor1() {
66        assertEquals(0, new LinkedTransferQueue().size());
67        assertTrue(new LinkedTransferQueue().isEmpty());
68    }
69
70    /**
71     * Initializing constructor with null collection throws
72     * NullPointerException
73     */
74    public void testConstructor2() {
75        try {
76            new LinkedTransferQueue(null);
77            shouldThrow();
78        } catch (NullPointerException success) {}
79    }
80
81    /**
82     * Initializing from Collection of null elements throws
83     * NullPointerException
84     */
85    public void testConstructor3() {
86        Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
87        try {
88            new LinkedTransferQueue(elements);
89            shouldThrow();
90        } catch (NullPointerException success) {}
91    }
92
93    /**
94     * Initializing constructor with a collection containing some null elements
95     * throws NullPointerException
96     */
97    public void testConstructor4() {
98        Integer[] ints = new Integer[SIZE];
99        for (int i = 0; i < SIZE - 1; ++i)
100            ints[i] = i;
101        Collection<Integer> elements = Arrays.asList(ints);
102        try {
103            new LinkedTransferQueue(elements);
104            shouldThrow();
105        } catch (NullPointerException success) {}
106    }
107
108    /**
109     * Queue contains all elements of the collection it is initialized by
110     */
111    public void testConstructor5() {
112        Integer[] ints = new Integer[SIZE];
113        for (int i = 0; i < SIZE; ++i) {
114            ints[i] = i;
115        }
116        List intList = Arrays.asList(ints);
117        LinkedTransferQueue q
118            = new LinkedTransferQueue(intList);
119        assertEquals(q.size(), intList.size());
120        assertEquals(q.toString(), intList.toString());
121        assertTrue(Arrays.equals(q.toArray(),
122                                     intList.toArray()));
123        assertTrue(Arrays.equals(q.toArray(new Object[0]),
124                                 intList.toArray(new Object[0])));
125        assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
126                                 intList.toArray(new Object[SIZE])));
127        for (int i = 0; i < SIZE; ++i) {
128            assertEquals(ints[i], q.poll());
129        }
130    }
131
132    /**
133     * remainingCapacity() always returns Integer.MAX_VALUE
134     */
135    public void testRemainingCapacity() {
136        BlockingQueue q = populatedQueue(SIZE);
137        for (int i = 0; i < SIZE; ++i) {
138            assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
139            assertEquals(SIZE - i, q.size());
140            assertEquals(i, q.remove());
141        }
142        for (int i = 0; i < SIZE; ++i) {
143            assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
144            assertEquals(i, q.size());
145            assertTrue(q.add(i));
146        }
147    }
148
149    /**
150     * addAll(this) throws IllegalArgumentException
151     */
152    public void testAddAllSelf() {
153        LinkedTransferQueue q = populatedQueue(SIZE);
154        try {
155            q.addAll(q);
156            shouldThrow();
157        } catch (IllegalArgumentException success) {}
158    }
159
160    /**
161     * addAll of a collection with any null elements throws
162     * NullPointerException after possibly adding some elements
163     */
164    public void testAddAll3() {
165        LinkedTransferQueue q = new LinkedTransferQueue();
166        Integer[] ints = new Integer[SIZE];
167        for (int i = 0; i < SIZE - 1; ++i)
168            ints[i] = i;
169        try {
170            q.addAll(Arrays.asList(ints));
171            shouldThrow();
172        } catch (NullPointerException success) {}
173    }
174
175    /**
176     * Queue contains all elements, in traversal order, of successful addAll
177     */
178    public void testAddAll5() {
179        Integer[] empty = new Integer[0];
180        Integer[] ints = new Integer[SIZE];
181        for (int i = 0; i < SIZE; ++i) {
182            ints[i] = i;
183        }
184        LinkedTransferQueue q = new LinkedTransferQueue();
185        assertFalse(q.addAll(Arrays.asList(empty)));
186        assertTrue(q.addAll(Arrays.asList(ints)));
187        for (int i = 0; i < SIZE; ++i) {
188            assertEquals(ints[i], q.poll());
189        }
190    }
191
192    /**
193     * all elements successfully put are contained
194     */
195    public void testPut() {
196        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
197        for (int i = 0; i < SIZE; ++i) {
198            assertEquals(i, q.size());
199            q.put(i);
200            assertTrue(q.contains(i));
201        }
202    }
203
204    /**
205     * take retrieves elements in FIFO order
206     */
207    public void testTake() throws InterruptedException {
208        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
209        for (int i = 0; i < SIZE; ++i) {
210            assertEquals(i, (int) q.take());
211        }
212    }
213
214    /**
215     * take removes existing elements until empty, then blocks interruptibly
216     */
217    public void testBlockingTake() throws InterruptedException {
218        final BlockingQueue q = populatedQueue(SIZE);
219        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
220        Thread t = newStartedThread(new CheckedRunnable() {
221            public void realRun() throws InterruptedException {
222                for (int i = 0; i < SIZE; ++i) {
223                    assertEquals(i, q.take());
224                }
225
226                Thread.currentThread().interrupt();
227                try {
228                    q.take();
229                    shouldThrow();
230                } catch (InterruptedException success) {}
231                assertFalse(Thread.interrupted());
232
233                pleaseInterrupt.countDown();
234                try {
235                    q.take();
236                    shouldThrow();
237                } catch (InterruptedException success) {}
238                assertFalse(Thread.interrupted());
239            }});
240
241        await(pleaseInterrupt);
242        assertThreadStaysAlive(t);
243        t.interrupt();
244        awaitTermination(t);
245    }
246
247    /**
248     * poll succeeds unless empty
249     */
250    public void testPoll() throws InterruptedException {
251        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
252        for (int i = 0; i < SIZE; ++i) {
253            assertEquals(i, (int) q.poll());
254        }
255        assertNull(q.poll());
256        checkEmpty(q);
257    }
258
259    /**
260     * timed poll with zero timeout succeeds when non-empty, else times out
261     */
262    public void testTimedPoll0() throws InterruptedException {
263        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
264        for (int i = 0; i < SIZE; ++i) {
265            assertEquals(i, (int) q.poll(0, MILLISECONDS));
266        }
267        assertNull(q.poll(0, MILLISECONDS));
268        checkEmpty(q);
269    }
270
271    /**
272     * timed poll with nonzero timeout succeeds when non-empty, else times out
273     */
274    public void testTimedPoll() throws InterruptedException {
275        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
276        long startTime = System.nanoTime();
277        for (int i = 0; i < SIZE; ++i)
278            assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
279        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
280
281        startTime = System.nanoTime();
282        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
283        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
284        checkEmpty(q);
285    }
286
287    /**
288     * Interrupted timed poll throws InterruptedException instead of
289     * returning timeout status
290     */
291    public void testInterruptedTimedPoll() throws InterruptedException {
292        final BlockingQueue<Integer> q = populatedQueue(SIZE);
293        final CountDownLatch aboutToWait = new CountDownLatch(1);
294        Thread t = newStartedThread(new CheckedRunnable() {
295            public void realRun() throws InterruptedException {
296                long startTime = System.nanoTime();
297                for (int i = 0; i < SIZE; ++i)
298                    assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
299                aboutToWait.countDown();
300                try {
301                    q.poll(LONG_DELAY_MS, MILLISECONDS);
302                    shouldThrow();
303                } catch (InterruptedException success) {}
304                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
305            }});
306
307        aboutToWait.await();
308        waitForThreadToEnterWaitState(t);
309        t.interrupt();
310        awaitTermination(t);
311        checkEmpty(q);
312    }
313
314    /**
315     * timed poll after thread interrupted throws InterruptedException
316     * instead of returning timeout status
317     */
318    public void testTimedPollAfterInterrupt() throws InterruptedException {
319        final BlockingQueue<Integer> q = populatedQueue(SIZE);
320        Thread t = newStartedThread(new CheckedRunnable() {
321            public void realRun() throws InterruptedException {
322                long startTime = System.nanoTime();
323                Thread.currentThread().interrupt();
324                for (int i = 0; i < SIZE; ++i)
325                    assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
326                try {
327                    q.poll(LONG_DELAY_MS, MILLISECONDS);
328                    shouldThrow();
329                } catch (InterruptedException success) {}
330                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
331            }});
332
333        awaitTermination(t);
334        checkEmpty(q);
335    }
336
337    /**
338     * peek returns next element, or null if empty
339     */
340    public void testPeek() throws InterruptedException {
341        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
342        for (int i = 0; i < SIZE; ++i) {
343            assertEquals(i, (int) q.peek());
344            assertEquals(i, (int) q.poll());
345            assertTrue(q.peek() == null ||
346                       i != (int) q.peek());
347        }
348        assertNull(q.peek());
349        checkEmpty(q);
350    }
351
352    /**
353     * element returns next element, or throws NoSuchElementException if empty
354     */
355    public void testElement() throws InterruptedException {
356        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
357        for (int i = 0; i < SIZE; ++i) {
358            assertEquals(i, (int) q.element());
359            assertEquals(i, (int) q.poll());
360        }
361        try {
362            q.element();
363            shouldThrow();
364        } catch (NoSuchElementException success) {}
365        checkEmpty(q);
366    }
367
368    /**
369     * remove removes next element, or throws NoSuchElementException if empty
370     */
371    public void testRemove() throws InterruptedException {
372        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
373        for (int i = 0; i < SIZE; ++i) {
374            assertEquals(i, (int) q.remove());
375        }
376        try {
377            q.remove();
378            shouldThrow();
379        } catch (NoSuchElementException success) {}
380        checkEmpty(q);
381    }
382
383    /**
384     * An add following remove(x) succeeds
385     */
386    public void testRemoveElementAndAdd() throws InterruptedException {
387        LinkedTransferQueue q = new LinkedTransferQueue();
388        assertTrue(q.add(one));
389        assertTrue(q.add(two));
390        assertTrue(q.remove(one));
391        assertTrue(q.remove(two));
392        assertTrue(q.add(three));
393        assertSame(q.take(), three);
394    }
395
396    /**
397     * contains(x) reports true when elements added but not yet removed
398     */
399    public void testContains() {
400        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
401        for (int i = 0; i < SIZE; ++i) {
402            assertTrue(q.contains(i));
403            assertEquals(i, (int) q.poll());
404            assertFalse(q.contains(i));
405        }
406    }
407
408    /**
409     * clear removes all elements
410     */
411    public void testClear() throws InterruptedException {
412        LinkedTransferQueue q = populatedQueue(SIZE);
413        q.clear();
414        checkEmpty(q);
415        assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
416        q.add(one);
417        assertFalse(q.isEmpty());
418        assertEquals(1, q.size());
419        assertTrue(q.contains(one));
420        q.clear();
421        checkEmpty(q);
422    }
423
424    /**
425     * containsAll(c) is true when c contains a subset of elements
426     */
427    public void testContainsAll() {
428        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
429        LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
430        for (int i = 0; i < SIZE; ++i) {
431            assertTrue(q.containsAll(p));
432            assertFalse(p.containsAll(q));
433            p.add(i);
434        }
435        assertTrue(p.containsAll(q));
436    }
437
438    /**
439     * retainAll(c) retains only those elements of c and reports true
440     * if changed
441     */
442    public void testRetainAll() {
443        LinkedTransferQueue q = populatedQueue(SIZE);
444        LinkedTransferQueue p = populatedQueue(SIZE);
445        for (int i = 0; i < SIZE; ++i) {
446            boolean changed = q.retainAll(p);
447            if (i == 0) {
448                assertFalse(changed);
449            } else {
450                assertTrue(changed);
451            }
452            assertTrue(q.containsAll(p));
453            assertEquals(SIZE - i, q.size());
454            p.remove();
455        }
456    }
457
458    /**
459     * removeAll(c) removes only those elements of c and reports true
460     * if changed
461     */
462    public void testRemoveAll() {
463        for (int i = 1; i < SIZE; ++i) {
464            LinkedTransferQueue q = populatedQueue(SIZE);
465            LinkedTransferQueue p = populatedQueue(i);
466            assertTrue(q.removeAll(p));
467            assertEquals(SIZE - i, q.size());
468            for (int j = 0; j < i; ++j) {
469                assertFalse(q.contains(p.remove()));
470            }
471        }
472    }
473
474    /**
475     * toArray() contains all elements in FIFO order
476     */
477    public void testToArray() {
478        LinkedTransferQueue q = populatedQueue(SIZE);
479        Object[] o = q.toArray();
480        for (int i = 0; i < o.length; i++) {
481            assertSame(o[i], q.poll());
482        }
483    }
484
485    /**
486     * toArray(a) contains all elements in FIFO order
487     */
488    public void testToArray2() {
489        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
490        Integer[] ints = new Integer[SIZE];
491        Integer[] array = q.toArray(ints);
492        assertSame(ints, array);
493        for (int i = 0; i < ints.length; i++) {
494            assertSame(ints[i], q.poll());
495        }
496    }
497
498    /**
499     * toArray(incompatible array type) throws ArrayStoreException
500     */
501    public void testToArray1_BadArg() {
502        LinkedTransferQueue q = populatedQueue(SIZE);
503        try {
504            q.toArray(new String[10]);
505            shouldThrow();
506        } catch (ArrayStoreException success) {}
507    }
508
509    /**
510     * iterator iterates through all elements
511     */
512    public void testIterator() throws InterruptedException {
513        LinkedTransferQueue q = populatedQueue(SIZE);
514        Iterator it = q.iterator();
515        int i;
516        for (i = 0; it.hasNext(); i++)
517            assertTrue(q.contains(it.next()));
518        assertEquals(i, SIZE);
519        assertIteratorExhausted(it);
520
521        it = q.iterator();
522        for (i = 0; it.hasNext(); i++)
523            assertEquals(it.next(), q.take());
524        assertEquals(i, SIZE);
525        assertIteratorExhausted(it);
526    }
527
528    /**
529     * iterator of empty collection has no elements
530     */
531    public void testEmptyIterator() {
532        assertIteratorExhausted(new LinkedTransferQueue().iterator());
533    }
534
535    /**
536     * iterator.remove() removes current element
537     */
538    public void testIteratorRemove() {
539        final LinkedTransferQueue q = new LinkedTransferQueue();
540        q.add(two);
541        q.add(one);
542        q.add(three);
543
544        Iterator it = q.iterator();
545        it.next();
546        it.remove();
547
548        it = q.iterator();
549        assertSame(it.next(), one);
550        assertSame(it.next(), three);
551        assertFalse(it.hasNext());
552    }
553
554    /**
555     * iterator ordering is FIFO
556     */
557    public void testIteratorOrdering() {
558        final LinkedTransferQueue<Integer> q
559            = new LinkedTransferQueue<Integer>();
560        assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
561        q.add(one);
562        q.add(two);
563        q.add(three);
564        assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
565        int k = 0;
566        for (Integer n : q) {
567            assertEquals(++k, (int) n);
568        }
569        assertEquals(3, k);
570    }
571
572    /**
573     * Modifications do not cause iterators to fail
574     */
575    public void testWeaklyConsistentIteration() {
576        final LinkedTransferQueue q = new LinkedTransferQueue();
577        q.add(one);
578        q.add(two);
579        q.add(three);
580        for (Iterator it = q.iterator(); it.hasNext();) {
581            q.remove();
582            it.next();
583        }
584        assertEquals(0, q.size());
585    }
586
587    /**
588     * toString contains toStrings of elements
589     */
590    public void testToString() {
591        LinkedTransferQueue q = populatedQueue(SIZE);
592        String s = q.toString();
593        for (int i = 0; i < SIZE; ++i) {
594            assertTrue(s.contains(String.valueOf(i)));
595        }
596    }
597
598    /**
599     * offer transfers elements across Executor tasks
600     */
601    public void testOfferInExecutor() {
602        final LinkedTransferQueue q = new LinkedTransferQueue();
603        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
604        final ExecutorService executor = Executors.newFixedThreadPool(2);
605        try (PoolCleaner cleaner = cleaner(executor)) {
606
607            executor.execute(new CheckedRunnable() {
608                public void realRun() throws InterruptedException {
609                    threadsStarted.await();
610                    long startTime = System.nanoTime();
611                    assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
612                    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
613                }});
614
615            executor.execute(new CheckedRunnable() {
616                public void realRun() throws InterruptedException {
617                    threadsStarted.await();
618                    assertSame(one, q.take());
619                    checkEmpty(q);
620                }});
621        }
622    }
623
624    /**
625     * timed poll retrieves elements across Executor threads
626     */
627    public void testPollInExecutor() {
628        final LinkedTransferQueue q = new LinkedTransferQueue();
629        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
630        final ExecutorService executor = Executors.newFixedThreadPool(2);
631        try (PoolCleaner cleaner = cleaner(executor)) {
632
633            executor.execute(new CheckedRunnable() {
634                public void realRun() throws InterruptedException {
635                    assertNull(q.poll());
636                    threadsStarted.await();
637                    long startTime = System.nanoTime();
638                    assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
639                    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
640                    checkEmpty(q);
641                }});
642
643            executor.execute(new CheckedRunnable() {
644                public void realRun() throws InterruptedException {
645                    threadsStarted.await();
646                    q.put(one);
647                }});
648        }
649    }
650
651    /**
652     * A deserialized serialized queue has same elements in same order
653     */
654    public void testSerialization() throws Exception {
655        Queue x = populatedQueue(SIZE);
656        Queue y = serialClone(x);
657
658        assertNotSame(y, x);
659        assertEquals(x.size(), y.size());
660        assertEquals(x.toString(), y.toString());
661        assertTrue(Arrays.equals(x.toArray(), y.toArray()));
662        while (!x.isEmpty()) {
663            assertFalse(y.isEmpty());
664            assertEquals(x.remove(), y.remove());
665        }
666        assertTrue(y.isEmpty());
667    }
668
669    /**
670     * drainTo(c) empties queue into another collection c
671     */
672    public void testDrainTo() {
673        LinkedTransferQueue q = populatedQueue(SIZE);
674        ArrayList l = new ArrayList();
675        q.drainTo(l);
676        assertEquals(0, q.size());
677        assertEquals(SIZE, l.size());
678        for (int i = 0; i < SIZE; ++i) {
679            assertEquals(i, l.get(i));
680        }
681        q.add(zero);
682        q.add(one);
683        assertFalse(q.isEmpty());
684        assertTrue(q.contains(zero));
685        assertTrue(q.contains(one));
686        l.clear();
687        q.drainTo(l);
688        assertEquals(0, q.size());
689        assertEquals(2, l.size());
690        for (int i = 0; i < 2; ++i) {
691            assertEquals(i, l.get(i));
692        }
693    }
694
695    /**
     * drainTo(c) empties the queue while another thread concurrently performs a put.
697     */
698    public void testDrainToWithActivePut() throws InterruptedException {
699        final LinkedTransferQueue q = populatedQueue(SIZE);
700        Thread t = newStartedThread(new CheckedRunnable() {
701            public void realRun() {
702                q.put(SIZE + 1);
703            }});
704        ArrayList l = new ArrayList();
705        q.drainTo(l);
706        assertTrue(l.size() >= SIZE);
707        for (int i = 0; i < SIZE; ++i)
708            assertEquals(i, l.get(i));
709        awaitTermination(t);
710        assertTrue(q.size() + l.size() >= SIZE);
711    }
712
713    /**
714     * drainTo(c, n) empties first min(n, size) elements of queue into c
715     */
716    public void testDrainToN() {
717        LinkedTransferQueue q = new LinkedTransferQueue();
718        for (int i = 0; i < SIZE + 2; ++i) {
719            for (int j = 0; j < SIZE; j++) {
720                assertTrue(q.offer(j));
721            }
722            ArrayList l = new ArrayList();
723            q.drainTo(l, i);
724            int k = (i < SIZE) ? i : SIZE;
725            assertEquals(k, l.size());
726            assertEquals(SIZE - k, q.size());
727            for (int j = 0; j < k; ++j)
728                assertEquals(j, l.get(j));
729            do {} while (q.poll() != null);
730        }
731    }
732
733    /**
734     * timed poll() or take() increments the waiting consumer count;
735     * offer(e) decrements the waiting consumer count
736     */
737    public void testWaitingConsumer() throws InterruptedException {
738        final LinkedTransferQueue q = new LinkedTransferQueue();
739        assertEquals(0, q.getWaitingConsumerCount());
740        assertFalse(q.hasWaitingConsumer());
741        final CountDownLatch threadStarted = new CountDownLatch(1);
742
743        Thread t = newStartedThread(new CheckedRunnable() {
744            public void realRun() throws InterruptedException {
745                threadStarted.countDown();
746                long startTime = System.nanoTime();
747                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
748                assertEquals(0, q.getWaitingConsumerCount());
749                assertFalse(q.hasWaitingConsumer());
750                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
751            }});
752
753        threadStarted.await();
754        Callable<Boolean> oneConsumer
755            = new Callable<Boolean>() { public Boolean call() {
756                return q.hasWaitingConsumer()
757                && q.getWaitingConsumerCount() == 1; }};
758        waitForThreadToEnterWaitState(t, oneConsumer);
759
760        assertTrue(q.offer(one));
761        assertEquals(0, q.getWaitingConsumerCount());
762        assertFalse(q.hasWaitingConsumer());
763
764        awaitTermination(t);
765    }
766
767    /**
768     * transfer(null) throws NullPointerException
769     */
770    public void testTransfer1() throws InterruptedException {
771        try {
772            LinkedTransferQueue q = new LinkedTransferQueue();
773            q.transfer(null);
774            shouldThrow();
775        } catch (NullPointerException success) {}
776    }
777
778    /**
     * transfer waits until a poll occurs. The transferred element
     * is returned by this associated poll.
781     */
782    public void testTransfer2() throws InterruptedException {
783        final LinkedTransferQueue<Integer> q
784            = new LinkedTransferQueue<Integer>();
785        final CountDownLatch threadStarted = new CountDownLatch(1);
786
787        Thread t = newStartedThread(new CheckedRunnable() {
788            public void realRun() throws InterruptedException {
789                threadStarted.countDown();
790                q.transfer(five);
791                checkEmpty(q);
792            }});
793
794        threadStarted.await();
795        Callable<Boolean> oneElement
796            = new Callable<Boolean>() { public Boolean call() {
797                return !q.isEmpty() && q.size() == 1; }};
798        waitForThreadToEnterWaitState(t, oneElement);
799
800        assertSame(five, q.poll());
801        checkEmpty(q);
802        awaitTermination(t);
803    }
804
805    /**
     * transfer waits until a poll occurs, and then transfers in FIFO order
807     */
808    public void testTransfer3() throws InterruptedException {
809        final LinkedTransferQueue<Integer> q
810            = new LinkedTransferQueue<Integer>();
811
812        Thread first = newStartedThread(new CheckedRunnable() {
813            public void realRun() throws InterruptedException {
814                q.transfer(four);
815                assertTrue(!q.contains(four));
816                assertEquals(1, q.size());
817            }});
818
819        Thread interruptedThread = newStartedThread(
820            new CheckedInterruptedRunnable() {
821                public void realRun() throws InterruptedException {
822                    while (q.isEmpty())
823                        Thread.yield();
824                    q.transfer(five);
825                }});
826
827        while (q.size() < 2)
828            Thread.yield();
829        assertEquals(2, q.size());
830        assertSame(four, q.poll());
831        first.join();
832        assertEquals(1, q.size());
833        interruptedThread.interrupt();
834        interruptedThread.join();
835        checkEmpty(q);
836    }
837
838    /**
839     * transfer waits until a poll occurs, at which point the polling
840     * thread returns the element
841     */
842    public void testTransfer4() throws InterruptedException {
843        final LinkedTransferQueue q = new LinkedTransferQueue();
844
845        Thread t = newStartedThread(new CheckedRunnable() {
846            public void realRun() throws InterruptedException {
847                q.transfer(four);
848                assertFalse(q.contains(four));
849                assertSame(three, q.poll());
850            }});
851
852        while (q.isEmpty())
853            Thread.yield();
854        assertFalse(q.isEmpty());
855        assertEquals(1, q.size());
856        assertTrue(q.offer(three));
857        assertSame(four, q.poll());
858        awaitTermination(t);
859    }
860
861    /**
     * transfer waits until a take occurs. The transferred element
     * is returned by this associated take.
864     */
865    public void testTransfer5() throws InterruptedException {
866        final LinkedTransferQueue<Integer> q
867            = new LinkedTransferQueue<Integer>();
868
869        Thread t = newStartedThread(new CheckedRunnable() {
870            public void realRun() throws InterruptedException {
871                q.transfer(four);
872                checkEmpty(q);
873            }});
874
875        while (q.isEmpty())
876            Thread.yield();
877        assertFalse(q.isEmpty());
878        assertEquals(1, q.size());
879        assertSame(four, q.take());
880        checkEmpty(q);
881        awaitTermination(t);
882    }
883
884    /**
885     * tryTransfer(null) throws NullPointerException
886     */
887    public void testTryTransfer1() {
888        final LinkedTransferQueue q = new LinkedTransferQueue();
889        try {
890            q.tryTransfer(null);
891            shouldThrow();
892        } catch (NullPointerException success) {}
893    }
894
895    /**
896     * tryTransfer returns false and does not enqueue if there are no
897     * consumers waiting to poll or take.
898     */
899    public void testTryTransfer2() throws InterruptedException {
900        final LinkedTransferQueue q = new LinkedTransferQueue();
901        assertFalse(q.tryTransfer(new Object()));
902        assertFalse(q.hasWaitingConsumer());
903        checkEmpty(q);
904    }
905
906    /**
     * If there is a consumer waiting in timed poll, tryTransfer
     * returns true while successfully transferring the object.
909     */
910    public void testTryTransfer3() throws InterruptedException {
911        final Object hotPotato = new Object();
912        final LinkedTransferQueue q = new LinkedTransferQueue();
913
914        Thread t = newStartedThread(new CheckedRunnable() {
915            public void realRun() {
916                while (! q.hasWaitingConsumer())
917                    Thread.yield();
918                assertTrue(q.hasWaitingConsumer());
919                checkEmpty(q);
920                assertTrue(q.tryTransfer(hotPotato));
921            }});
922
923        long startTime = System.nanoTime();
924        assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
925        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
926        checkEmpty(q);
927        awaitTermination(t);
928    }
929
930    /**
     * If there is a consumer waiting in take, tryTransfer returns
     * true while successfully transferring the object.
933     */
934    public void testTryTransfer4() throws InterruptedException {
935        final Object hotPotato = new Object();
936        final LinkedTransferQueue q = new LinkedTransferQueue();
937
938        Thread t = newStartedThread(new CheckedRunnable() {
939            public void realRun() {
940                while (! q.hasWaitingConsumer())
941                    Thread.yield();
942                assertTrue(q.hasWaitingConsumer());
943                checkEmpty(q);
944                assertTrue(q.tryTransfer(hotPotato));
945            }});
946
947        assertSame(q.take(), hotPotato);
948        checkEmpty(q);
949        awaitTermination(t);
950    }
951
952    /**
953     * tryTransfer blocks interruptibly if no takers
954     */
955    public void testTryTransfer5() throws InterruptedException {
956        final LinkedTransferQueue q = new LinkedTransferQueue();
957        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
958        assertTrue(q.isEmpty());
959
960        Thread t = newStartedThread(new CheckedRunnable() {
961            public void realRun() throws InterruptedException {
962                long startTime = System.nanoTime();
963                Thread.currentThread().interrupt();
964                try {
965                    q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
966                    shouldThrow();
967                } catch (InterruptedException success) {}
968                assertFalse(Thread.interrupted());
969
970                pleaseInterrupt.countDown();
971                try {
972                    q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
973                    shouldThrow();
974                } catch (InterruptedException success) {}
975                assertFalse(Thread.interrupted());
976                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
977            }});
978
979        await(pleaseInterrupt);
980        assertThreadStaysAlive(t);
981        t.interrupt();
982        awaitTermination(t);
983        checkEmpty(q);
984    }
985
986    /**
987     * tryTransfer gives up after the timeout and returns false
988     */
989    public void testTryTransfer6() throws InterruptedException {
990        final LinkedTransferQueue q = new LinkedTransferQueue();
991
992        Thread t = newStartedThread(new CheckedRunnable() {
993            public void realRun() throws InterruptedException {
994                long startTime = System.nanoTime();
995                assertFalse(q.tryTransfer(new Object(),
996                                          timeoutMillis(), MILLISECONDS));
997                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
998                checkEmpty(q);
999            }});
1000
1001        awaitTermination(t);
1002        checkEmpty(q);
1003    }
1004
1005    /**
     * tryTransfer waits for any elements previously in the queue to be
     * removed before transferring to a poll or take
1008     */
1009    public void testTryTransfer7() throws InterruptedException {
1010        final LinkedTransferQueue q = new LinkedTransferQueue();
1011        assertTrue(q.offer(four));
1012
1013        Thread t = newStartedThread(new CheckedRunnable() {
1014            public void realRun() throws InterruptedException {
1015                long startTime = System.nanoTime();
1016                assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1017                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1018                checkEmpty(q);
1019            }});
1020
1021        while (q.size() != 2)
1022            Thread.yield();
1023        assertEquals(2, q.size());
1024        assertSame(four, q.poll());
1025        assertSame(five, q.poll());
1026        checkEmpty(q);
1027        awaitTermination(t);
1028    }
1029
1030    /**
     * timed tryTransfer with no waiting consumer times out and returns
     * false without leaving its element enqueued; after the previously
     * offered element is polled, the next poll returns null
1033     */
1034    public void testTryTransfer8() throws InterruptedException {
1035        final LinkedTransferQueue q = new LinkedTransferQueue();
1036        assertTrue(q.offer(four));
1037        assertEquals(1, q.size());
1038        long startTime = System.nanoTime();
1039        assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1040        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1041        assertEquals(1, q.size());
1042        assertSame(four, q.poll());
1043        assertNull(q.poll());
1044        checkEmpty(q);
1045    }
1046
1047    private LinkedTransferQueue<Integer> populatedQueue(int n) {
1048        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1049        checkEmpty(q);
1050        for (int i = 0; i < n; i++) {
1051            assertEquals(i, q.size());
1052            assertTrue(q.offer(i));
1053            assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1054        }
1055        assertFalse(q.isEmpty());
1056        return q;
1057    }
1058
1059    /**
1060     * remove(null), contains(null) always return false
1061     */
1062    public void testNeverContainsNull() {
1063        Collection<?>[] qs = {
1064            new LinkedTransferQueue<Object>(),
1065            populatedQueue(2),
1066        };
1067
1068        for (Collection<?> q : qs) {
1069            assertFalse(q.contains(null));
1070            assertFalse(q.remove(null));
1071        }
1072    }
1073}
1074