/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 * Other contributors include Andrew Wright, Jeffrey Hayes,
 * Pat Fisher, Mike Judd.
 */
8
9package jsr166;
10
11import junit.framework.*;
12import java.util.Arrays;
13import java.util.ArrayList;
14import java.util.Collection;
15import java.util.Comparator;
16import java.util.Iterator;
17import java.util.NoSuchElementException;
18import java.util.Queue;
19import java.util.concurrent.PriorityBlockingQueue;
20import java.util.concurrent.BlockingQueue;
21import java.util.concurrent.CountDownLatch;
22import java.util.concurrent.Executors;
23import java.util.concurrent.ExecutorService;
24import static java.util.concurrent.TimeUnit.MILLISECONDS;
25
26public class PriorityBlockingQueueTest extends JSR166TestCase {
27
28    private static final int NOCAP = Integer.MAX_VALUE;
29
30    /** Sample Comparator */
31    static class MyReverseComparator implements Comparator {
32        public int compare(Object x, Object y) {
33            return ((Comparable)y).compareTo(x);
34        }
35    }
36
37    /**
38     * Returns a new queue of given size containing consecutive
39     * Integers 0 ... n.
40     */
41    private PriorityBlockingQueue<Integer> populatedQueue(int n) {
42        PriorityBlockingQueue<Integer> q =
43            new PriorityBlockingQueue<Integer>(n);
44        assertTrue(q.isEmpty());
45        for (int i = n-1; i >= 0; i-=2)
46            assertTrue(q.offer(new Integer(i)));
47        for (int i = (n & 1); i < n; i+=2)
48            assertTrue(q.offer(new Integer(i)));
49        assertFalse(q.isEmpty());
50        assertEquals(NOCAP, q.remainingCapacity());
51        assertEquals(n, q.size());
52        return q;
53    }
54
55    /**
56     * A new queue has unbounded capacity
57     */
58    public void testConstructor1() {
59        assertEquals(NOCAP, new PriorityBlockingQueue(SIZE).remainingCapacity());
60    }
61
62    /**
63     * Constructor throws IAE if capacity argument nonpositive
64     */
65    public void testConstructor2() {
66        try {
67            new PriorityBlockingQueue(0);
68            shouldThrow();
69        } catch (IllegalArgumentException success) {}
70    }
71
72    /**
73     * Initializing from null Collection throws NPE
74     */
75    public void testConstructor3() {
76        try {
77            new PriorityBlockingQueue(null);
78            shouldThrow();
79        } catch (NullPointerException success) {}
80    }
81
82    /**
83     * Initializing from Collection of null elements throws NPE
84     */
85    public void testConstructor4() {
86        Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
87        try {
88            new PriorityBlockingQueue(elements);
89            shouldThrow();
90        } catch (NullPointerException success) {}
91    }
92
93    /**
94     * Initializing from Collection with some null elements throws NPE
95     */
96    public void testConstructor5() {
97        Integer[] ints = new Integer[SIZE];
98        for (int i = 0; i < SIZE-1; ++i)
99            ints[i] = i;
100        Collection<Integer> elements = Arrays.asList(ints);
101        try {
102            new PriorityBlockingQueue(elements);
103            shouldThrow();
104        } catch (NullPointerException success) {}
105    }
106
107    /**
108     * Queue contains all elements of collection used to initialize
109     */
110    public void testConstructor6() {
111        Integer[] ints = new Integer[SIZE];
112        for (int i = 0; i < SIZE; ++i)
113            ints[i] = i;
114        PriorityBlockingQueue q = new PriorityBlockingQueue(Arrays.asList(ints));
115        for (int i = 0; i < SIZE; ++i)
116            assertEquals(ints[i], q.poll());
117    }
118
119    /**
120     * The comparator used in constructor is used
121     */
122    public void testConstructor7() {
123        MyReverseComparator cmp = new MyReverseComparator();
124        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE, cmp);
125        assertEquals(cmp, q.comparator());
126        Integer[] ints = new Integer[SIZE];
127        for (int i = 0; i < SIZE; ++i)
128            ints[i] = new Integer(i);
129        q.addAll(Arrays.asList(ints));
130        for (int i = SIZE-1; i >= 0; --i)
131            assertEquals(ints[i], q.poll());
132    }
133
134    /**
135     * isEmpty is true before add, false after
136     */
137    public void testEmpty() {
138        PriorityBlockingQueue q = new PriorityBlockingQueue(2);
139        assertTrue(q.isEmpty());
140        assertEquals(NOCAP, q.remainingCapacity());
141        q.add(one);
142        assertFalse(q.isEmpty());
143        q.add(two);
144        q.remove();
145        q.remove();
146        assertTrue(q.isEmpty());
147    }
148
149    /**
150     * remainingCapacity does not change when elements added or removed,
151     * but size does
152     */
153    public void testRemainingCapacity() {
154        PriorityBlockingQueue q = populatedQueue(SIZE);
155        for (int i = 0; i < SIZE; ++i) {
156            assertEquals(NOCAP, q.remainingCapacity());
157            assertEquals(SIZE-i, q.size());
158            q.remove();
159        }
160        for (int i = 0; i < SIZE; ++i) {
161            assertEquals(NOCAP, q.remainingCapacity());
162            assertEquals(i, q.size());
163            q.add(new Integer(i));
164        }
165    }
166
167    /**
168     * Offer of comparable element succeeds
169     */
170    public void testOffer() {
171        PriorityBlockingQueue q = new PriorityBlockingQueue(1);
172        assertTrue(q.offer(zero));
173        assertTrue(q.offer(one));
174    }
175
176    /**
177     * Offer of non-Comparable throws CCE
178     */
179    public void testOfferNonComparable() {
180        try {
181            PriorityBlockingQueue q = new PriorityBlockingQueue(1);
182            q.offer(new Object());
183            q.offer(new Object());
184            q.offer(new Object());
185            shouldThrow();
186        } catch (ClassCastException success) {}
187    }
188
189    /**
190     * add of comparable succeeds
191     */
192    public void testAdd() {
193        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
194        for (int i = 0; i < SIZE; ++i) {
195            assertEquals(i, q.size());
196            assertTrue(q.add(new Integer(i)));
197        }
198    }
199
200    /**
201     * addAll(this) throws IAE
202     */
203    public void testAddAllSelf() {
204        try {
205            PriorityBlockingQueue q = populatedQueue(SIZE);
206            q.addAll(q);
207            shouldThrow();
208        } catch (IllegalArgumentException success) {}
209    }
210
211    /**
212     * addAll of a collection with any null elements throws NPE after
213     * possibly adding some elements
214     */
215    public void testAddAll3() {
216        try {
217            PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
218            Integer[] ints = new Integer[SIZE];
219            for (int i = 0; i < SIZE-1; ++i)
220                ints[i] = new Integer(i);
221            q.addAll(Arrays.asList(ints));
222            shouldThrow();
223        } catch (NullPointerException success) {}
224    }
225
226    /**
227     * Queue contains all elements of successful addAll
228     */
229    public void testAddAll5() {
230        Integer[] empty = new Integer[0];
231        Integer[] ints = new Integer[SIZE];
232        for (int i = SIZE-1; i >= 0; --i)
233            ints[i] = new Integer(i);
234        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
235        assertFalse(q.addAll(Arrays.asList(empty)));
236        assertTrue(q.addAll(Arrays.asList(ints)));
237        for (int i = 0; i < SIZE; ++i)
238            assertEquals(ints[i], q.poll());
239    }
240
241    /**
242     * all elements successfully put are contained
243     */
244    public void testPut() {
245        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
246        for (int i = 0; i < SIZE; ++i) {
247            Integer I = new Integer(i);
248            q.put(I);
249            assertTrue(q.contains(I));
250        }
251        assertEquals(SIZE, q.size());
252    }
253
254    /**
255     * put doesn't block waiting for take
256     */
257    public void testPutWithTake() throws InterruptedException {
258        final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
259        final int size = 4;
260        Thread t = newStartedThread(new CheckedRunnable() {
261            public void realRun() {
262                for (int i = 0; i < size; i++)
263                    q.put(new Integer(0));
264            }});
265
266        awaitTermination(t);
267        assertEquals(size, q.size());
268        q.take();
269    }
270
271    /**
272     * timed offer does not time out
273     */
274    public void testTimedOffer() throws InterruptedException {
275        final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
276        Thread t = newStartedThread(new CheckedRunnable() {
277            public void realRun() {
278                q.put(new Integer(0));
279                q.put(new Integer(0));
280                assertTrue(q.offer(new Integer(0), SHORT_DELAY_MS, MILLISECONDS));
281                assertTrue(q.offer(new Integer(0), LONG_DELAY_MS, MILLISECONDS));
282            }});
283
284        awaitTermination(t);
285    }
286
287    /**
288     * take retrieves elements in priority order
289     */
290    public void testTake() throws InterruptedException {
291        PriorityBlockingQueue q = populatedQueue(SIZE);
292        for (int i = 0; i < SIZE; ++i) {
293            assertEquals(i, q.take());
294        }
295    }
296
297    /**
298     * Take removes existing elements until empty, then blocks interruptibly
299     */
300    public void testBlockingTake() throws InterruptedException {
301        final PriorityBlockingQueue q = populatedQueue(SIZE);
302        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
303        Thread t = newStartedThread(new CheckedRunnable() {
304            public void realRun() throws InterruptedException {
305                for (int i = 0; i < SIZE; ++i) {
306                    assertEquals(i, q.take());
307                }
308
309                Thread.currentThread().interrupt();
310                try {
311                    q.take();
312                    shouldThrow();
313                } catch (InterruptedException success) {}
314                assertFalse(Thread.interrupted());
315
316                pleaseInterrupt.countDown();
317                try {
318                    q.take();
319                    shouldThrow();
320                } catch (InterruptedException success) {}
321                assertFalse(Thread.interrupted());
322            }});
323
324        await(pleaseInterrupt);
325        assertThreadStaysAlive(t);
326        t.interrupt();
327        awaitTermination(t);
328    }
329
330    /**
331     * poll succeeds unless empty
332     */
333    public void testPoll() {
334        PriorityBlockingQueue q = populatedQueue(SIZE);
335        for (int i = 0; i < SIZE; ++i) {
336            assertEquals(i, q.poll());
337        }
338        assertNull(q.poll());
339    }
340
341    /**
342     * timed poll with zero timeout succeeds when non-empty, else times out
343     */
344    public void testTimedPoll0() throws InterruptedException {
345        PriorityBlockingQueue q = populatedQueue(SIZE);
346        for (int i = 0; i < SIZE; ++i) {
347            assertEquals(i, q.poll(0, MILLISECONDS));
348        }
349        assertNull(q.poll(0, MILLISECONDS));
350    }
351
352    /**
353     * timed poll with nonzero timeout succeeds when non-empty, else times out
354     */
355    public void testTimedPoll() throws InterruptedException {
356        PriorityBlockingQueue<Integer> q = populatedQueue(SIZE);
357        for (int i = 0; i < SIZE; ++i) {
358            long startTime = System.nanoTime();
359            assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
360            assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
361        }
362        long startTime = System.nanoTime();
363        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
364        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
365        checkEmpty(q);
366    }
367
368    /**
369     * Interrupted timed poll throws InterruptedException instead of
370     * returning timeout status
371     */
372    public void testInterruptedTimedPoll() throws InterruptedException {
373        final BlockingQueue<Integer> q = populatedQueue(SIZE);
374        final CountDownLatch aboutToWait = new CountDownLatch(1);
375        Thread t = newStartedThread(new CheckedRunnable() {
376            public void realRun() throws InterruptedException {
377                for (int i = 0; i < SIZE; ++i) {
378                    long t0 = System.nanoTime();
379                    assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
380                    assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
381                }
382                long t0 = System.nanoTime();
383                aboutToWait.countDown();
384                try {
385                    q.poll(LONG_DELAY_MS, MILLISECONDS);
386                    shouldThrow();
387                } catch (InterruptedException success) {
388                    assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
389                }
390            }});
391
392        aboutToWait.await();
393        waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
394        t.interrupt();
395        awaitTermination(t, MEDIUM_DELAY_MS);
396    }
397
398    /**
399     * peek returns next element, or null if empty
400     */
401    public void testPeek() {
402        PriorityBlockingQueue q = populatedQueue(SIZE);
403        for (int i = 0; i < SIZE; ++i) {
404            assertEquals(i, q.peek());
405            assertEquals(i, q.poll());
406            assertTrue(q.peek() == null ||
407                       !q.peek().equals(i));
408        }
409        assertNull(q.peek());
410    }
411
412    /**
413     * element returns next element, or throws NSEE if empty
414     */
415    public void testElement() {
416        PriorityBlockingQueue q = populatedQueue(SIZE);
417        for (int i = 0; i < SIZE; ++i) {
418            assertEquals(i, q.element());
419            assertEquals(i, q.poll());
420        }
421        try {
422            q.element();
423            shouldThrow();
424        } catch (NoSuchElementException success) {}
425    }
426
427    /**
428     * remove removes next element, or throws NSEE if empty
429     */
430    public void testRemove() {
431        PriorityBlockingQueue q = populatedQueue(SIZE);
432        for (int i = 0; i < SIZE; ++i) {
433            assertEquals(i, q.remove());
434        }
435        try {
436            q.remove();
437            shouldThrow();
438        } catch (NoSuchElementException success) {}
439    }
440
441    /**
442     * contains(x) reports true when elements added but not yet removed
443     */
444    public void testContains() {
445        PriorityBlockingQueue q = populatedQueue(SIZE);
446        for (int i = 0; i < SIZE; ++i) {
447            assertTrue(q.contains(new Integer(i)));
448            q.poll();
449            assertFalse(q.contains(new Integer(i)));
450        }
451    }
452
453    /**
454     * clear removes all elements
455     */
456    public void testClear() {
457        PriorityBlockingQueue q = populatedQueue(SIZE);
458        q.clear();
459        assertTrue(q.isEmpty());
460        assertEquals(0, q.size());
461        q.add(one);
462        assertFalse(q.isEmpty());
463        assertTrue(q.contains(one));
464        q.clear();
465        assertTrue(q.isEmpty());
466    }
467
468    /**
469     * containsAll(c) is true when c contains a subset of elements
470     */
471    public void testContainsAll() {
472        PriorityBlockingQueue q = populatedQueue(SIZE);
473        PriorityBlockingQueue p = new PriorityBlockingQueue(SIZE);
474        for (int i = 0; i < SIZE; ++i) {
475            assertTrue(q.containsAll(p));
476            assertFalse(p.containsAll(q));
477            p.add(new Integer(i));
478        }
479        assertTrue(p.containsAll(q));
480    }
481
482    /**
483     * retainAll(c) retains only those elements of c and reports true if changed
484     */
485    public void testRetainAll() {
486        PriorityBlockingQueue q = populatedQueue(SIZE);
487        PriorityBlockingQueue p = populatedQueue(SIZE);
488        for (int i = 0; i < SIZE; ++i) {
489            boolean changed = q.retainAll(p);
490            if (i == 0)
491                assertFalse(changed);
492            else
493                assertTrue(changed);
494
495            assertTrue(q.containsAll(p));
496            assertEquals(SIZE-i, q.size());
497            p.remove();
498        }
499    }
500
501    /**
502     * removeAll(c) removes only those elements of c and reports true if changed
503     */
504    public void testRemoveAll() {
505        for (int i = 1; i < SIZE; ++i) {
506            PriorityBlockingQueue q = populatedQueue(SIZE);
507            PriorityBlockingQueue p = populatedQueue(i);
508            assertTrue(q.removeAll(p));
509            assertEquals(SIZE-i, q.size());
510            for (int j = 0; j < i; ++j) {
511                Integer I = (Integer)(p.remove());
512                assertFalse(q.contains(I));
513            }
514        }
515    }
516
517    /**
518     * toArray contains all elements
519     */
520    public void testToArray() throws InterruptedException {
521        PriorityBlockingQueue q = populatedQueue(SIZE);
522        Object[] o = q.toArray();
523        Arrays.sort(o);
524        for (int i = 0; i < o.length; i++)
525            assertSame(o[i], q.take());
526    }
527
528    /**
529     * toArray(a) contains all elements
530     */
531    public void testToArray2() throws InterruptedException {
532        PriorityBlockingQueue<Integer> q = populatedQueue(SIZE);
533        Integer[] ints = new Integer[SIZE];
534        Integer[] array = q.toArray(ints);
535        assertSame(ints, array);
536        Arrays.sort(ints);
537        for (int i = 0; i < ints.length; i++)
538            assertSame(ints[i], q.take());
539    }
540
541    /**
542     * toArray(incompatible array type) throws ArrayStoreException
543     */
544    public void testToArray1_BadArg() {
545        PriorityBlockingQueue q = populatedQueue(SIZE);
546        try {
547            q.toArray(new String[10]);
548            shouldThrow();
549        } catch (ArrayStoreException success) {}
550    }
551
552    /**
553     * iterator iterates through all elements
554     */
555    public void testIterator() {
556        PriorityBlockingQueue q = populatedQueue(SIZE);
557        int i = 0;
558        Iterator it = q.iterator();
559        while (it.hasNext()) {
560            assertTrue(q.contains(it.next()));
561            ++i;
562        }
563        assertEquals(i, SIZE);
564    }
565
566    /**
567     * iterator.remove removes current element
568     */
569    public void testIteratorRemove() {
570        final PriorityBlockingQueue q = new PriorityBlockingQueue(3);
571        q.add(new Integer(2));
572        q.add(new Integer(1));
573        q.add(new Integer(3));
574
575        Iterator it = q.iterator();
576        it.next();
577        it.remove();
578
579        it = q.iterator();
580        assertEquals(it.next(), new Integer(2));
581        assertEquals(it.next(), new Integer(3));
582        assertFalse(it.hasNext());
583    }
584
585    /**
586     * toString contains toStrings of elements
587     */
588    public void testToString() {
589        PriorityBlockingQueue q = populatedQueue(SIZE);
590        String s = q.toString();
591        for (int i = 0; i < SIZE; ++i) {
592            assertTrue(s.contains(String.valueOf(i)));
593        }
594    }
595
596    /**
597     * timed poll transfers elements across Executor tasks
598     */
599    public void testPollInExecutor() {
600        final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
601        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
602        ExecutorService executor = Executors.newFixedThreadPool(2);
603        executor.execute(new CheckedRunnable() {
604            public void realRun() throws InterruptedException {
605                assertNull(q.poll());
606                threadsStarted.await();
607                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
608                checkEmpty(q);
609            }});
610
611        executor.execute(new CheckedRunnable() {
612            public void realRun() throws InterruptedException {
613                threadsStarted.await();
614                q.put(one);
615            }});
616
617        joinPool(executor);
618    }
619
620    /**
621     * A deserialized serialized queue has same elements
622     */
623    public void testSerialization() throws Exception {
624        Queue x = populatedQueue(SIZE);
625        Queue y = serialClone(x);
626
627        assertNotSame(x, y);
628        assertEquals(x.size(), y.size());
629        while (!x.isEmpty()) {
630            assertFalse(y.isEmpty());
631            assertEquals(x.remove(), y.remove());
632        }
633        assertTrue(y.isEmpty());
634    }
635
636    /**
637     * drainTo(c) empties queue into another collection c
638     */
639    public void testDrainTo() {
640        PriorityBlockingQueue q = populatedQueue(SIZE);
641        ArrayList l = new ArrayList();
642        q.drainTo(l);
643        assertEquals(0, q.size());
644        assertEquals(SIZE, l.size());
645        for (int i = 0; i < SIZE; ++i)
646            assertEquals(l.get(i), new Integer(i));
647        q.add(zero);
648        q.add(one);
649        assertFalse(q.isEmpty());
650        assertTrue(q.contains(zero));
651        assertTrue(q.contains(one));
652        l.clear();
653        q.drainTo(l);
654        assertEquals(0, q.size());
655        assertEquals(2, l.size());
656        for (int i = 0; i < 2; ++i)
657            assertEquals(l.get(i), new Integer(i));
658    }
659
660    /**
661     * drainTo empties queue
662     */
663    public void testDrainToWithActivePut() throws InterruptedException {
664        final PriorityBlockingQueue q = populatedQueue(SIZE);
665        Thread t = new Thread(new CheckedRunnable() {
666            public void realRun() {
667                q.put(new Integer(SIZE+1));
668            }});
669
670        t.start();
671        ArrayList l = new ArrayList();
672        q.drainTo(l);
673        assertTrue(l.size() >= SIZE);
674        for (int i = 0; i < SIZE; ++i)
675            assertEquals(l.get(i), new Integer(i));
676        t.join();
677        assertTrue(q.size() + l.size() >= SIZE);
678    }
679
680    /**
681     * drainTo(c, n) empties first min(n, size) elements of queue into c
682     */
683    public void testDrainToN() {
684        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE*2);
685        for (int i = 0; i < SIZE + 2; ++i) {
686            for (int j = 0; j < SIZE; j++)
687                assertTrue(q.offer(new Integer(j)));
688            ArrayList l = new ArrayList();
689            q.drainTo(l, i);
690            int k = (i < SIZE) ? i : SIZE;
691            assertEquals(k, l.size());
692            assertEquals(SIZE-k, q.size());
693            for (int j = 0; j < k; ++j)
694                assertEquals(l.get(j), new Integer(j));
695            while (q.poll() != null) ;
696        }
697    }
698
699}
700