1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7package java.util.concurrent;
8
9/**
10 * A {@link CompletionService} that uses a supplied {@link Executor}
11 * to execute tasks.  This class arranges that submitted tasks are,
12 * upon completion, placed on a queue accessible using {@code take}.
13 * The class is lightweight enough to be suitable for transient use
14 * when processing groups of tasks.
15 *
16 * <p>
17 *
18 * <b>Usage Examples.</b>
19 *
20 * Suppose you have a set of solvers for a certain problem, each
21 * returning a value of some type {@code Result}, and would like to
 * run them concurrently, processing the results of each of them that
 * returns a non-null value, in some method {@code use(Result r)}. You
24 * could write this as:
25 *
26 * <pre> {@code
27 * void solve(Executor e,
28 *            Collection<Callable<Result>> solvers)
29 *     throws InterruptedException, ExecutionException {
30 *   CompletionService<Result> ecs
31 *       = new ExecutorCompletionService<Result>(e);
32 *   for (Callable<Result> s : solvers)
33 *     ecs.submit(s);
34 *   int n = solvers.size();
35 *   for (int i = 0; i < n; ++i) {
36 *     Result r = ecs.take().get();
37 *     if (r != null)
38 *       use(r);
39 *   }
40 * }}</pre>
41 *
42 * Suppose instead that you would like to use the first non-null result
43 * of the set of tasks, ignoring any that encounter exceptions,
44 * and cancelling all other tasks when the first one is ready:
45 *
46 * <pre> {@code
47 * void solve(Executor e,
48 *            Collection<Callable<Result>> solvers)
49 *     throws InterruptedException {
50 *   CompletionService<Result> ecs
51 *       = new ExecutorCompletionService<Result>(e);
52 *   int n = solvers.size();
53 *   List<Future<Result>> futures = new ArrayList<>(n);
54 *   Result result = null;
55 *   try {
56 *     for (Callable<Result> s : solvers)
57 *       futures.add(ecs.submit(s));
58 *     for (int i = 0; i < n; ++i) {
59 *       try {
60 *         Result r = ecs.take().get();
61 *         if (r != null) {
62 *           result = r;
63 *           break;
64 *         }
65 *       } catch (ExecutionException ignore) {}
66 *     }
67 *   }
68 *   finally {
69 *     for (Future<Result> f : futures)
70 *       f.cancel(true);
71 *   }
72 *
73 *   if (result != null)
74 *     use(result);
75 * }}</pre>
76 */
77public class ExecutorCompletionService<V> implements CompletionService<V> {
78    private final Executor executor;
79    private final AbstractExecutorService aes;
80    private final BlockingQueue<Future<V>> completionQueue;
81
82    /**
83     * FutureTask extension to enqueue upon completion.
84     */
85    private static class QueueingFuture<V> extends FutureTask<Void> {
86        QueueingFuture(RunnableFuture<V> task,
87                       BlockingQueue<Future<V>> completionQueue) {
88            super(task, null);
89            this.task = task;
90            this.completionQueue = completionQueue;
91        }
92        private final Future<V> task;
93        private final BlockingQueue<Future<V>> completionQueue;
94        protected void done() { completionQueue.add(task); }
95    }
96
97    private RunnableFuture<V> newTaskFor(Callable<V> task) {
98        if (aes == null)
99            return new FutureTask<V>(task);
100        else
101            return aes.newTaskFor(task);
102    }
103
104    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
105        if (aes == null)
106            return new FutureTask<V>(task, result);
107        else
108            return aes.newTaskFor(task, result);
109    }
110
111    /**
112     * Creates an ExecutorCompletionService using the supplied
113     * executor for base task execution and a
114     * {@link LinkedBlockingQueue} as a completion queue.
115     *
116     * @param executor the executor to use
117     * @throws NullPointerException if executor is {@code null}
118     */
119    public ExecutorCompletionService(Executor executor) {
120        if (executor == null)
121            throw new NullPointerException();
122        this.executor = executor;
123        this.aes = (executor instanceof AbstractExecutorService) ?
124            (AbstractExecutorService) executor : null;
125        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
126    }
127
128    /**
129     * Creates an ExecutorCompletionService using the supplied
130     * executor for base task execution and the supplied queue as its
131     * completion queue.
132     *
133     * @param executor the executor to use
134     * @param completionQueue the queue to use as the completion queue
135     *        normally one dedicated for use by this service. This
136     *        queue is treated as unbounded -- failed attempted
137     *        {@code Queue.add} operations for completed tasks cause
138     *        them not to be retrievable.
139     * @throws NullPointerException if executor or completionQueue are {@code null}
140     */
141    public ExecutorCompletionService(Executor executor,
142                                     BlockingQueue<Future<V>> completionQueue) {
143        if (executor == null || completionQueue == null)
144            throw new NullPointerException();
145        this.executor = executor;
146        this.aes = (executor instanceof AbstractExecutorService) ?
147            (AbstractExecutorService) executor : null;
148        this.completionQueue = completionQueue;
149    }
150
151    public Future<V> submit(Callable<V> task) {
152        if (task == null) throw new NullPointerException();
153        RunnableFuture<V> f = newTaskFor(task);
154        executor.execute(new QueueingFuture<V>(f, completionQueue));
155        return f;
156    }
157
158    public Future<V> submit(Runnable task, V result) {
159        if (task == null) throw new NullPointerException();
160        RunnableFuture<V> f = newTaskFor(task, result);
161        executor.execute(new QueueingFuture<V>(f, completionQueue));
162        return f;
163    }
164
165    public Future<V> take() throws InterruptedException {
166        return completionQueue.take();
167    }
168
169    public Future<V> poll() {
170        return completionQueue.poll();
171    }
172
173    public Future<V> poll(long timeout, TimeUnit unit)
174            throws InterruptedException {
175        return completionQueue.poll(timeout, unit);
176    }
177
178}
179