1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 * Other contributors include John Vint 6 */ 7 8package jsr166; 9 10import static java.util.concurrent.TimeUnit.MILLISECONDS; 11 12import java.util.ArrayList; 13import java.util.Arrays; 14import java.util.Collection; 15import java.util.Iterator; 16import java.util.List; 17import java.util.NoSuchElementException; 18import java.util.Queue; 19import java.util.concurrent.BlockingQueue; 20import java.util.concurrent.CountDownLatch; 21import java.util.concurrent.Executors; 22import java.util.concurrent.ExecutorService; 23import java.util.concurrent.LinkedTransferQueue; 24 25import junit.framework.Test; 26 27@SuppressWarnings({"unchecked", "rawtypes"}) 28public class LinkedTransferQueueTest extends JSR166TestCase { 29 static class Implementation implements CollectionImplementation { 30 public Class<?> klazz() { return LinkedTransferQueue.class; } 31 public Collection emptyCollection() { return new LinkedTransferQueue(); } 32 public Object makeElement(int i) { return i; } 33 public boolean isConcurrent() { return true; } 34 public boolean permitsNulls() { return false; } 35 } 36 37 // android-note: These tests have been moved into their own separate 38 // classes to work around CTS issues: 39 // LinkedTransferQueueBlockingQueueTest.java 40 // LinkedTransferQueueCollectionTest.java 41 // 42 // public static class Generic extends BlockingQueueTest { 43 // protected BlockingQueue emptyCollection() { 44 // return new LinkedTransferQueue(); 45 // } 46 // } 47 48 // android-note: Removed because the CTS runner does a bad job of 49 // retrying tests that have suite() declarations. 50 // 51 // public static void main(String[] args) { 52 // main(suite(), args); 53 // } 54 // public static Test suite() { 55 // return newTestSuite(LinkedTransferQueueTest.class, 56 // new Generic().testSuite(), 57 // CollectionTest.testSuite(new Implementation())); 58 // } 59 60 /** 61 * Constructor builds new queue with size being zero and empty 62 * being true 63 */ 64 public void testConstructor1() { 65 assertEquals(0, new LinkedTransferQueue().size()); 66 assertTrue(new LinkedTransferQueue().isEmpty()); 67 } 68 69 /** 70 * Initializing constructor with null collection throws 71 * NullPointerException 72 */ 73 public void testConstructor2() { 74 try { 75 new LinkedTransferQueue(null); 76 shouldThrow(); 77 } catch (NullPointerException success) {} 78 } 79 80 /** 81 * Initializing from Collection of null elements throws 82 * NullPointerException 83 */ 84 public void testConstructor3() { 85 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]); 86 try { 87 new LinkedTransferQueue(elements); 88 shouldThrow(); 89 } catch (NullPointerException success) {} 90 } 91 92 /** 93 * Initializing constructor with a collection containing some null elements 94 * throws NullPointerException 95 */ 96 public void testConstructor4() { 97 Integer[] ints = new Integer[SIZE]; 98 for (int i = 0; i < SIZE - 1; ++i) 99 ints[i] = i; 100 Collection<Integer> elements = Arrays.asList(ints); 101 try { 102 new LinkedTransferQueue(elements); 103 shouldThrow(); 104 } catch (NullPointerException success) {} 105 } 106 107 /** 108 * Queue contains all elements of the collection it is initialized by 109 */ 110 public void testConstructor5() { 111 Integer[] ints = new Integer[SIZE]; 112 for (int i = 0; i < SIZE; ++i) { 113 ints[i] = i; 114 } 115 List intList = Arrays.asList(ints); 116 LinkedTransferQueue q 117 = new LinkedTransferQueue(intList); 118 assertEquals(q.size(), intList.size()); 119 assertEquals(q.toString(), intList.toString()); 120 assertTrue(Arrays.equals(q.toArray(), 121 intList.toArray())); 122 assertTrue(Arrays.equals(q.toArray(new Object[0]), 123 intList.toArray(new Object[0]))); 124 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]), 125 intList.toArray(new Object[SIZE]))); 126 for (int i = 0; i < SIZE; ++i) { 127 assertEquals(ints[i], q.poll()); 128 } 129 } 130 131 /** 132 * remainingCapacity() always returns Integer.MAX_VALUE 133 */ 134 public void testRemainingCapacity() { 135 BlockingQueue q = populatedQueue(SIZE); 136 for (int i = 0; i < SIZE; ++i) { 137 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 138 assertEquals(SIZE - i, q.size()); 139 assertEquals(i, q.remove()); 140 } 141 for (int i = 0; i < SIZE; ++i) { 142 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 143 assertEquals(i, q.size()); 144 assertTrue(q.add(i)); 145 } 146 } 147 148 /** 149 * addAll(this) throws IllegalArgumentException 150 */ 151 public void testAddAllSelf() { 152 LinkedTransferQueue q = populatedQueue(SIZE); 153 try { 154 q.addAll(q); 155 shouldThrow(); 156 } catch (IllegalArgumentException success) {} 157 } 158 159 /** 160 * addAll of a collection with any null elements throws 161 * NullPointerException after possibly adding some elements 162 */ 163 public void testAddAll3() { 164 LinkedTransferQueue q = new LinkedTransferQueue(); 165 Integer[] ints = new Integer[SIZE]; 166 for (int i = 0; i < SIZE - 1; ++i) 167 ints[i] = i; 168 try { 169 q.addAll(Arrays.asList(ints)); 170 shouldThrow(); 171 } catch (NullPointerException success) {} 172 } 173 174 /** 175 * Queue contains all elements, in traversal order, of successful addAll 176 */ 177 public void testAddAll5() { 178 Integer[] empty = new Integer[0]; 179 Integer[] ints = new Integer[SIZE]; 180 for (int i = 0; i < SIZE; ++i) { 181 ints[i] = i; 182 } 183 LinkedTransferQueue q = new LinkedTransferQueue(); 184 assertFalse(q.addAll(Arrays.asList(empty))); 185 assertTrue(q.addAll(Arrays.asList(ints))); 186 for (int i = 0; i < SIZE; ++i) { 187 assertEquals(ints[i], q.poll()); 188 } 189 } 190 191 /** 192 * all elements successfully put are contained 193 */ 194 public void testPut() { 195 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>(); 196 for (int i = 0; i < SIZE; ++i) { 197 assertEquals(i, q.size()); 198 q.put(i); 199 assertTrue(q.contains(i)); 200 } 201 } 202 203 /** 204 * take retrieves elements in FIFO order 205 */ 206 public void testTake() throws InterruptedException { 207 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 208 for (int i = 0; i < SIZE; ++i) { 209 assertEquals(i, (int) q.take()); 210 } 211 } 212 213 /** 214 * take removes existing elements until empty, then blocks interruptibly 215 */ 216 public void testBlockingTake() throws InterruptedException { 217 final BlockingQueue q = populatedQueue(SIZE); 218 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 219 Thread t = newStartedThread(new CheckedRunnable() { 220 public void realRun() throws InterruptedException { 221 for (int i = 0; i < SIZE; ++i) { 222 assertEquals(i, q.take()); 223 } 224 225 Thread.currentThread().interrupt(); 226 try { 227 q.take(); 228 shouldThrow(); 229 } catch (InterruptedException success) {} 230 assertFalse(Thread.interrupted()); 231 232 pleaseInterrupt.countDown(); 233 try { 234 q.take(); 235 shouldThrow(); 236 } catch (InterruptedException success) {} 237 assertFalse(Thread.interrupted()); 238 }}); 239 240 await(pleaseInterrupt); 241 assertThreadStaysAlive(t); 242 t.interrupt(); 243 awaitTermination(t); 244 } 245 246 /** 247 * poll succeeds unless empty 248 */ 249 public void testPoll() throws InterruptedException { 250 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 251 for (int i = 0; i < SIZE; ++i) { 252 assertEquals(i, (int) q.poll()); 253 } 254 assertNull(q.poll()); 255 checkEmpty(q); 256 } 257 258 /** 259 * timed poll with zero timeout succeeds when non-empty, else times out 260 */ 261 public void testTimedPoll0() throws InterruptedException { 262 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 263 for (int i = 0; i < SIZE; ++i) { 264 assertEquals(i, (int) q.poll(0, MILLISECONDS)); 265 } 266 assertNull(q.poll(0, MILLISECONDS)); 267 checkEmpty(q); 268 } 269 270 /** 271 * timed poll with nonzero timeout succeeds when non-empty, else times out 272 */ 273 public void testTimedPoll() throws InterruptedException { 274 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 275 long startTime = System.nanoTime(); 276 for (int i = 0; i < SIZE; ++i) 277 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); 278 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 279 280 startTime = System.nanoTime(); 281 assertNull(q.poll(timeoutMillis(), MILLISECONDS)); 282 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 283 checkEmpty(q); 284 } 285 286 /** 287 * Interrupted timed poll throws InterruptedException instead of 288 * returning timeout status 289 */ 290 public void testInterruptedTimedPoll() throws InterruptedException { 291 final BlockingQueue<Integer> q = populatedQueue(SIZE); 292 final CountDownLatch aboutToWait = new CountDownLatch(1); 293 Thread t = newStartedThread(new CheckedRunnable() { 294 public void realRun() throws InterruptedException { 295 long startTime = System.nanoTime(); 296 for (int i = 0; i < SIZE; ++i) 297 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); 298 aboutToWait.countDown(); 299 try { 300 q.poll(LONG_DELAY_MS, MILLISECONDS); 301 shouldThrow(); 302 } catch (InterruptedException success) {} 303 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 304 }}); 305 306 aboutToWait.await(); 307 waitForThreadToEnterWaitState(t); 308 t.interrupt(); 309 awaitTermination(t); 310 checkEmpty(q); 311 } 312 313 /** 314 * timed poll after thread interrupted throws InterruptedException 315 * instead of returning timeout status 316 */ 317 public void testTimedPollAfterInterrupt() throws InterruptedException { 318 final BlockingQueue<Integer> q = populatedQueue(SIZE); 319 Thread t = newStartedThread(new CheckedRunnable() { 320 public void realRun() throws InterruptedException { 321 long startTime = System.nanoTime(); 322 Thread.currentThread().interrupt(); 323 for (int i = 0; i < SIZE; ++i) 324 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); 325 try { 326 q.poll(LONG_DELAY_MS, MILLISECONDS); 327 shouldThrow(); 328 } catch (InterruptedException success) {} 329 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 330 }}); 331 332 awaitTermination(t); 333 checkEmpty(q); 334 } 335 336 /** 337 * peek returns next element, or null if empty 338 */ 339 public void testPeek() throws InterruptedException { 340 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 341 for (int i = 0; i < SIZE; ++i) { 342 assertEquals(i, (int) q.peek()); 343 assertEquals(i, (int) q.poll()); 344 assertTrue(q.peek() == null || 345 i != (int) q.peek()); 346 } 347 assertNull(q.peek()); 348 checkEmpty(q); 349 } 350 351 /** 352 * element returns next element, or throws NoSuchElementException if empty 353 */ 354 public void testElement() throws InterruptedException { 355 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 356 for (int i = 0; i < SIZE; ++i) { 357 assertEquals(i, (int) q.element()); 358 assertEquals(i, (int) q.poll()); 359 } 360 try { 361 q.element(); 362 shouldThrow(); 363 } catch (NoSuchElementException success) {} 364 checkEmpty(q); 365 } 366 367 /** 368 * remove removes next element, or throws NoSuchElementException if empty 369 */ 370 public void testRemove() throws InterruptedException { 371 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 372 for (int i = 0; i < SIZE; ++i) { 373 assertEquals(i, (int) q.remove()); 374 } 375 try { 376 q.remove(); 377 shouldThrow(); 378 } catch (NoSuchElementException success) {} 379 checkEmpty(q); 380 } 381 382 /** 383 * An add following remove(x) succeeds 384 */ 385 public void testRemoveElementAndAdd() throws InterruptedException { 386 LinkedTransferQueue q = new LinkedTransferQueue(); 387 assertTrue(q.add(one)); 388 assertTrue(q.add(two)); 389 assertTrue(q.remove(one)); 390 assertTrue(q.remove(two)); 391 assertTrue(q.add(three)); 392 assertSame(q.take(), three); 393 } 394 395 /** 396 * contains(x) reports true when elements added but not yet removed 397 */ 398 public void testContains() { 399 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 400 for (int i = 0; i < SIZE; ++i) { 401 assertTrue(q.contains(i)); 402 assertEquals(i, (int) q.poll()); 403 assertFalse(q.contains(i)); 404 } 405 } 406 407 /** 408 * clear removes all elements 409 */ 410 public void testClear() throws InterruptedException { 411 LinkedTransferQueue q = populatedQueue(SIZE); 412 q.clear(); 413 checkEmpty(q); 414 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 415 q.add(one); 416 assertFalse(q.isEmpty()); 417 assertEquals(1, q.size()); 418 assertTrue(q.contains(one)); 419 q.clear(); 420 checkEmpty(q); 421 } 422 423 /** 424 * containsAll(c) is true when c contains a subset of elements 425 */ 426 public void testContainsAll() { 427 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 428 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>(); 429 for (int i = 0; i < SIZE; ++i) { 430 assertTrue(q.containsAll(p)); 431 assertFalse(p.containsAll(q)); 432 p.add(i); 433 } 434 assertTrue(p.containsAll(q)); 435 } 436 437 /** 438 * retainAll(c) retains only those elements of c and reports true 439 * if changed 440 */ 441 public void testRetainAll() { 442 LinkedTransferQueue q = populatedQueue(SIZE); 443 LinkedTransferQueue p = populatedQueue(SIZE); 444 for (int i = 0; i < SIZE; ++i) { 445 boolean changed = q.retainAll(p); 446 if (i == 0) { 447 assertFalse(changed); 448 } else { 449 assertTrue(changed); 450 } 451 assertTrue(q.containsAll(p)); 452 assertEquals(SIZE - i, q.size()); 453 p.remove(); 454 } 455 } 456 457 /** 458 * removeAll(c) removes only those elements of c and reports true 459 * if changed 460 */ 461 public void testRemoveAll() { 462 for (int i = 1; i < SIZE; ++i) { 463 LinkedTransferQueue q = populatedQueue(SIZE); 464 LinkedTransferQueue p = populatedQueue(i); 465 assertTrue(q.removeAll(p)); 466 assertEquals(SIZE - i, q.size()); 467 for (int j = 0; j < i; ++j) { 468 assertFalse(q.contains(p.remove())); 469 } 470 } 471 } 472 473 /** 474 * toArray() contains all elements in FIFO order 475 */ 476 public void testToArray() { 477 LinkedTransferQueue q = populatedQueue(SIZE); 478 Object[] o = q.toArray(); 479 for (int i = 0; i < o.length; i++) { 480 assertSame(o[i], q.poll()); 481 } 482 } 483 484 /** 485 * toArray(a) contains all elements in FIFO order 486 */ 487 public void testToArray2() { 488 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 489 Integer[] ints = new Integer[SIZE]; 490 Integer[] array = q.toArray(ints); 491 assertSame(ints, array); 492 for (int i = 0; i < ints.length; i++) { 493 assertSame(ints[i], q.poll()); 494 } 495 } 496 497 /** 498 * toArray(incompatible array type) throws ArrayStoreException 499 */ 500 public void testToArray1_BadArg() { 501 LinkedTransferQueue q = populatedQueue(SIZE); 502 try { 503 q.toArray(new String[10]); 504 shouldThrow(); 505 } catch (ArrayStoreException success) {} 506 } 507 508 /** 509 * iterator iterates through all elements 510 */ 511 public void testIterator() throws InterruptedException { 512 LinkedTransferQueue q = populatedQueue(SIZE); 513 Iterator it = q.iterator(); 514 int i; 515 for (i = 0; it.hasNext(); i++) 516 assertTrue(q.contains(it.next())); 517 assertEquals(i, SIZE); 518 assertIteratorExhausted(it); 519 520 it = q.iterator(); 521 for (i = 0; it.hasNext(); i++) 522 assertEquals(it.next(), q.take()); 523 assertEquals(i, SIZE); 524 assertIteratorExhausted(it); 525 } 526 527 /** 528 * iterator of empty collection has no elements 529 */ 530 public void testEmptyIterator() { 531 assertIteratorExhausted(new LinkedTransferQueue().iterator()); 532 } 533 534 /** 535 * iterator.remove() removes current element 536 */ 537 public void testIteratorRemove() { 538 final LinkedTransferQueue q = new LinkedTransferQueue(); 539 q.add(two); 540 q.add(one); 541 q.add(three); 542 543 Iterator it = q.iterator(); 544 it.next(); 545 it.remove(); 546 547 it = q.iterator(); 548 assertSame(it.next(), one); 549 assertSame(it.next(), three); 550 assertFalse(it.hasNext()); 551 } 552 553 /** 554 * iterator ordering is FIFO 555 */ 556 public void testIteratorOrdering() { 557 final LinkedTransferQueue<Integer> q 558 = new LinkedTransferQueue<Integer>(); 559 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 560 q.add(one); 561 q.add(two); 562 q.add(three); 563 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 564 int k = 0; 565 for (Integer n : q) { 566 assertEquals(++k, (int) n); 567 } 568 assertEquals(3, k); 569 } 570 571 /** 572 * Modifications do not cause iterators to fail 573 */ 574 public void testWeaklyConsistentIteration() { 575 final LinkedTransferQueue q = new LinkedTransferQueue(); 576 q.add(one); 577 q.add(two); 578 q.add(three); 579 for (Iterator it = q.iterator(); it.hasNext();) { 580 q.remove(); 581 it.next(); 582 } 583 assertEquals(0, q.size()); 584 } 585 586 /** 587 * toString contains toStrings of elements 588 */ 589 public void testToString() { 590 LinkedTransferQueue q = populatedQueue(SIZE); 591 String s = q.toString(); 592 for (int i = 0; i < SIZE; ++i) { 593 assertTrue(s.contains(String.valueOf(i))); 594 } 595 } 596 597 /** 598 * offer transfers elements across Executor tasks 599 */ 600 public void testOfferInExecutor() { 601 final LinkedTransferQueue q = new LinkedTransferQueue(); 602 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 603 final ExecutorService executor = Executors.newFixedThreadPool(2); 604 try (PoolCleaner cleaner = cleaner(executor)) { 605 606 executor.execute(new CheckedRunnable() { 607 public void realRun() throws InterruptedException { 608 threadsStarted.await(); 609 long startTime = System.nanoTime(); 610 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); 611 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 612 }}); 613 614 executor.execute(new CheckedRunnable() { 615 public void realRun() throws InterruptedException { 616 threadsStarted.await(); 617 assertSame(one, q.take()); 618 checkEmpty(q); 619 }}); 620 } 621 } 622 623 /** 624 * timed poll retrieves elements across Executor threads 625 */ 626 public void testPollInExecutor() { 627 final LinkedTransferQueue q = new LinkedTransferQueue(); 628 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 629 final ExecutorService executor = Executors.newFixedThreadPool(2); 630 try (PoolCleaner cleaner = cleaner(executor)) { 631 632 executor.execute(new CheckedRunnable() { 633 public void realRun() throws InterruptedException { 634 assertNull(q.poll()); 635 threadsStarted.await(); 636 long startTime = System.nanoTime(); 637 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); 638 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 639 checkEmpty(q); 640 }}); 641 642 executor.execute(new CheckedRunnable() { 643 public void realRun() throws InterruptedException { 644 threadsStarted.await(); 645 q.put(one); 646 }}); 647 } 648 } 649 650 /** 651 * A deserialized serialized queue has same elements in same order 652 */ 653 public void testSerialization() throws Exception { 654 Queue x = populatedQueue(SIZE); 655 Queue y = serialClone(x); 656 657 assertNotSame(y, x); 658 assertEquals(x.size(), y.size()); 659 assertEquals(x.toString(), y.toString()); 660 assertTrue(Arrays.equals(x.toArray(), y.toArray())); 661 while (!x.isEmpty()) { 662 assertFalse(y.isEmpty()); 663 assertEquals(x.remove(), y.remove()); 664 } 665 assertTrue(y.isEmpty()); 666 } 667 668 /** 669 * drainTo(c) empties queue into another collection c 670 */ 671 public void testDrainTo() { 672 LinkedTransferQueue q = populatedQueue(SIZE); 673 ArrayList l = new ArrayList(); 674 q.drainTo(l); 675 assertEquals(0, q.size()); 676 assertEquals(SIZE, l.size()); 677 for (int i = 0; i < SIZE; ++i) { 678 assertEquals(i, l.get(i)); 679 } 680 q.add(zero); 681 q.add(one); 682 assertFalse(q.isEmpty()); 683 assertTrue(q.contains(zero)); 684 assertTrue(q.contains(one)); 685 l.clear(); 686 q.drainTo(l); 687 assertEquals(0, q.size()); 688 assertEquals(2, l.size()); 689 for (int i = 0; i < 2; ++i) { 690 assertEquals(i, l.get(i)); 691 } 692 } 693 694 /** 695 * drainTo(c) empties full queue, unblocking a waiting put. 696 */ 697 public void testDrainToWithActivePut() throws InterruptedException { 698 final LinkedTransferQueue q = populatedQueue(SIZE); 699 Thread t = newStartedThread(new CheckedRunnable() { 700 public void realRun() { 701 q.put(SIZE + 1); 702 }}); 703 ArrayList l = new ArrayList(); 704 q.drainTo(l); 705 assertTrue(l.size() >= SIZE); 706 for (int i = 0; i < SIZE; ++i) 707 assertEquals(i, l.get(i)); 708 awaitTermination(t); 709 assertTrue(q.size() + l.size() >= SIZE); 710 } 711 712 /** 713 * drainTo(c, n) empties first min(n, size) elements of queue into c 714 */ 715 public void testDrainToN() { 716 LinkedTransferQueue q = new LinkedTransferQueue(); 717 for (int i = 0; i < SIZE + 2; ++i) { 718 for (int j = 0; j < SIZE; j++) { 719 assertTrue(q.offer(j)); 720 } 721 ArrayList l = new ArrayList(); 722 q.drainTo(l, i); 723 int k = (i < SIZE) ? i : SIZE; 724 assertEquals(k, l.size()); 725 assertEquals(SIZE - k, q.size()); 726 for (int j = 0; j < k; ++j) 727 assertEquals(j, l.get(j)); 728 do {} while (q.poll() != null); 729 } 730 } 731 732 /** 733 * timed poll() or take() increments the waiting consumer count; 734 * offer(e) decrements the waiting consumer count 735 */ 736 public void testWaitingConsumer() throws InterruptedException { 737 final LinkedTransferQueue q = new LinkedTransferQueue(); 738 assertEquals(0, q.getWaitingConsumerCount()); 739 assertFalse(q.hasWaitingConsumer()); 740 final CountDownLatch threadStarted = new CountDownLatch(1); 741 742 Thread t = newStartedThread(new CheckedRunnable() { 743 public void realRun() throws InterruptedException { 744 threadStarted.countDown(); 745 long startTime = System.nanoTime(); 746 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); 747 assertEquals(0, q.getWaitingConsumerCount()); 748 assertFalse(q.hasWaitingConsumer()); 749 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 750 }}); 751 752 threadStarted.await(); 753 waitForThreadToEnterWaitState(t); 754 assertEquals(1, q.getWaitingConsumerCount()); 755 assertTrue(q.hasWaitingConsumer()); 756 757 assertTrue(q.offer(one)); 758 assertEquals(0, q.getWaitingConsumerCount()); 759 assertFalse(q.hasWaitingConsumer()); 760 761 awaitTermination(t); 762 } 763 764 /** 765 * transfer(null) throws NullPointerException 766 */ 767 public void testTransfer1() throws InterruptedException { 768 try { 769 LinkedTransferQueue q = new LinkedTransferQueue(); 770 q.transfer(null); 771 shouldThrow(); 772 } catch (NullPointerException success) {} 773 } 774 775 /** 776 * transfer waits until a poll occurs. The transfered element 777 * is returned by this associated poll. 778 */ 779 public void testTransfer2() throws InterruptedException { 780 final LinkedTransferQueue<Integer> q 781 = new LinkedTransferQueue<Integer>(); 782 final CountDownLatch threadStarted = new CountDownLatch(1); 783 784 Thread t = newStartedThread(new CheckedRunnable() { 785 public void realRun() throws InterruptedException { 786 threadStarted.countDown(); 787 q.transfer(five); 788 checkEmpty(q); 789 }}); 790 791 threadStarted.await(); 792 waitForThreadToEnterWaitState(t); 793 assertEquals(1, q.size()); 794 assertSame(five, q.poll()); 795 checkEmpty(q); 796 awaitTermination(t); 797 } 798 799 /** 800 * transfer waits until a poll occurs, and then transfers in fifo order 801 */ 802 public void testTransfer3() throws InterruptedException { 803 final LinkedTransferQueue<Integer> q 804 = new LinkedTransferQueue<Integer>(); 805 806 Thread first = newStartedThread(new CheckedRunnable() { 807 public void realRun() throws InterruptedException { 808 q.transfer(four); 809 assertTrue(!q.contains(four)); 810 assertEquals(1, q.size()); 811 }}); 812 813 Thread interruptedThread = newStartedThread( 814 new CheckedInterruptedRunnable() { 815 public void realRun() throws InterruptedException { 816 while (q.isEmpty()) 817 Thread.yield(); 818 q.transfer(five); 819 }}); 820 821 while (q.size() < 2) 822 Thread.yield(); 823 assertEquals(2, q.size()); 824 assertSame(four, q.poll()); 825 first.join(); 826 assertEquals(1, q.size()); 827 interruptedThread.interrupt(); 828 interruptedThread.join(); 829 checkEmpty(q); 830 } 831 832 /** 833 * transfer waits until a poll occurs, at which point the polling 834 * thread returns the element 835 */ 836 public void testTransfer4() throws InterruptedException { 837 final LinkedTransferQueue q = new LinkedTransferQueue(); 838 839 Thread t = newStartedThread(new CheckedRunnable() { 840 public void realRun() throws InterruptedException { 841 q.transfer(four); 842 assertFalse(q.contains(four)); 843 assertSame(three, q.poll()); 844 }}); 845 846 while (q.isEmpty()) 847 Thread.yield(); 848 assertFalse(q.isEmpty()); 849 assertEquals(1, q.size()); 850 assertTrue(q.offer(three)); 851 assertSame(four, q.poll()); 852 awaitTermination(t); 853 } 854 855 /** 856 * transfer waits until a take occurs. The transfered element 857 * is returned by this associated take. 858 */ 859 public void testTransfer5() throws InterruptedException { 860 final LinkedTransferQueue<Integer> q 861 = new LinkedTransferQueue<Integer>(); 862 863 Thread t = newStartedThread(new CheckedRunnable() { 864 public void realRun() throws InterruptedException { 865 q.transfer(four); 866 checkEmpty(q); 867 }}); 868 869 while (q.isEmpty()) 870 Thread.yield(); 871 assertFalse(q.isEmpty()); 872 assertEquals(1, q.size()); 873 assertSame(four, q.take()); 874 checkEmpty(q); 875 awaitTermination(t); 876 } 877 878 /** 879 * tryTransfer(null) throws NullPointerException 880 */ 881 public void testTryTransfer1() { 882 final LinkedTransferQueue q = new LinkedTransferQueue(); 883 try { 884 q.tryTransfer(null); 885 shouldThrow(); 886 } catch (NullPointerException success) {} 887 } 888 889 /** 890 * tryTransfer returns false and does not enqueue if there are no 891 * consumers waiting to poll or take. 892 */ 893 public void testTryTransfer2() throws InterruptedException { 894 final LinkedTransferQueue q = new LinkedTransferQueue(); 895 assertFalse(q.tryTransfer(new Object())); 896 assertFalse(q.hasWaitingConsumer()); 897 checkEmpty(q); 898 } 899 900 /** 901 * If there is a consumer waiting in timed poll, tryTransfer 902 * returns true while successfully transfering object. 903 */ 904 public void testTryTransfer3() throws InterruptedException { 905 final Object hotPotato = new Object(); 906 final LinkedTransferQueue q = new LinkedTransferQueue(); 907 908 Thread t = newStartedThread(new CheckedRunnable() { 909 public void realRun() { 910 while (! q.hasWaitingConsumer()) 911 Thread.yield(); 912 assertTrue(q.hasWaitingConsumer()); 913 checkEmpty(q); 914 assertTrue(q.tryTransfer(hotPotato)); 915 }}); 916 917 long startTime = System.nanoTime(); 918 assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS)); 919 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 920 checkEmpty(q); 921 awaitTermination(t); 922 } 923 924 /** 925 * If there is a consumer waiting in take, tryTransfer returns 926 * true while successfully transfering object. 927 */ 928 public void testTryTransfer4() throws InterruptedException { 929 final Object hotPotato = new Object(); 930 final LinkedTransferQueue q = new LinkedTransferQueue(); 931 932 Thread t = newStartedThread(new CheckedRunnable() { 933 public void realRun() { 934 while (! q.hasWaitingConsumer()) 935 Thread.yield(); 936 assertTrue(q.hasWaitingConsumer()); 937 checkEmpty(q); 938 assertTrue(q.tryTransfer(hotPotato)); 939 }}); 940 941 assertSame(q.take(), hotPotato); 942 checkEmpty(q); 943 awaitTermination(t); 944 } 945 946 /** 947 * tryTransfer blocks interruptibly if no takers 948 */ 949 public void testTryTransfer5() throws InterruptedException { 950 final LinkedTransferQueue q = new LinkedTransferQueue(); 951 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 952 assertTrue(q.isEmpty()); 953 954 Thread t = newStartedThread(new CheckedRunnable() { 955 public void realRun() throws InterruptedException { 956 long startTime = System.nanoTime(); 957 Thread.currentThread().interrupt(); 958 try { 959 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS); 960 shouldThrow(); 961 } catch (InterruptedException success) {} 962 assertFalse(Thread.interrupted()); 963 964 pleaseInterrupt.countDown(); 965 try { 966 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS); 967 shouldThrow(); 968 } catch (InterruptedException success) {} 969 assertFalse(Thread.interrupted()); 970 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 971 }}); 972 973 await(pleaseInterrupt); 974 assertThreadStaysAlive(t); 975 t.interrupt(); 976 awaitTermination(t); 977 checkEmpty(q); 978 } 979 980 /** 981 * tryTransfer gives up after the timeout and returns false 982 */ 983 public void testTryTransfer6() throws InterruptedException { 984 final LinkedTransferQueue q = new LinkedTransferQueue(); 985 986 Thread t = newStartedThread(new CheckedRunnable() { 987 public void realRun() throws InterruptedException { 988 long startTime = System.nanoTime(); 989 assertFalse(q.tryTransfer(new Object(), 990 timeoutMillis(), MILLISECONDS)); 991 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 992 checkEmpty(q); 993 }}); 994 995 awaitTermination(t); 996 checkEmpty(q); 997 } 998 999 /** 1000 * tryTransfer waits for any elements previously in to be removed 1001 * before transfering to a poll or take 1002 */ 1003 public void testTryTransfer7() throws InterruptedException { 1004 final LinkedTransferQueue q = new LinkedTransferQueue(); 1005 assertTrue(q.offer(four)); 1006 1007 Thread t = newStartedThread(new CheckedRunnable() { 1008 public void realRun() throws InterruptedException { 1009 long startTime = System.nanoTime(); 1010 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS)); 1011 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 1012 checkEmpty(q); 1013 }}); 1014 1015 while (q.size() != 2) 1016 Thread.yield(); 1017 assertEquals(2, q.size()); 1018 assertSame(four, q.poll()); 1019 assertSame(five, q.poll()); 1020 checkEmpty(q); 1021 awaitTermination(t); 1022 } 1023 1024 /** 1025 * tryTransfer attempts to enqueue into the queue and fails 1026 * returning false not enqueueing and the successive poll is null 1027 */ 1028 public void testTryTransfer8() throws InterruptedException { 1029 final LinkedTransferQueue q = new LinkedTransferQueue(); 1030 assertTrue(q.offer(four)); 1031 assertEquals(1, q.size()); 1032 long startTime = System.nanoTime(); 1033 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS)); 1034 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 1035 assertEquals(1, q.size()); 1036 assertSame(four, q.poll()); 1037 assertNull(q.poll()); 1038 checkEmpty(q); 1039 } 1040 1041 private LinkedTransferQueue<Integer> populatedQueue(int n) { 1042 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>(); 1043 checkEmpty(q); 1044 for (int i = 0; i < n; i++) { 1045 assertEquals(i, q.size()); 1046 assertTrue(q.offer(i)); 1047 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 1048 } 1049 assertFalse(q.isEmpty()); 1050 return q; 1051 } 1052 1053 /** 1054 * remove(null), contains(null) always return false 1055 */ 1056 public void testNeverContainsNull() { 1057 Collection<?>[] qs = { 1058 new LinkedTransferQueue<Object>(), 1059 populatedQueue(2), 1060 }; 1061 1062 for (Collection<?> q : qs) { 1063 assertFalse(q.contains(null)); 1064 assertFalse(q.remove(null)); 1065 } 1066 } 1067} 1068