AbstractExecutorService.java revision 51b1b6997fd3f980076b8081f7f1165ccc2a4008
1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37import java.util.*;
38
39/**
40 * Provides default implementations of {@link ExecutorService}
41 * execution methods. This class implements the <tt>submit</tt>,
42 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
43 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
44 * to the {@link FutureTask} class provided in this package.  For example,
45 * the implementation of <tt>submit(Runnable)</tt> creates an
46 * associated <tt>RunnableFuture</tt> that is executed and
47 * returned. Subclasses may override the <tt>newTaskFor</tt> methods
48 * to return <tt>RunnableFuture</tt> implementations other than
49 * <tt>FutureTask</tt>.
50 *
51 * <p> <b>Extension example</b>. Here is a sketch of a class
52 * that customizes {@link ThreadPoolExecutor} to use
53 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
54 *  <pre> {@code
55 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
56 *
57 *   static class CustomTask<V> implements RunnableFuture<V> {...}
58 *
59 *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
60 *       return new CustomTask<V>(c);
61 *   }
62 *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
63 *       return new CustomTask<V>(r, v);
64 *   }
65 *   // ... add constructors, etc.
66 * }}</pre>
67 *
68 * @since 1.5
69 * @author Doug Lea
70 */
71public abstract class AbstractExecutorService implements ExecutorService {
72
73    /**
74     * Returns a <tt>RunnableFuture</tt> for the given runnable and default
75     * value.
76     *
77     * @param runnable the runnable task being wrapped
78     * @param value the default value for the returned future
79     * @return a <tt>RunnableFuture</tt> which when run will run the
80     * underlying runnable and which, as a <tt>Future</tt>, will yield
81     * the given value as its result and provide for cancellation of
82     * the underlying task.
83     * @since 1.6
84     */
85    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
86        return new FutureTask<T>(runnable, value);
87    }
88
89    /**
90     * Returns a <tt>RunnableFuture</tt> for the given callable task.
91     *
92     * @param callable the callable task being wrapped
93     * @return a <tt>RunnableFuture</tt> which when run will call the
94     * underlying callable and which, as a <tt>Future</tt>, will yield
95     * the callable's result as its result and provide for
96     * cancellation of the underlying task.
97     * @since 1.6
98     */
99    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
100        return new FutureTask<T>(callable);
101    }
102
103    /**
104     * @throws RejectedExecutionException {@inheritDoc}
105     * @throws NullPointerException       {@inheritDoc}
106     */
107    public Future<?> submit(Runnable task) {
108        if (task == null) throw new NullPointerException();
109        RunnableFuture<Void> ftask = newTaskFor(task, null);
110        execute(ftask);
111        return ftask;
112    }
113
114    /**
115     * @throws RejectedExecutionException {@inheritDoc}
116     * @throws NullPointerException       {@inheritDoc}
117     */
118    public <T> Future<T> submit(Runnable task, T result) {
119        if (task == null) throw new NullPointerException();
120        RunnableFuture<T> ftask = newTaskFor(task, result);
121        execute(ftask);
122        return ftask;
123    }
124
125    /**
126     * @throws RejectedExecutionException {@inheritDoc}
127     * @throws NullPointerException       {@inheritDoc}
128     */
129    public <T> Future<T> submit(Callable<T> task) {
130        if (task == null) throw new NullPointerException();
131        RunnableFuture<T> ftask = newTaskFor(task);
132        execute(ftask);
133        return ftask;
134    }
135
136    /**
137     * the main mechanics of invokeAny.
138     */
139    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
140                            boolean timed, long nanos)
141        throws InterruptedException, ExecutionException, TimeoutException {
142        if (tasks == null)
143            throw new NullPointerException();
144        int ntasks = tasks.size();
145        if (ntasks == 0)
146            throw new IllegalArgumentException();
147        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
148        ExecutorCompletionService<T> ecs =
149            new ExecutorCompletionService<T>(this);
150
151        // For efficiency, especially in executors with limited
152        // parallelism, check to see if previously submitted tasks are
153        // done before submitting more of them. This interleaving
154        // plus the exception mechanics account for messiness of main
155        // loop.
156
157        try {
158            // Record exceptions so that if we fail to obtain any
159            // result, we can throw the last exception we got.
160            ExecutionException ee = null;
161            long lastTime = timed ? System.nanoTime() : 0;
162            Iterator<? extends Callable<T>> it = tasks.iterator();
163
164            // Start one task for sure; the rest incrementally
165            futures.add(ecs.submit(it.next()));
166            --ntasks;
167            int active = 1;
168
169            for (;;) {
170                Future<T> f = ecs.poll();
171                if (f == null) {
172                    if (ntasks > 0) {
173                        --ntasks;
174                        futures.add(ecs.submit(it.next()));
175                        ++active;
176                    }
177                    else if (active == 0)
178                        break;
179                    else if (timed) {
180                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
181                        if (f == null)
182                            throw new TimeoutException();
183                        long now = System.nanoTime();
184                        nanos -= now - lastTime;
185                        lastTime = now;
186                    }
187                    else
188                        f = ecs.take();
189                }
190                if (f != null) {
191                    --active;
192                    try {
193                        return f.get();
194                    } catch (ExecutionException eex) {
195                        ee = eex;
196                    } catch (RuntimeException rex) {
197                        ee = new ExecutionException(rex);
198                    }
199                }
200            }
201
202            if (ee == null)
203                ee = new ExecutionException();
204            throw ee;
205
206        } finally {
207            for (Future<T> f : futures)
208                f.cancel(true);
209        }
210    }
211
212    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
213        throws InterruptedException, ExecutionException {
214        try {
215            return doInvokeAny(tasks, false, 0);
216        } catch (TimeoutException cannotHappen) {
217            assert false;
218            return null;
219        }
220    }
221
222    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
223                           long timeout, TimeUnit unit)
224        throws InterruptedException, ExecutionException, TimeoutException {
225        return doInvokeAny(tasks, true, unit.toNanos(timeout));
226    }
227
228    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
229        throws InterruptedException {
230        if (tasks == null)
231            throw new NullPointerException();
232        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
233        boolean done = false;
234        try {
235            for (Callable<T> t : tasks) {
236                RunnableFuture<T> f = newTaskFor(t);
237                futures.add(f);
238                execute(f);
239            }
240            for (Future<T> f : futures) {
241                if (!f.isDone()) {
242                    try {
243                        f.get();
244                    } catch (CancellationException ignore) {
245                    } catch (ExecutionException ignore) {
246                    }
247                }
248            }
249            done = true;
250            return futures;
251        } finally {
252            if (!done)
253                for (Future<T> f : futures)
254                    f.cancel(true);
255        }
256    }
257
258    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
259                                         long timeout, TimeUnit unit)
260        throws InterruptedException {
261        if (tasks == null || unit == null)
262            throw new NullPointerException();
263        long nanos = unit.toNanos(timeout);
264        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
265        boolean done = false;
266        try {
267            for (Callable<T> t : tasks)
268                futures.add(newTaskFor(t));
269
270            long lastTime = System.nanoTime();
271
272            // Interleave time checks and calls to execute in case
273            // executor doesn't have any/much parallelism.
274            Iterator<Future<T>> it = futures.iterator();
275            while (it.hasNext()) {
276                execute((Runnable)(it.next()));
277                long now = System.nanoTime();
278                nanos -= now - lastTime;
279                lastTime = now;
280                if (nanos <= 0)
281                    return futures;
282            }
283
284            for (Future<T> f : futures) {
285                if (!f.isDone()) {
286                    if (nanos <= 0)
287                        return futures;
288                    try {
289                        f.get(nanos, TimeUnit.NANOSECONDS);
290                    } catch (CancellationException ignore) {
291                    } catch (ExecutionException ignore) {
292                    } catch (TimeoutException toe) {
293                        return futures;
294                    }
295                    long now = System.nanoTime();
296                    nanos -= now - lastTime;
297                    lastTime = now;
298                }
299            }
300            done = true;
301            return futures;
302        } finally {
303            if (!done)
304                for (Future<T> f : futures)
305                    f.cancel(true);
306        }
307    }
308
309}
310