1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7package java.util.concurrent;
8
9/**
10 * A {@link CompletionService} that uses a supplied {@link Executor}
11 * to execute tasks.  This class arranges that submitted tasks are,
12 * upon completion, placed on a queue accessible using {@code take}.
13 * The class is lightweight enough to be suitable for transient use
14 * when processing groups of tasks.
15 *
16 * <p>
17 *
18 * <b>Usage Examples.</b>
19 *
20 * Suppose you have a set of solvers for a certain problem, each
21 * returning a value of some type {@code Result}, and would like to
22 * run them concurrently, processing the results of each of them that
23 * return a non-null value, in some method {@code use(Result r)}. You
24 * could write this as:
25 *
26 * <pre> {@code
27 * void solve(Executor e,
28 *            Collection<Callable<Result>> solvers)
29 *     throws InterruptedException, ExecutionException {
30 *     CompletionService<Result> ecs
31 *         = new ExecutorCompletionService<Result>(e);
32 *     for (Callable<Result> s : solvers)
33 *         ecs.submit(s);
34 *     int n = solvers.size();
35 *     for (int i = 0; i < n; ++i) {
36 *         Result r = ecs.take().get();
37 *         if (r != null)
38 *             use(r);
39 *     }
40 * }}</pre>
41 *
42 * Suppose instead that you would like to use the first non-null result
43 * of the set of tasks, ignoring any that encounter exceptions,
44 * and cancelling all other tasks when the first one is ready:
45 *
46 * <pre> {@code
47 * void solve(Executor e,
48 *            Collection<Callable<Result>> solvers)
49 *     throws InterruptedException {
50 *     CompletionService<Result> ecs
51 *         = new ExecutorCompletionService<Result>(e);
52 *     int n = solvers.size();
53 *     List<Future<Result>> futures
54 *         = new ArrayList<Future<Result>>(n);
55 *     Result result = null;
56 *     try {
57 *         for (Callable<Result> s : solvers)
58 *             futures.add(ecs.submit(s));
59 *         for (int i = 0; i < n; ++i) {
60 *             try {
61 *                 Result r = ecs.take().get();
62 *                 if (r != null) {
63 *                     result = r;
64 *                     break;
65 *                 }
66 *             } catch (ExecutionException ignore) {}
67 *         }
68 *     }
69 *     finally {
70 *         for (Future<Result> f : futures)
71 *             f.cancel(true);
72 *     }
73 *
74 *     if (result != null)
75 *         use(result);
76 * }}</pre>
77 */
78public class ExecutorCompletionService<V> implements CompletionService<V> {
79    private final Executor executor;
80    private final AbstractExecutorService aes;
81    private final BlockingQueue<Future<V>> completionQueue;
82
83    /**
84     * FutureTask extension to enqueue upon completion
85     */
86    private class QueueingFuture extends FutureTask<Void> {
87        QueueingFuture(RunnableFuture<V> task) {
88            super(task, null);
89            this.task = task;
90        }
91        protected void done() { completionQueue.add(task); }
92        private final Future<V> task;
93    }
94
95    private RunnableFuture<V> newTaskFor(Callable<V> task) {
96        if (aes == null)
97            return new FutureTask<V>(task);
98        else
99            return aes.newTaskFor(task);
100    }
101
102    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
103        if (aes == null)
104            return new FutureTask<V>(task, result);
105        else
106            return aes.newTaskFor(task, result);
107    }
108
109    /**
110     * Creates an ExecutorCompletionService using the supplied
111     * executor for base task execution and a
112     * {@link LinkedBlockingQueue} as a completion queue.
113     *
114     * @param executor the executor to use
115     * @throws NullPointerException if executor is {@code null}
116     */
117    public ExecutorCompletionService(Executor executor) {
118        if (executor == null)
119            throw new NullPointerException();
120        this.executor = executor;
121        this.aes = (executor instanceof AbstractExecutorService) ?
122            (AbstractExecutorService) executor : null;
123        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
124    }
125
126    /**
127     * Creates an ExecutorCompletionService using the supplied
128     * executor for base task execution and the supplied queue as its
129     * completion queue.
130     *
131     * @param executor the executor to use
132     * @param completionQueue the queue to use as the completion queue
133     *        normally one dedicated for use by this service. This
134     *        queue is treated as unbounded -- failed attempted
135     *        {@code Queue.add} operations for completed taskes cause
136     *        them not to be retrievable.
137     * @throws NullPointerException if executor or completionQueue are {@code null}
138     */
139    public ExecutorCompletionService(Executor executor,
140                                     BlockingQueue<Future<V>> completionQueue) {
141        if (executor == null || completionQueue == null)
142            throw new NullPointerException();
143        this.executor = executor;
144        this.aes = (executor instanceof AbstractExecutorService) ?
145            (AbstractExecutorService) executor : null;
146        this.completionQueue = completionQueue;
147    }
148
149    public Future<V> submit(Callable<V> task) {
150        if (task == null) throw new NullPointerException();
151        RunnableFuture<V> f = newTaskFor(task);
152        executor.execute(new QueueingFuture(f));
153        return f;
154    }
155
156    public Future<V> submit(Runnable task, V result) {
157        if (task == null) throw new NullPointerException();
158        RunnableFuture<V> f = newTaskFor(task, result);
159        executor.execute(new QueueingFuture(f));
160        return f;
161    }
162
163    public Future<V> take() throws InterruptedException {
164        return completionQueue.take();
165    }
166
167    public Future<V> poll() {
168        return completionQueue.poll();
169    }
170
171    public Future<V> poll(long timeout, TimeUnit unit)
172            throws InterruptedException {
173        return completionQueue.poll(timeout, unit);
174    }
175
176}
177