Queues.java revision 1d580d0f6ee4f21eb309ba7b509d2c6d671c4044
1/*
2 * Copyright (C) 2011 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
12 * the License.
13 */
14
15package com.google.common.collect;
16
17import com.google.common.annotations.Beta;
18import com.google.common.base.Preconditions;
19
20import java.util.ArrayDeque;
21import java.util.Collection;
22import java.util.PriorityQueue;
23import java.util.Queue;
24import java.util.concurrent.ArrayBlockingQueue;
25import java.util.concurrent.BlockingQueue;
26import java.util.concurrent.ConcurrentLinkedQueue;
27import java.util.concurrent.LinkedBlockingDeque;
28import java.util.concurrent.LinkedBlockingQueue;
29import java.util.concurrent.PriorityBlockingQueue;
30import java.util.concurrent.SynchronousQueue;
31import java.util.concurrent.TimeUnit;
32
33/**
34 * Static utility methods pertaining to {@link Queue}
35 * instances. Also see this class's counterparts
36 * {@link Lists}, {@link Sets}, and {@link Maps}.
37 *
38 * @author Kurt Alfred Kluever
39 * @since 11.0
40 */
41@Beta
42public final class Queues {
43  private Queues() {}
44
45  // ArrayBlockingQueue
46
47  /**
48   * Creates an empty {@code ArrayBlockingQueue} instance.
49   *
50   * @return a new, empty {@code ArrayBlockingQueue}
51   */
52  public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) {
53    return new ArrayBlockingQueue<E>(capacity);
54  }
55
56  // ArrayDeque
57
58  // ConcurrentLinkedQueue
59
60  /**
61   * Creates an empty {@code ConcurrentLinkedQueue} instance.
62   *
63   * @return a new, empty {@code ConcurrentLinkedQueue}
64   */
65  public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() {
66    return new ConcurrentLinkedQueue<E>();
67  }
68
69  /**
70   * Creates an {@code ConcurrentLinkedQueue} instance containing the given elements.
71   *
72   * @param elements the elements that the queue should contain, in order
73   * @return a new {@code ConcurrentLinkedQueue} containing those elements
74   */
75  public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue(
76      Iterable<? extends E> elements) {
77    if (elements instanceof Collection) {
78      return new ConcurrentLinkedQueue<E>(Collections2.cast(elements));
79    }
80    ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
81    Iterables.addAll(queue, elements);
82    return queue;
83  }
84
85  // LinkedBlockingDeque
86
87  // LinkedBlockingQueue
88
89  /**
90   * Creates an empty {@code LinkedBlockingQueue} instance.
91   *
92   * @return a new, empty {@code LinkedBlockingQueue}
93   */
94  public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() {
95    return new LinkedBlockingQueue<E>();
96  }
97
98  /**
99   * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
100   *
101   * @param capacity the capacity of this queue
102   * @return a new, empty {@code LinkedBlockingQueue}
103   * @throws IllegalArgumentException if {@code capacity} is less than 1
104   */
105  public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) {
106    return new LinkedBlockingQueue<E>(capacity);
107  }
108
109  /**
110   * Creates an {@code LinkedBlockingQueue} instance containing the given elements.
111   *
112   * @param elements the elements that the queue should contain, in order
113   * @return a new {@code LinkedBlockingQueue} containing those elements
114   */
115  public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) {
116    if (elements instanceof Collection) {
117      return new LinkedBlockingQueue<E>(Collections2.cast(elements));
118    }
119    LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
120    Iterables.addAll(queue, elements);
121    return queue;
122  }
123
124  // LinkedList: see {@link com.google.common.collect.Lists}
125
126  // PriorityBlockingQueue
127
128  /**
129   * Creates an empty {@code PriorityBlockingQueue} instance.
130   *
131   * @return a new, empty {@code PriorityBlockingQueue}
132   */
133  public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue() {
134    return new PriorityBlockingQueue<E>();
135  }
136
137  /**
138   * Creates an {@code PriorityBlockingQueue} instance containing the given elements.
139   *
140   * @param elements the elements that the queue should contain, in order
141   * @return a new {@code PriorityBlockingQueue} containing those elements
142   */
143  public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue(
144      Iterable<? extends E> elements) {
145    if (elements instanceof Collection) {
146      return new PriorityBlockingQueue<E>(Collections2.cast(elements));
147    }
148    PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
149    Iterables.addAll(queue, elements);
150    return queue;
151  }
152
153  // PriorityQueue
154
155  /**
156   * Creates an empty {@code PriorityQueue} instance.
157   *
158   * @return a new, empty {@code PriorityQueue}
159   */
160  public static <E> PriorityQueue<E> newPriorityQueue() {
161    return new PriorityQueue<E>();
162  }
163
164  /**
165   * Creates an {@code PriorityQueue} instance containing the given elements.
166   *
167   * @param elements the elements that the queue should contain, in order
168   * @return a new {@code PriorityQueue} containing those elements
169   */
170  public static <E> PriorityQueue<E> newPriorityQueue(Iterable<? extends E> elements) {
171    if (elements instanceof Collection) {
172      return new PriorityQueue<E>(Collections2.cast(elements));
173    }
174    PriorityQueue<E> queue = new PriorityQueue<E>();
175    Iterables.addAll(queue, elements);
176    return queue;
177  }
178
179  // SynchronousQueue
180
181  /**
182   * Creates an empty {@code SynchronousQueue} instance.
183   *
184   * @return a new, empty {@code SynchronousQueue}
185   */
186  public static <E> SynchronousQueue<E> newSynchronousQueue() {
187    return new SynchronousQueue<E>();
188  }
189
190  /**
191   * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested
192   * {@code numElements} elements are not available, it will wait for them up to the specified
193   * timeout.
194   *
195   * @param q the blocking queue to be drained
196   * @param buffer where to add the transferred elements
197   * @param numElements the number of elements to be waited for
198   * @param timeout how long to wait before giving up, in units of {@code unit}
199   * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
200   * @return the number of elements transferred
201   * @throws InterruptedException if interrupted while waiting
202   */
203  public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements,
204      long timeout, TimeUnit unit) throws InterruptedException {
205    Preconditions.checkNotNull(buffer);
206    /*
207     * This code performs one System.nanoTime() more than necessary, and in return, the time to
208     * execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make
209     * the timeout arbitrarily inaccurate, given a queue that is slow to drain).
210     */
211    long deadline = System.nanoTime() + unit.toNanos(timeout);
212    int added = 0;
213    while (added < numElements) {
214      // we could rely solely on #poll, but #drainTo might be more efficient when there are multiple
215      // elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
216      added += q.drainTo(buffer, numElements - added);
217      if (added < numElements) { // not enough elements immediately available; will have to poll
218        E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
219        if (e == null) {
220          break; // we already waited enough, and there are no more elements in sight
221        }
222        buffer.add(e);
223        added++;
224      }
225    }
226    return added;
227  }
228
229  /**
230   * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)},
231   * but with a different behavior in case it is interrupted while waiting. In that case, the
232   * operation will continue as usual, and in the end the thread's interruption status will be set
233   * (no {@code InterruptedException} is thrown).
234   *
235   * @param q the blocking queue to be drained
236   * @param buffer where to add the transferred elements
237   * @param numElements the number of elements to be waited for
238   * @param timeout how long to wait before giving up, in units of {@code unit}
239   * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
240   * @return the number of elements transferred
241   */
242  public static <E> int drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer,
243      int numElements, long timeout, TimeUnit unit) {
244    Preconditions.checkNotNull(buffer);
245    long deadline = System.nanoTime() + unit.toNanos(timeout);
246    int added = 0;
247    boolean interrupted = false;
248    try {
249      while (added < numElements) {
250        // we could rely solely on #poll, but #drainTo might be more efficient when there are
251        // multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
252        added += q.drainTo(buffer, numElements - added);
253        if (added < numElements) { // not enough elements immediately available; will have to poll
254          E e; // written exactly once, by a successful (uninterrupted) invocation of #poll
255          while (true) {
256            try {
257              e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
258              break;
259            } catch (InterruptedException ex) {
260              interrupted = true; // note interruption and retry
261            }
262          }
263          if (e == null) {
264            break; // we already waited enough, and there are no more elements in sight
265          }
266          buffer.add(e);
267          added++;
268        }
269      }
270    } finally {
271      if (interrupted) {
272        Thread.currentThread().interrupt();
273      }
274    }
275    return added;
276  }
277}
278