1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8package jsr166;
9
10import static java.util.concurrent.TimeUnit.MILLISECONDS;
11
12import java.util.ArrayList;
13import java.util.Arrays;
14import java.util.Collection;
15import java.util.Iterator;
16import java.util.List;
17import java.util.NoSuchElementException;
18import java.util.Queue;
19import java.util.concurrent.BlockingQueue;
20import java.util.concurrent.CountDownLatch;
21import java.util.concurrent.Executors;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.LinkedTransferQueue;
24
25import junit.framework.Test;
26
27@SuppressWarnings({"unchecked", "rawtypes"})
28public class LinkedTransferQueueTest extends JSR166TestCase {
29    static class Implementation implements CollectionImplementation {
30        public Class<?> klazz() { return LinkedTransferQueue.class; }
31        public Collection emptyCollection() { return new LinkedTransferQueue(); }
32        public Object makeElement(int i) { return i; }
33        public boolean isConcurrent() { return true; }
34        public boolean permitsNulls() { return false; }
35    }
36
37    // android-note: These tests have been moved into their own separate
38    // classes to work around CTS issues:
39    // LinkedTransferQueueBlockingQueueTest.java
40    // LinkedTransferQueueCollectionTest.java
41    //
42    // public static class Generic extends BlockingQueueTest {
43    //     protected BlockingQueue emptyCollection() {
44    //         return new LinkedTransferQueue();
45    //     }
46    // }
47
48    // android-note: Removed because the CTS runner does a bad job of
49    // retrying tests that have suite() declarations.
50    //
51    // public static void main(String[] args) {
52    //     main(suite(), args);
53    // }
54    // public static Test suite() {
55    //     return newTestSuite(LinkedTransferQueueTest.class,
56    //                         new Generic().testSuite(),
57    //                         CollectionTest.testSuite(new Implementation()));
58    // }
59
60    /**
61     * Constructor builds new queue with size being zero and empty
62     * being true
63     */
64    public void testConstructor1() {
65        assertEquals(0, new LinkedTransferQueue().size());
66        assertTrue(new LinkedTransferQueue().isEmpty());
67    }
68
69    /**
70     * Initializing constructor with null collection throws
71     * NullPointerException
72     */
73    public void testConstructor2() {
74        try {
75            new LinkedTransferQueue(null);
76            shouldThrow();
77        } catch (NullPointerException success) {}
78    }
79
80    /**
81     * Initializing from Collection of null elements throws
82     * NullPointerException
83     */
84    public void testConstructor3() {
85        Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
86        try {
87            new LinkedTransferQueue(elements);
88            shouldThrow();
89        } catch (NullPointerException success) {}
90    }
91
92    /**
93     * Initializing constructor with a collection containing some null elements
94     * throws NullPointerException
95     */
96    public void testConstructor4() {
97        Integer[] ints = new Integer[SIZE];
98        for (int i = 0; i < SIZE - 1; ++i)
99            ints[i] = i;
100        Collection<Integer> elements = Arrays.asList(ints);
101        try {
102            new LinkedTransferQueue(elements);
103            shouldThrow();
104        } catch (NullPointerException success) {}
105    }
106
107    /**
108     * Queue contains all elements of the collection it is initialized by
109     */
110    public void testConstructor5() {
111        Integer[] ints = new Integer[SIZE];
112        for (int i = 0; i < SIZE; ++i) {
113            ints[i] = i;
114        }
115        List intList = Arrays.asList(ints);
116        LinkedTransferQueue q
117            = new LinkedTransferQueue(intList);
118        assertEquals(q.size(), intList.size());
119        assertEquals(q.toString(), intList.toString());
120        assertTrue(Arrays.equals(q.toArray(),
121                                     intList.toArray()));
122        assertTrue(Arrays.equals(q.toArray(new Object[0]),
123                                 intList.toArray(new Object[0])));
124        assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
125                                 intList.toArray(new Object[SIZE])));
126        for (int i = 0; i < SIZE; ++i) {
127            assertEquals(ints[i], q.poll());
128        }
129    }
130
131    /**
132     * remainingCapacity() always returns Integer.MAX_VALUE
133     */
134    public void testRemainingCapacity() {
135        BlockingQueue q = populatedQueue(SIZE);
136        for (int i = 0; i < SIZE; ++i) {
137            assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
138            assertEquals(SIZE - i, q.size());
139            assertEquals(i, q.remove());
140        }
141        for (int i = 0; i < SIZE; ++i) {
142            assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
143            assertEquals(i, q.size());
144            assertTrue(q.add(i));
145        }
146    }
147
148    /**
149     * addAll(this) throws IllegalArgumentException
150     */
151    public void testAddAllSelf() {
152        LinkedTransferQueue q = populatedQueue(SIZE);
153        try {
154            q.addAll(q);
155            shouldThrow();
156        } catch (IllegalArgumentException success) {}
157    }
158
159    /**
160     * addAll of a collection with any null elements throws
161     * NullPointerException after possibly adding some elements
162     */
163    public void testAddAll3() {
164        LinkedTransferQueue q = new LinkedTransferQueue();
165        Integer[] ints = new Integer[SIZE];
166        for (int i = 0; i < SIZE - 1; ++i)
167            ints[i] = i;
168        try {
169            q.addAll(Arrays.asList(ints));
170            shouldThrow();
171        } catch (NullPointerException success) {}
172    }
173
174    /**
175     * Queue contains all elements, in traversal order, of successful addAll
176     */
177    public void testAddAll5() {
178        Integer[] empty = new Integer[0];
179        Integer[] ints = new Integer[SIZE];
180        for (int i = 0; i < SIZE; ++i) {
181            ints[i] = i;
182        }
183        LinkedTransferQueue q = new LinkedTransferQueue();
184        assertFalse(q.addAll(Arrays.asList(empty)));
185        assertTrue(q.addAll(Arrays.asList(ints)));
186        for (int i = 0; i < SIZE; ++i) {
187            assertEquals(ints[i], q.poll());
188        }
189    }
190
191    /**
192     * all elements successfully put are contained
193     */
194    public void testPut() {
195        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
196        for (int i = 0; i < SIZE; ++i) {
197            assertEquals(i, q.size());
198            q.put(i);
199            assertTrue(q.contains(i));
200        }
201    }
202
203    /**
204     * take retrieves elements in FIFO order
205     */
206    public void testTake() throws InterruptedException {
207        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
208        for (int i = 0; i < SIZE; ++i) {
209            assertEquals(i, (int) q.take());
210        }
211    }
212
213    /**
214     * take removes existing elements until empty, then blocks interruptibly
215     */
216    public void testBlockingTake() throws InterruptedException {
217        final BlockingQueue q = populatedQueue(SIZE);
218        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
219        Thread t = newStartedThread(new CheckedRunnable() {
220            public void realRun() throws InterruptedException {
221                for (int i = 0; i < SIZE; ++i) {
222                    assertEquals(i, q.take());
223                }
224
225                Thread.currentThread().interrupt();
226                try {
227                    q.take();
228                    shouldThrow();
229                } catch (InterruptedException success) {}
230                assertFalse(Thread.interrupted());
231
232                pleaseInterrupt.countDown();
233                try {
234                    q.take();
235                    shouldThrow();
236                } catch (InterruptedException success) {}
237                assertFalse(Thread.interrupted());
238            }});
239
240        await(pleaseInterrupt);
241        assertThreadStaysAlive(t);
242        t.interrupt();
243        awaitTermination(t);
244    }
245
246    /**
247     * poll succeeds unless empty
248     */
249    public void testPoll() throws InterruptedException {
250        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
251        for (int i = 0; i < SIZE; ++i) {
252            assertEquals(i, (int) q.poll());
253        }
254        assertNull(q.poll());
255        checkEmpty(q);
256    }
257
258    /**
259     * timed poll with zero timeout succeeds when non-empty, else times out
260     */
261    public void testTimedPoll0() throws InterruptedException {
262        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
263        for (int i = 0; i < SIZE; ++i) {
264            assertEquals(i, (int) q.poll(0, MILLISECONDS));
265        }
266        assertNull(q.poll(0, MILLISECONDS));
267        checkEmpty(q);
268    }
269
270    /**
271     * timed poll with nonzero timeout succeeds when non-empty, else times out
272     */
273    public void testTimedPoll() throws InterruptedException {
274        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
275        long startTime = System.nanoTime();
276        for (int i = 0; i < SIZE; ++i)
277            assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
278        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
279
280        startTime = System.nanoTime();
281        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
282        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
283        checkEmpty(q);
284    }
285
286    /**
287     * Interrupted timed poll throws InterruptedException instead of
288     * returning timeout status
289     */
290    public void testInterruptedTimedPoll() throws InterruptedException {
291        final BlockingQueue<Integer> q = populatedQueue(SIZE);
292        final CountDownLatch aboutToWait = new CountDownLatch(1);
293        Thread t = newStartedThread(new CheckedRunnable() {
294            public void realRun() throws InterruptedException {
295                long startTime = System.nanoTime();
296                for (int i = 0; i < SIZE; ++i)
297                    assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
298                aboutToWait.countDown();
299                try {
300                    q.poll(LONG_DELAY_MS, MILLISECONDS);
301                    shouldThrow();
302                } catch (InterruptedException success) {}
303                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
304            }});
305
306        aboutToWait.await();
307        waitForThreadToEnterWaitState(t);
308        t.interrupt();
309        awaitTermination(t);
310        checkEmpty(q);
311    }
312
313    /**
314     * timed poll after thread interrupted throws InterruptedException
315     * instead of returning timeout status
316     */
317    public void testTimedPollAfterInterrupt() throws InterruptedException {
318        final BlockingQueue<Integer> q = populatedQueue(SIZE);
319        Thread t = newStartedThread(new CheckedRunnable() {
320            public void realRun() throws InterruptedException {
321                long startTime = System.nanoTime();
322                Thread.currentThread().interrupt();
323                for (int i = 0; i < SIZE; ++i)
324                    assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
325                try {
326                    q.poll(LONG_DELAY_MS, MILLISECONDS);
327                    shouldThrow();
328                } catch (InterruptedException success) {}
329                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
330            }});
331
332        awaitTermination(t);
333        checkEmpty(q);
334    }
335
336    /**
337     * peek returns next element, or null if empty
338     */
339    public void testPeek() throws InterruptedException {
340        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
341        for (int i = 0; i < SIZE; ++i) {
342            assertEquals(i, (int) q.peek());
343            assertEquals(i, (int) q.poll());
344            assertTrue(q.peek() == null ||
345                       i != (int) q.peek());
346        }
347        assertNull(q.peek());
348        checkEmpty(q);
349    }
350
351    /**
352     * element returns next element, or throws NoSuchElementException if empty
353     */
354    public void testElement() throws InterruptedException {
355        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
356        for (int i = 0; i < SIZE; ++i) {
357            assertEquals(i, (int) q.element());
358            assertEquals(i, (int) q.poll());
359        }
360        try {
361            q.element();
362            shouldThrow();
363        } catch (NoSuchElementException success) {}
364        checkEmpty(q);
365    }
366
367    /**
368     * remove removes next element, or throws NoSuchElementException if empty
369     */
370    public void testRemove() throws InterruptedException {
371        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
372        for (int i = 0; i < SIZE; ++i) {
373            assertEquals(i, (int) q.remove());
374        }
375        try {
376            q.remove();
377            shouldThrow();
378        } catch (NoSuchElementException success) {}
379        checkEmpty(q);
380    }
381
382    /**
383     * An add following remove(x) succeeds
384     */
385    public void testRemoveElementAndAdd() throws InterruptedException {
386        LinkedTransferQueue q = new LinkedTransferQueue();
387        assertTrue(q.add(one));
388        assertTrue(q.add(two));
389        assertTrue(q.remove(one));
390        assertTrue(q.remove(two));
391        assertTrue(q.add(three));
392        assertSame(q.take(), three);
393    }
394
395    /**
396     * contains(x) reports true when elements added but not yet removed
397     */
398    public void testContains() {
399        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
400        for (int i = 0; i < SIZE; ++i) {
401            assertTrue(q.contains(i));
402            assertEquals(i, (int) q.poll());
403            assertFalse(q.contains(i));
404        }
405    }
406
407    /**
408     * clear removes all elements
409     */
410    public void testClear() throws InterruptedException {
411        LinkedTransferQueue q = populatedQueue(SIZE);
412        q.clear();
413        checkEmpty(q);
414        assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
415        q.add(one);
416        assertFalse(q.isEmpty());
417        assertEquals(1, q.size());
418        assertTrue(q.contains(one));
419        q.clear();
420        checkEmpty(q);
421    }
422
423    /**
424     * containsAll(c) is true when c contains a subset of elements
425     */
426    public void testContainsAll() {
427        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
428        LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
429        for (int i = 0; i < SIZE; ++i) {
430            assertTrue(q.containsAll(p));
431            assertFalse(p.containsAll(q));
432            p.add(i);
433        }
434        assertTrue(p.containsAll(q));
435    }
436
437    /**
438     * retainAll(c) retains only those elements of c and reports true
439     * if changed
440     */
441    public void testRetainAll() {
442        LinkedTransferQueue q = populatedQueue(SIZE);
443        LinkedTransferQueue p = populatedQueue(SIZE);
444        for (int i = 0; i < SIZE; ++i) {
445            boolean changed = q.retainAll(p);
446            if (i == 0) {
447                assertFalse(changed);
448            } else {
449                assertTrue(changed);
450            }
451            assertTrue(q.containsAll(p));
452            assertEquals(SIZE - i, q.size());
453            p.remove();
454        }
455    }
456
457    /**
458     * removeAll(c) removes only those elements of c and reports true
459     * if changed
460     */
461    public void testRemoveAll() {
462        for (int i = 1; i < SIZE; ++i) {
463            LinkedTransferQueue q = populatedQueue(SIZE);
464            LinkedTransferQueue p = populatedQueue(i);
465            assertTrue(q.removeAll(p));
466            assertEquals(SIZE - i, q.size());
467            for (int j = 0; j < i; ++j) {
468                assertFalse(q.contains(p.remove()));
469            }
470        }
471    }
472
473    /**
474     * toArray() contains all elements in FIFO order
475     */
476    public void testToArray() {
477        LinkedTransferQueue q = populatedQueue(SIZE);
478        Object[] o = q.toArray();
479        for (int i = 0; i < o.length; i++) {
480            assertSame(o[i], q.poll());
481        }
482    }
483
484    /**
485     * toArray(a) contains all elements in FIFO order
486     */
487    public void testToArray2() {
488        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
489        Integer[] ints = new Integer[SIZE];
490        Integer[] array = q.toArray(ints);
491        assertSame(ints, array);
492        for (int i = 0; i < ints.length; i++) {
493            assertSame(ints[i], q.poll());
494        }
495    }
496
497    /**
498     * toArray(incompatible array type) throws ArrayStoreException
499     */
500    public void testToArray1_BadArg() {
501        LinkedTransferQueue q = populatedQueue(SIZE);
502        try {
503            q.toArray(new String[10]);
504            shouldThrow();
505        } catch (ArrayStoreException success) {}
506    }
507
508    /**
509     * iterator iterates through all elements
510     */
511    public void testIterator() throws InterruptedException {
512        LinkedTransferQueue q = populatedQueue(SIZE);
513        Iterator it = q.iterator();
514        int i;
515        for (i = 0; it.hasNext(); i++)
516            assertTrue(q.contains(it.next()));
517        assertEquals(i, SIZE);
518        assertIteratorExhausted(it);
519
520        it = q.iterator();
521        for (i = 0; it.hasNext(); i++)
522            assertEquals(it.next(), q.take());
523        assertEquals(i, SIZE);
524        assertIteratorExhausted(it);
525    }
526
527    /**
528     * iterator of empty collection has no elements
529     */
530    public void testEmptyIterator() {
531        assertIteratorExhausted(new LinkedTransferQueue().iterator());
532    }
533
534    /**
535     * iterator.remove() removes current element
536     */
537    public void testIteratorRemove() {
538        final LinkedTransferQueue q = new LinkedTransferQueue();
539        q.add(two);
540        q.add(one);
541        q.add(three);
542
543        Iterator it = q.iterator();
544        it.next();
545        it.remove();
546
547        it = q.iterator();
548        assertSame(it.next(), one);
549        assertSame(it.next(), three);
550        assertFalse(it.hasNext());
551    }
552
553    /**
554     * iterator ordering is FIFO
555     */
556    public void testIteratorOrdering() {
557        final LinkedTransferQueue<Integer> q
558            = new LinkedTransferQueue<Integer>();
559        assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
560        q.add(one);
561        q.add(two);
562        q.add(three);
563        assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
564        int k = 0;
565        for (Integer n : q) {
566            assertEquals(++k, (int) n);
567        }
568        assertEquals(3, k);
569    }
570
571    /**
572     * Modifications do not cause iterators to fail
573     */
574    public void testWeaklyConsistentIteration() {
575        final LinkedTransferQueue q = new LinkedTransferQueue();
576        q.add(one);
577        q.add(two);
578        q.add(three);
579        for (Iterator it = q.iterator(); it.hasNext();) {
580            q.remove();
581            it.next();
582        }
583        assertEquals(0, q.size());
584    }
585
586    /**
587     * toString contains toStrings of elements
588     */
589    public void testToString() {
590        LinkedTransferQueue q = populatedQueue(SIZE);
591        String s = q.toString();
592        for (int i = 0; i < SIZE; ++i) {
593            assertTrue(s.contains(String.valueOf(i)));
594        }
595    }
596
597    /**
598     * offer transfers elements across Executor tasks
599     */
600    public void testOfferInExecutor() {
601        final LinkedTransferQueue q = new LinkedTransferQueue();
602        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
603        final ExecutorService executor = Executors.newFixedThreadPool(2);
604        try (PoolCleaner cleaner = cleaner(executor)) {
605
606            executor.execute(new CheckedRunnable() {
607                public void realRun() throws InterruptedException {
608                    threadsStarted.await();
609                    long startTime = System.nanoTime();
610                    assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
611                    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
612                }});
613
614            executor.execute(new CheckedRunnable() {
615                public void realRun() throws InterruptedException {
616                    threadsStarted.await();
617                    assertSame(one, q.take());
618                    checkEmpty(q);
619                }});
620        }
621    }
622
623    /**
624     * timed poll retrieves elements across Executor threads
625     */
626    public void testPollInExecutor() {
627        final LinkedTransferQueue q = new LinkedTransferQueue();
628        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
629        final ExecutorService executor = Executors.newFixedThreadPool(2);
630        try (PoolCleaner cleaner = cleaner(executor)) {
631
632            executor.execute(new CheckedRunnable() {
633                public void realRun() throws InterruptedException {
634                    assertNull(q.poll());
635                    threadsStarted.await();
636                    long startTime = System.nanoTime();
637                    assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
638                    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
639                    checkEmpty(q);
640                }});
641
642            executor.execute(new CheckedRunnable() {
643                public void realRun() throws InterruptedException {
644                    threadsStarted.await();
645                    q.put(one);
646                }});
647        }
648    }
649
650    /**
651     * A deserialized serialized queue has same elements in same order
652     */
653    public void testSerialization() throws Exception {
654        Queue x = populatedQueue(SIZE);
655        Queue y = serialClone(x);
656
657        assertNotSame(y, x);
658        assertEquals(x.size(), y.size());
659        assertEquals(x.toString(), y.toString());
660        assertTrue(Arrays.equals(x.toArray(), y.toArray()));
661        while (!x.isEmpty()) {
662            assertFalse(y.isEmpty());
663            assertEquals(x.remove(), y.remove());
664        }
665        assertTrue(y.isEmpty());
666    }
667
668    /**
669     * drainTo(c) empties queue into another collection c
670     */
671    public void testDrainTo() {
672        LinkedTransferQueue q = populatedQueue(SIZE);
673        ArrayList l = new ArrayList();
674        q.drainTo(l);
675        assertEquals(0, q.size());
676        assertEquals(SIZE, l.size());
677        for (int i = 0; i < SIZE; ++i) {
678            assertEquals(i, l.get(i));
679        }
680        q.add(zero);
681        q.add(one);
682        assertFalse(q.isEmpty());
683        assertTrue(q.contains(zero));
684        assertTrue(q.contains(one));
685        l.clear();
686        q.drainTo(l);
687        assertEquals(0, q.size());
688        assertEquals(2, l.size());
689        for (int i = 0; i < 2; ++i) {
690            assertEquals(i, l.get(i));
691        }
692    }
693
694    /**
695     * drainTo(c) empties full queue, unblocking a waiting put.
696     */
697    public void testDrainToWithActivePut() throws InterruptedException {
698        final LinkedTransferQueue q = populatedQueue(SIZE);
699        Thread t = newStartedThread(new CheckedRunnable() {
700            public void realRun() {
701                q.put(SIZE + 1);
702            }});
703        ArrayList l = new ArrayList();
704        q.drainTo(l);
705        assertTrue(l.size() >= SIZE);
706        for (int i = 0; i < SIZE; ++i)
707            assertEquals(i, l.get(i));
708        awaitTermination(t);
709        assertTrue(q.size() + l.size() >= SIZE);
710    }
711
712    /**
713     * drainTo(c, n) empties first min(n, size) elements of queue into c
714     */
715    public void testDrainToN() {
716        LinkedTransferQueue q = new LinkedTransferQueue();
717        for (int i = 0; i < SIZE + 2; ++i) {
718            for (int j = 0; j < SIZE; j++) {
719                assertTrue(q.offer(j));
720            }
721            ArrayList l = new ArrayList();
722            q.drainTo(l, i);
723            int k = (i < SIZE) ? i : SIZE;
724            assertEquals(k, l.size());
725            assertEquals(SIZE - k, q.size());
726            for (int j = 0; j < k; ++j)
727                assertEquals(j, l.get(j));
728            do {} while (q.poll() != null);
729        }
730    }
731
732    /**
733     * timed poll() or take() increments the waiting consumer count;
734     * offer(e) decrements the waiting consumer count
735     */
736    public void testWaitingConsumer() throws InterruptedException {
737        final LinkedTransferQueue q = new LinkedTransferQueue();
738        assertEquals(0, q.getWaitingConsumerCount());
739        assertFalse(q.hasWaitingConsumer());
740        final CountDownLatch threadStarted = new CountDownLatch(1);
741
742        Thread t = newStartedThread(new CheckedRunnable() {
743            public void realRun() throws InterruptedException {
744                threadStarted.countDown();
745                long startTime = System.nanoTime();
746                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
747                assertEquals(0, q.getWaitingConsumerCount());
748                assertFalse(q.hasWaitingConsumer());
749                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
750            }});
751
752        threadStarted.await();
753        waitForThreadToEnterWaitState(t);
754        assertEquals(1, q.getWaitingConsumerCount());
755        assertTrue(q.hasWaitingConsumer());
756
757        assertTrue(q.offer(one));
758        assertEquals(0, q.getWaitingConsumerCount());
759        assertFalse(q.hasWaitingConsumer());
760
761        awaitTermination(t);
762    }
763
764    /**
765     * transfer(null) throws NullPointerException
766     */
767    public void testTransfer1() throws InterruptedException {
768        try {
769            LinkedTransferQueue q = new LinkedTransferQueue();
770            q.transfer(null);
771            shouldThrow();
772        } catch (NullPointerException success) {}
773    }
774
775    /**
776     * transfer waits until a poll occurs. The transfered element
777     * is returned by this associated poll.
778     */
779    public void testTransfer2() throws InterruptedException {
780        final LinkedTransferQueue<Integer> q
781            = new LinkedTransferQueue<Integer>();
782        final CountDownLatch threadStarted = new CountDownLatch(1);
783
784        Thread t = newStartedThread(new CheckedRunnable() {
785            public void realRun() throws InterruptedException {
786                threadStarted.countDown();
787                q.transfer(five);
788                checkEmpty(q);
789            }});
790
791        threadStarted.await();
792        waitForThreadToEnterWaitState(t);
793        assertEquals(1, q.size());
794        assertSame(five, q.poll());
795        checkEmpty(q);
796        awaitTermination(t);
797    }
798
799    /**
800     * transfer waits until a poll occurs, and then transfers in fifo order
801     */
802    public void testTransfer3() throws InterruptedException {
803        final LinkedTransferQueue<Integer> q
804            = new LinkedTransferQueue<Integer>();
805
806        Thread first = newStartedThread(new CheckedRunnable() {
807            public void realRun() throws InterruptedException {
808                q.transfer(four);
809                assertTrue(!q.contains(four));
810                assertEquals(1, q.size());
811            }});
812
813        Thread interruptedThread = newStartedThread(
814            new CheckedInterruptedRunnable() {
815                public void realRun() throws InterruptedException {
816                    while (q.isEmpty())
817                        Thread.yield();
818                    q.transfer(five);
819                }});
820
821        while (q.size() < 2)
822            Thread.yield();
823        assertEquals(2, q.size());
824        assertSame(four, q.poll());
825        first.join();
826        assertEquals(1, q.size());
827        interruptedThread.interrupt();
828        interruptedThread.join();
829        checkEmpty(q);
830    }
831
832    /**
833     * transfer waits until a poll occurs, at which point the polling
834     * thread returns the element
835     */
836    public void testTransfer4() throws InterruptedException {
837        final LinkedTransferQueue q = new LinkedTransferQueue();
838
839        Thread t = newStartedThread(new CheckedRunnable() {
840            public void realRun() throws InterruptedException {
841                q.transfer(four);
842                assertFalse(q.contains(four));
843                assertSame(three, q.poll());
844            }});
845
846        while (q.isEmpty())
847            Thread.yield();
848        assertFalse(q.isEmpty());
849        assertEquals(1, q.size());
850        assertTrue(q.offer(three));
851        assertSame(four, q.poll());
852        awaitTermination(t);
853    }
854
855    /**
856     * transfer waits until a take occurs. The transfered element
857     * is returned by this associated take.
858     */
859    public void testTransfer5() throws InterruptedException {
860        final LinkedTransferQueue<Integer> q
861            = new LinkedTransferQueue<Integer>();
862
863        Thread t = newStartedThread(new CheckedRunnable() {
864            public void realRun() throws InterruptedException {
865                q.transfer(four);
866                checkEmpty(q);
867            }});
868
869        while (q.isEmpty())
870            Thread.yield();
871        assertFalse(q.isEmpty());
872        assertEquals(1, q.size());
873        assertSame(four, q.take());
874        checkEmpty(q);
875        awaitTermination(t);
876    }
877
878    /**
879     * tryTransfer(null) throws NullPointerException
880     */
881    public void testTryTransfer1() {
882        final LinkedTransferQueue q = new LinkedTransferQueue();
883        try {
884            q.tryTransfer(null);
885            shouldThrow();
886        } catch (NullPointerException success) {}
887    }
888
889    /**
890     * tryTransfer returns false and does not enqueue if there are no
891     * consumers waiting to poll or take.
892     */
893    public void testTryTransfer2() throws InterruptedException {
894        final LinkedTransferQueue q = new LinkedTransferQueue();
895        assertFalse(q.tryTransfer(new Object()));
896        assertFalse(q.hasWaitingConsumer());
897        checkEmpty(q);
898    }
899
900    /**
901     * If there is a consumer waiting in timed poll, tryTransfer
902     * returns true while successfully transfering object.
903     */
904    public void testTryTransfer3() throws InterruptedException {
905        final Object hotPotato = new Object();
906        final LinkedTransferQueue q = new LinkedTransferQueue();
907
908        Thread t = newStartedThread(new CheckedRunnable() {
909            public void realRun() {
910                while (! q.hasWaitingConsumer())
911                    Thread.yield();
912                assertTrue(q.hasWaitingConsumer());
913                checkEmpty(q);
914                assertTrue(q.tryTransfer(hotPotato));
915            }});
916
917        long startTime = System.nanoTime();
918        assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
919        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
920        checkEmpty(q);
921        awaitTermination(t);
922    }
923
924    /**
925     * If there is a consumer waiting in take, tryTransfer returns
926     * true while successfully transfering object.
927     */
928    public void testTryTransfer4() throws InterruptedException {
929        final Object hotPotato = new Object();
930        final LinkedTransferQueue q = new LinkedTransferQueue();
931
932        Thread t = newStartedThread(new CheckedRunnable() {
933            public void realRun() {
934                while (! q.hasWaitingConsumer())
935                    Thread.yield();
936                assertTrue(q.hasWaitingConsumer());
937                checkEmpty(q);
938                assertTrue(q.tryTransfer(hotPotato));
939            }});
940
941        assertSame(q.take(), hotPotato);
942        checkEmpty(q);
943        awaitTermination(t);
944    }
945
946    /**
947     * tryTransfer blocks interruptibly if no takers
948     */
949    public void testTryTransfer5() throws InterruptedException {
950        final LinkedTransferQueue q = new LinkedTransferQueue();
951        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
952        assertTrue(q.isEmpty());
953
954        Thread t = newStartedThread(new CheckedRunnable() {
955            public void realRun() throws InterruptedException {
956                long startTime = System.nanoTime();
957                Thread.currentThread().interrupt();
958                try {
959                    q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
960                    shouldThrow();
961                } catch (InterruptedException success) {}
962                assertFalse(Thread.interrupted());
963
964                pleaseInterrupt.countDown();
965                try {
966                    q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
967                    shouldThrow();
968                } catch (InterruptedException success) {}
969                assertFalse(Thread.interrupted());
970                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
971            }});
972
973        await(pleaseInterrupt);
974        assertThreadStaysAlive(t);
975        t.interrupt();
976        awaitTermination(t);
977        checkEmpty(q);
978    }
979
980    /**
981     * tryTransfer gives up after the timeout and returns false
982     */
983    public void testTryTransfer6() throws InterruptedException {
984        final LinkedTransferQueue q = new LinkedTransferQueue();
985
986        Thread t = newStartedThread(new CheckedRunnable() {
987            public void realRun() throws InterruptedException {
988                long startTime = System.nanoTime();
989                assertFalse(q.tryTransfer(new Object(),
990                                          timeoutMillis(), MILLISECONDS));
991                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
992                checkEmpty(q);
993            }});
994
995        awaitTermination(t);
996        checkEmpty(q);
997    }
998
999    /**
1000     * tryTransfer waits for any elements previously in to be removed
1001     * before transfering to a poll or take
1002     */
1003    public void testTryTransfer7() throws InterruptedException {
1004        final LinkedTransferQueue q = new LinkedTransferQueue();
1005        assertTrue(q.offer(four));
1006
1007        Thread t = newStartedThread(new CheckedRunnable() {
1008            public void realRun() throws InterruptedException {
1009                long startTime = System.nanoTime();
1010                assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1011                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1012                checkEmpty(q);
1013            }});
1014
1015        while (q.size() != 2)
1016            Thread.yield();
1017        assertEquals(2, q.size());
1018        assertSame(four, q.poll());
1019        assertSame(five, q.poll());
1020        checkEmpty(q);
1021        awaitTermination(t);
1022    }
1023
1024    /**
1025     * tryTransfer attempts to enqueue into the queue and fails
1026     * returning false not enqueueing and the successive poll is null
1027     */
1028    public void testTryTransfer8() throws InterruptedException {
1029        final LinkedTransferQueue q = new LinkedTransferQueue();
1030        assertTrue(q.offer(four));
1031        assertEquals(1, q.size());
1032        long startTime = System.nanoTime();
1033        assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1034        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1035        assertEquals(1, q.size());
1036        assertSame(four, q.poll());
1037        assertNull(q.poll());
1038        checkEmpty(q);
1039    }
1040
1041    private LinkedTransferQueue<Integer> populatedQueue(int n) {
1042        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1043        checkEmpty(q);
1044        for (int i = 0; i < n; i++) {
1045            assertEquals(i, q.size());
1046            assertTrue(q.offer(i));
1047            assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1048        }
1049        assertFalse(q.isEmpty());
1050        return q;
1051    }
1052
1053    /**
1054     * remove(null), contains(null) always return false
1055     */
1056    public void testNeverContainsNull() {
1057        Collection<?>[] qs = {
1058            new LinkedTransferQueue<Object>(),
1059            populatedQueue(2),
1060        };
1061
1062        for (Collection<?> q : qs) {
1063            assertFalse(q.contains(null));
1064            assertFalse(q.remove(null));
1065        }
1066    }
1067}
1068