1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7package java.util.concurrent;
8
9import static java.util.concurrent.TimeUnit.NANOSECONDS;
10
11import java.util.AbstractQueue;
12import java.util.Collection;
13import java.util.Iterator;
14import java.util.NoSuchElementException;
15import java.util.PriorityQueue;
16import java.util.concurrent.locks.Condition;
17import java.util.concurrent.locks.ReentrantLock;
18
19// BEGIN android-note
20// removed link to collections framework docs
21// END android-note
22
23/**
24 * An unbounded {@linkplain BlockingQueue blocking queue} of
25 * {@code Delayed} elements, in which an element can only be taken
26 * when its delay has expired.  The <em>head</em> of the queue is that
27 * {@code Delayed} element whose delay expired furthest in the
28 * past.  If no delay has expired there is no head and {@code poll}
29 * will return {@code null}. Expiration occurs when an element's
30 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
31 * than or equal to zero.  Even though unexpired elements cannot be
32 * removed using {@code take} or {@code poll}, they are otherwise
33 * treated as normal elements. For example, the {@code size} method
34 * returns the count of both expired and unexpired elements.
35 * This queue does not permit null elements.
36 *
37 * <p>This class and its iterator implement all of the
38 * <em>optional</em> methods of the {@link Collection} and {@link
39 * Iterator} interfaces.  The Iterator provided in method {@link
40 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
41 * the DelayQueue in any particular order.
42 *
43 * @since 1.5
44 * @author Doug Lea
45 * @param <E> the type of elements held in this queue
46 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding every access to {@link #q} and {@link #leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * Backing priority heap holding both expired and unexpired elements.
     * Must only be read or modified while holding {@link #lock}.
     */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // The new element became the head: whatever delay the current
            // leader is timing is stale, so drop leadership and wake one
            // waiter to re-evaluate the head.
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        // Another thread is already timing the head.
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Leadership may have been reset by offer()
                            // while we waited; only clear our own claim.
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // On the way out, hand over to another waiter if nobody is
            // timing the head and there is still something to wait for.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don't retain ref while waiting
                    // Wait on our own budget when it runs out before the
                    // head expires, or when another thread already leads.
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns first element only if it is expired.
     * Used only by drainExpiredTo.  Call only when holding lock.
     */
    private E peekExpired() {
        // assert lock.isHeldByCurrentThread();
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
            null : first;
    }

    /**
     * Shared implementation of both {@code drainTo} overloads: validates
     * the target, then moves at most {@code maxElements} expired elements
     * into it while holding the lock.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    private int drainExpiredTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E e; n < maxElements && (e = peekExpired()) != null;) {
                c.add(e);       // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection.  Unexpired elements are left in the queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainExpiredTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} expired elements from this queue
     * and adds them to the given collection.  Unexpired elements are left
     * in the queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return drainExpiredTo(c, maxElements);
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.
     * Removes the first element that is reference-equal ({@code ==}) to
     * {@code o}, if any; does nothing when no such element is present.
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     * Traversal never touches the live queue; only {@link #remove()}
     * reaches back into it, via {@link DelayQueue#removeEQ}.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            // Safe: the snapshot was produced by q.toArray(), so every
            // slot holds an E.
            return (E)array[cursor++];
        }

        @Override
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);
            lastRet = -1; // forbid a second remove() without next()
        }
    }

}
534