1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7package java.util.concurrent;
8import java.util.*;
9
10/**
11 * Provides default implementations of {@link ExecutorService}
12 * execution methods. This class implements the <tt>submit</tt>,
13 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
14 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
15 * to the {@link FutureTask} class provided in this package.  For example,
16 * the implementation of <tt>submit(Runnable)</tt> creates an
17 * associated <tt>RunnableFuture</tt> that is executed and
18 * returned. Subclasses may override the <tt>newTaskFor</tt> methods
19 * to return <tt>RunnableFuture</tt> implementations other than
20 * <tt>FutureTask</tt>.
21 *
22 * <p> <b>Extension example</b>. Here is a sketch of a class
23 * that customizes {@link ThreadPoolExecutor} to use
24 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
25 *  <pre> {@code
26 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
27 *
28 *   static class CustomTask<V> implements RunnableFuture<V> {...}
29 *
30 *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
31 *       return new CustomTask<V>(c);
32 *   }
33 *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
34 *       return new CustomTask<V>(r, v);
35 *   }
36 *   // ... add constructors, etc.
37 * }}</pre>
38 *
39 * @since 1.5
40 * @author Doug Lea
41 */
42public abstract class AbstractExecutorService implements ExecutorService {
43
44    /**
45     * Returns a <tt>RunnableFuture</tt> for the given runnable and default
46     * value.
47     *
48     * @param runnable the runnable task being wrapped
49     * @param value the default value for the returned future
50     * @return a <tt>RunnableFuture</tt> which when run will run the
51     * underlying runnable and which, as a <tt>Future</tt>, will yield
52     * the given value as its result and provide for cancellation of
53     * the underlying task.
54     * @since 1.6
55     */
56    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
57        return new FutureTask<T>(runnable, value);
58    }
59
60    /**
61     * Returns a <tt>RunnableFuture</tt> for the given callable task.
62     *
63     * @param callable the callable task being wrapped
64     * @return a <tt>RunnableFuture</tt> which when run will call the
65     * underlying callable and which, as a <tt>Future</tt>, will yield
66     * the callable's result as its result and provide for
67     * cancellation of the underlying task.
68     * @since 1.6
69     */
70    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
71        return new FutureTask<T>(callable);
72    }
73
74    /**
75     * @throws RejectedExecutionException {@inheritDoc}
76     * @throws NullPointerException       {@inheritDoc}
77     */
78    public Future<?> submit(Runnable task) {
79        if (task == null) throw new NullPointerException();
80        RunnableFuture<Void> ftask = newTaskFor(task, null);
81        execute(ftask);
82        return ftask;
83    }
84
85    /**
86     * @throws RejectedExecutionException {@inheritDoc}
87     * @throws NullPointerException       {@inheritDoc}
88     */
89    public <T> Future<T> submit(Runnable task, T result) {
90        if (task == null) throw new NullPointerException();
91        RunnableFuture<T> ftask = newTaskFor(task, result);
92        execute(ftask);
93        return ftask;
94    }
95
96    /**
97     * @throws RejectedExecutionException {@inheritDoc}
98     * @throws NullPointerException       {@inheritDoc}
99     */
100    public <T> Future<T> submit(Callable<T> task) {
101        if (task == null) throw new NullPointerException();
102        RunnableFuture<T> ftask = newTaskFor(task);
103        execute(ftask);
104        return ftask;
105    }
106
107    /**
108     * the main mechanics of invokeAny.
109     */
110    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
111                            boolean timed, long nanos)
112        throws InterruptedException, ExecutionException, TimeoutException {
113        if (tasks == null)
114            throw new NullPointerException();
115        int ntasks = tasks.size();
116        if (ntasks == 0)
117            throw new IllegalArgumentException();
118        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
119        ExecutorCompletionService<T> ecs =
120            new ExecutorCompletionService<T>(this);
121
122        // For efficiency, especially in executors with limited
123        // parallelism, check to see if previously submitted tasks are
124        // done before submitting more of them. This interleaving
125        // plus the exception mechanics account for messiness of main
126        // loop.
127
128        try {
129            // Record exceptions so that if we fail to obtain any
130            // result, we can throw the last exception we got.
131            ExecutionException ee = null;
132            long lastTime = timed ? System.nanoTime() : 0;
133            Iterator<? extends Callable<T>> it = tasks.iterator();
134
135            // Start one task for sure; the rest incrementally
136            futures.add(ecs.submit(it.next()));
137            --ntasks;
138            int active = 1;
139
140            for (;;) {
141                Future<T> f = ecs.poll();
142                if (f == null) {
143                    if (ntasks > 0) {
144                        --ntasks;
145                        futures.add(ecs.submit(it.next()));
146                        ++active;
147                    }
148                    else if (active == 0)
149                        break;
150                    else if (timed) {
151                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
152                        if (f == null)
153                            throw new TimeoutException();
154                        long now = System.nanoTime();
155                        nanos -= now - lastTime;
156                        lastTime = now;
157                    }
158                    else
159                        f = ecs.take();
160                }
161                if (f != null) {
162                    --active;
163                    try {
164                        return f.get();
165                    } catch (ExecutionException eex) {
166                        ee = eex;
167                    } catch (RuntimeException rex) {
168                        ee = new ExecutionException(rex);
169                    }
170                }
171            }
172
173            if (ee == null)
174                ee = new ExecutionException();
175            throw ee;
176
177        } finally {
178            for (Future<T> f : futures)
179                f.cancel(true);
180        }
181    }
182
183    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
184        throws InterruptedException, ExecutionException {
185        try {
186            return doInvokeAny(tasks, false, 0);
187        } catch (TimeoutException cannotHappen) {
188            assert false;
189            return null;
190        }
191    }
192
193    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
194                           long timeout, TimeUnit unit)
195        throws InterruptedException, ExecutionException, TimeoutException {
196        return doInvokeAny(tasks, true, unit.toNanos(timeout));
197    }
198
199    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
200        throws InterruptedException {
201        if (tasks == null)
202            throw new NullPointerException();
203        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
204        boolean done = false;
205        try {
206            for (Callable<T> t : tasks) {
207                RunnableFuture<T> f = newTaskFor(t);
208                futures.add(f);
209                execute(f);
210            }
211            for (Future<T> f : futures) {
212                if (!f.isDone()) {
213                    try {
214                        f.get();
215                    } catch (CancellationException ignore) {
216                    } catch (ExecutionException ignore) {
217                    }
218                }
219            }
220            done = true;
221            return futures;
222        } finally {
223            if (!done)
224                for (Future<T> f : futures)
225                    f.cancel(true);
226        }
227    }
228
229    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
230                                         long timeout, TimeUnit unit)
231        throws InterruptedException {
232        if (tasks == null || unit == null)
233            throw new NullPointerException();
234        long nanos = unit.toNanos(timeout);
235        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
236        boolean done = false;
237        try {
238            for (Callable<T> t : tasks)
239                futures.add(newTaskFor(t));
240
241            long lastTime = System.nanoTime();
242
243            // Interleave time checks and calls to execute in case
244            // executor doesn't have any/much parallelism.
245            Iterator<Future<T>> it = futures.iterator();
246            while (it.hasNext()) {
247                execute((Runnable)(it.next()));
248                long now = System.nanoTime();
249                nanos -= now - lastTime;
250                lastTime = now;
251                if (nanos <= 0)
252                    return futures;
253            }
254
255            for (Future<T> f : futures) {
256                if (!f.isDone()) {
257                    if (nanos <= 0)
258                        return futures;
259                    try {
260                        f.get(nanos, TimeUnit.NANOSECONDS);
261                    } catch (CancellationException ignore) {
262                    } catch (ExecutionException ignore) {
263                    } catch (TimeoutException toe) {
264                        return futures;
265                    }
266                    long now = System.nanoTime();
267                    nanos -= now - lastTime;
268                    lastTime = now;
269                }
270            }
271            done = true;
272            return futures;
273        } finally {
274            if (!done)
275                for (Future<T> f : futures)
276                    f.cancel(true);
277        }
278    }
279
280}
281