PriorityBlockingQueue.java revision cec4dd4b1d33f78997603d0f89c0d0e56e64dbcd
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.concurrent.locks.*;
import java.util.*;

// BEGIN android-note
// removed link to collections framework docs
// END android-note

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations.  While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
 * <tt>null</tt> elements.  A priority queue relying on {@linkplain
 * Comparable natural ordering} also does not permit insertion of
 * non-comparable objects (doing so results in
 * <tt>ClassCastException</tt>).
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the PriorityBlockingQueue in any particular order. If you need
 * ordered traversal, consider using
 * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
 * can be used to <em>remove</em> some or all elements in priority
 * order and place them in another collection.
 *
 * <p>Operations on this class make no guarantees about the ordering
 * of elements with equal priority. If you need to enforce an
 * ordering, you can define custom classes or comparators that use a
 * secondary key to break ties in primary priority values.  For
 * example, here is a class that applies first-in-first-out
 * tie-breaking to comparable elements. To use it, you would insert a
 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
 *
 * <pre>
 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
 *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
 *   final static AtomicLong seq = new AtomicLong();
 *   final long seqNum;
 *   final E entry;
 *   public FIFOEntry(E entry) {
 *     seqNum = seq.getAndIncrement();
 *     this.entry = entry;
 *   }
 *   public E getEntry() { return entry; }
 *   public int compareTo(FIFOEntry&lt;E&gt; other) {
 *     int res = entry.compareTo(other.entry);
 *     if (res == 0 &amp;&amp; other.entry != this.entry)
 *       res = (seqNum &lt; other.seqNum ? -1 : 1);
 *     return res;
 *   }
 * }</pre>
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** Backing priority heap; element access below is performed while holding {@link #lock}. */
    private final PriorityQueue<E> q;
    /** Fair lock guarding access to the elements of {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Condition on which blocked retrievals wait; signalled on every insertion. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue.  If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  If the specified collection is a
     * {@link SortedSet} or a {@link PriorityQueue},  this
     * priority queue will be ordered according to the same ordering.
     * Otherwise, this priority queue will be ordered according to the
     * {@linkplain Comparable natural ordering} of its elements.
     *
     * @param  c the collection whose elements are to be placed
     *         into this priority queue
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(e);
            assert ok;
            // Wake one thread blocked in take() or timed poll(), if any.
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        offer(e); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an
     * element becomes available if the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                // Loop: the queue may have been emptied again by the
                // time this thread reacquires the lock after a signal.
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapses before an element is available
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the remaining wait budget.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the comparator used to order the elements in this queue,
     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
     * natural ordering} of its elements.
     *
     * @return the comparator used to order the elements in this queue,
     *         or <tt>null</tt> if this queue uses the natural
     *         ordering of its elements
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.  Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as a
     * result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the string representation produced by the backing
     * {@link PriorityQueue}, computed while holding this queue's lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue, in priority
     * order, and adds them to the given collection.  Equivalent to
     * <tt>drainTo(c, Integer.MAX_VALUE)</tt>.
     *
     * <p>Each element is added to <tt>c</tt> <em>before</em> it is
     * removed from this queue, so if <tt>c.add</tt> throws, the element
     * that could not be transferred remains in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most <tt>maxElements</tt> available elements from this
     * queue, in priority order, and adds them to the given collection.
     *
     * <p>Each element is added to <tt>c</tt> <em>before</em> it is
     * removed from this queue, so if <tt>c.add</tt> throws, the element
     * that could not be transferred remains in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     *        if not positive, nothing is transferred
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                // Add first, remove second: if add() throws, the head
                // is still in the queue instead of being silently lost.
                c.add(e);
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned <tt>Iterator</tt> is a "weakly consistent"
     * iterator that will never throw {@link
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return;
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            // Safe: the snapshot comes from toArray() on a queue that
            // only ever stores elements of type E.
            @SuppressWarnings("unchecked")
            E x = (E) array[cursor++];
            return x;
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            Object x = array[lastRet];
            lastRet = -1;
            // Traverse underlying queue to find == element,
            // not just a .equals element.
            lock.lock();
            try {
                for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                    if (it.next() == x) {
                        it.remove();
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue.  Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}