1/* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7package java.util.concurrent; 8 9/** 10 * A {@link CompletionService} that uses a supplied {@link Executor} 11 * to execute tasks. This class arranges that submitted tasks are, 12 * upon completion, placed on a queue accessible using {@code take}. 13 * The class is lightweight enough to be suitable for transient use 14 * when processing groups of tasks. 15 * 16 * <p> 17 * 18 * <b>Usage Examples.</b> 19 * 20 * Suppose you have a set of solvers for a certain problem, each 21 * returning a value of some type {@code Result}, and would like to 22 * run them concurrently, processing the results of each of them that 23 * return a non-null value, in some method {@code use(Result r)}. You 24 * could write this as: 25 * 26 * <pre> {@code 27 * void solve(Executor e, 28 * Collection<Callable<Result>> solvers) 29 * throws InterruptedException, ExecutionException { 30 * CompletionService<Result> ecs 31 * = new ExecutorCompletionService<Result>(e); 32 * for (Callable<Result> s : solvers) 33 * ecs.submit(s); 34 * int n = solvers.size(); 35 * for (int i = 0; i < n; ++i) { 36 * Result r = ecs.take().get(); 37 * if (r != null) 38 * use(r); 39 * } 40 * }}</pre> 41 * 42 * Suppose instead that you would like to use the first non-null result 43 * of the set of tasks, ignoring any that encounter exceptions, 44 * and cancelling all other tasks when the first one is ready: 45 * 46 * <pre> {@code 47 * void solve(Executor e, 48 * Collection<Callable<Result>> solvers) 49 * throws InterruptedException { 50 * CompletionService<Result> ecs 51 * = new ExecutorCompletionService<Result>(e); 52 * int n = solvers.size(); 53 * List<Future<Result>> futures 54 * = new ArrayList<Future<Result>>(n); 55 * Result result = null; 56 * try { 57 * for (Callable<Result> s : solvers) 58 * futures.add(ecs.submit(s)); 59 * for (int i = 0; i < n; ++i) { 60 * try { 61 * Result r = ecs.take().get(); 62 * if (r != null) { 63 * result = r; 64 * break; 65 * } 66 * } catch (ExecutionException ignore) {} 67 * } 68 * } 69 * finally { 70 * for (Future<Result> f : futures) 71 * f.cancel(true); 72 * } 73 * 74 * if (result != null) 75 * use(result); 76 * }}</pre> 77 */ 78public class ExecutorCompletionService<V> implements CompletionService<V> { 79 private final Executor executor; 80 private final AbstractExecutorService aes; 81 private final BlockingQueue<Future<V>> completionQueue; 82 83 /** 84 * FutureTask extension to enqueue upon completion 85 */ 86 private class QueueingFuture extends FutureTask<Void> { 87 QueueingFuture(RunnableFuture<V> task) { 88 super(task, null); 89 this.task = task; 90 } 91 protected void done() { completionQueue.add(task); } 92 private final Future<V> task; 93 } 94 95 private RunnableFuture<V> newTaskFor(Callable<V> task) { 96 if (aes == null) 97 return new FutureTask<V>(task); 98 else 99 return aes.newTaskFor(task); 100 } 101 102 private RunnableFuture<V> newTaskFor(Runnable task, V result) { 103 if (aes == null) 104 return new FutureTask<V>(task, result); 105 else 106 return aes.newTaskFor(task, result); 107 } 108 109 /** 110 * Creates an ExecutorCompletionService using the supplied 111 * executor for base task execution and a 112 * {@link LinkedBlockingQueue} as a completion queue. 113 * 114 * @param executor the executor to use 115 * @throws NullPointerException if executor is {@code null} 116 */ 117 public ExecutorCompletionService(Executor executor) { 118 if (executor == null) 119 throw new NullPointerException(); 120 this.executor = executor; 121 this.aes = (executor instanceof AbstractExecutorService) ? 122 (AbstractExecutorService) executor : null; 123 this.completionQueue = new LinkedBlockingQueue<Future<V>>(); 124 } 125 126 /** 127 * Creates an ExecutorCompletionService using the supplied 128 * executor for base task execution and the supplied queue as its 129 * completion queue. 130 * 131 * @param executor the executor to use 132 * @param completionQueue the queue to use as the completion queue 133 * normally one dedicated for use by this service. This 134 * queue is treated as unbounded -- failed attempted 135 * {@code Queue.add} operations for completed taskes cause 136 * them not to be retrievable. 137 * @throws NullPointerException if executor or completionQueue are {@code null} 138 */ 139 public ExecutorCompletionService(Executor executor, 140 BlockingQueue<Future<V>> completionQueue) { 141 if (executor == null || completionQueue == null) 142 throw new NullPointerException(); 143 this.executor = executor; 144 this.aes = (executor instanceof AbstractExecutorService) ? 145 (AbstractExecutorService) executor : null; 146 this.completionQueue = completionQueue; 147 } 148 149 public Future<V> submit(Callable<V> task) { 150 if (task == null) throw new NullPointerException(); 151 RunnableFuture<V> f = newTaskFor(task); 152 executor.execute(new QueueingFuture(f)); 153 return f; 154 } 155 156 public Future<V> submit(Runnable task, V result) { 157 if (task == null) throw new NullPointerException(); 158 RunnableFuture<V> f = newTaskFor(task, result); 159 executor.execute(new QueueingFuture(f)); 160 return f; 161 } 162 163 public Future<V> take() throws InterruptedException { 164 return completionQueue.take(); 165 } 166 167 public Future<V> poll() { 168 return completionQueue.poll(); 169 } 170 171 public Future<V> poll(long timeout, TimeUnit unit) 172 throws InterruptedException { 173 return completionQueue.poll(timeout, unit); 174 } 175 176} 177