AbstractExecutorService.java revision 51b1b6997fd3f980076b8081f7f1165ccc2a4008
1/* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25/* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36package java.util.concurrent; 37import java.util.*; 38 39/** 40 * Provides default implementations of {@link ExecutorService} 41 * execution methods. This class implements the <tt>submit</tt>, 42 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a 43 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults 44 * to the {@link FutureTask} class provided in this package. For example, 45 * the implementation of <tt>submit(Runnable)</tt> creates an 46 * associated <tt>RunnableFuture</tt> that is executed and 47 * returned. Subclasses may override the <tt>newTaskFor</tt> methods 48 * to return <tt>RunnableFuture</tt> implementations other than 49 * <tt>FutureTask</tt>. 50 * 51 * <p> <b>Extension example</b>. Here is a sketch of a class 52 * that customizes {@link ThreadPoolExecutor} to use 53 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>: 54 * <pre> {@code 55 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor { 56 * 57 * static class CustomTask<V> implements RunnableFuture<V> {...} 58 * 59 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { 60 * return new CustomTask<V>(c); 61 * } 62 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) { 63 * return new CustomTask<V>(r, v); 64 * } 65 * // ... add constructors, etc. 66 * }}</pre> 67 * 68 * @since 1.5 69 * @author Doug Lea 70 */ 71public abstract class AbstractExecutorService implements ExecutorService { 72 73 /** 74 * Returns a <tt>RunnableFuture</tt> for the given runnable and default 75 * value. 76 * 77 * @param runnable the runnable task being wrapped 78 * @param value the default value for the returned future 79 * @return a <tt>RunnableFuture</tt> which when run will run the 80 * underlying runnable and which, as a <tt>Future</tt>, will yield 81 * the given value as its result and provide for cancellation of 82 * the underlying task. 83 * @since 1.6 84 */ 85 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 86 return new FutureTask<T>(runnable, value); 87 } 88 89 /** 90 * Returns a <tt>RunnableFuture</tt> for the given callable task. 91 * 92 * @param callable the callable task being wrapped 93 * @return a <tt>RunnableFuture</tt> which when run will call the 94 * underlying callable and which, as a <tt>Future</tt>, will yield 95 * the callable's result as its result and provide for 96 * cancellation of the underlying task. 97 * @since 1.6 98 */ 99 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 100 return new FutureTask<T>(callable); 101 } 102 103 /** 104 * @throws RejectedExecutionException {@inheritDoc} 105 * @throws NullPointerException {@inheritDoc} 106 */ 107 public Future<?> submit(Runnable task) { 108 if (task == null) throw new NullPointerException(); 109 RunnableFuture<Void> ftask = newTaskFor(task, null); 110 execute(ftask); 111 return ftask; 112 } 113 114 /** 115 * @throws RejectedExecutionException {@inheritDoc} 116 * @throws NullPointerException {@inheritDoc} 117 */ 118 public <T> Future<T> submit(Runnable task, T result) { 119 if (task == null) throw new NullPointerException(); 120 RunnableFuture<T> ftask = newTaskFor(task, result); 121 execute(ftask); 122 return ftask; 123 } 124 125 /** 126 * @throws RejectedExecutionException {@inheritDoc} 127 * @throws NullPointerException {@inheritDoc} 128 */ 129 public <T> Future<T> submit(Callable<T> task) { 130 if (task == null) throw new NullPointerException(); 131 RunnableFuture<T> ftask = newTaskFor(task); 132 execute(ftask); 133 return ftask; 134 } 135 136 /** 137 * the main mechanics of invokeAny. 138 */ 139 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 140 boolean timed, long nanos) 141 throws InterruptedException, ExecutionException, TimeoutException { 142 if (tasks == null) 143 throw new NullPointerException(); 144 int ntasks = tasks.size(); 145 if (ntasks == 0) 146 throw new IllegalArgumentException(); 147 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); 148 ExecutorCompletionService<T> ecs = 149 new ExecutorCompletionService<T>(this); 150 151 // For efficiency, especially in executors with limited 152 // parallelism, check to see if previously submitted tasks are 153 // done before submitting more of them. This interleaving 154 // plus the exception mechanics account for messiness of main 155 // loop. 156 157 try { 158 // Record exceptions so that if we fail to obtain any 159 // result, we can throw the last exception we got. 160 ExecutionException ee = null; 161 long lastTime = timed ? System.nanoTime() : 0; 162 Iterator<? extends Callable<T>> it = tasks.iterator(); 163 164 // Start one task for sure; the rest incrementally 165 futures.add(ecs.submit(it.next())); 166 --ntasks; 167 int active = 1; 168 169 for (;;) { 170 Future<T> f = ecs.poll(); 171 if (f == null) { 172 if (ntasks > 0) { 173 --ntasks; 174 futures.add(ecs.submit(it.next())); 175 ++active; 176 } 177 else if (active == 0) 178 break; 179 else if (timed) { 180 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 181 if (f == null) 182 throw new TimeoutException(); 183 long now = System.nanoTime(); 184 nanos -= now - lastTime; 185 lastTime = now; 186 } 187 else 188 f = ecs.take(); 189 } 190 if (f != null) { 191 --active; 192 try { 193 return f.get(); 194 } catch (ExecutionException eex) { 195 ee = eex; 196 } catch (RuntimeException rex) { 197 ee = new ExecutionException(rex); 198 } 199 } 200 } 201 202 if (ee == null) 203 ee = new ExecutionException(); 204 throw ee; 205 206 } finally { 207 for (Future<T> f : futures) 208 f.cancel(true); 209 } 210 } 211 212 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 213 throws InterruptedException, ExecutionException { 214 try { 215 return doInvokeAny(tasks, false, 0); 216 } catch (TimeoutException cannotHappen) { 217 assert false; 218 return null; 219 } 220 } 221 222 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 223 long timeout, TimeUnit unit) 224 throws InterruptedException, ExecutionException, TimeoutException { 225 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 226 } 227 228 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 229 throws InterruptedException { 230 if (tasks == null) 231 throw new NullPointerException(); 232 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 233 boolean done = false; 234 try { 235 for (Callable<T> t : tasks) { 236 RunnableFuture<T> f = newTaskFor(t); 237 futures.add(f); 238 execute(f); 239 } 240 for (Future<T> f : futures) { 241 if (!f.isDone()) { 242 try { 243 f.get(); 244 } catch (CancellationException ignore) { 245 } catch (ExecutionException ignore) { 246 } 247 } 248 } 249 done = true; 250 return futures; 251 } finally { 252 if (!done) 253 for (Future<T> f : futures) 254 f.cancel(true); 255 } 256 } 257 258 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 259 long timeout, TimeUnit unit) 260 throws InterruptedException { 261 if (tasks == null || unit == null) 262 throw new NullPointerException(); 263 long nanos = unit.toNanos(timeout); 264 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 265 boolean done = false; 266 try { 267 for (Callable<T> t : tasks) 268 futures.add(newTaskFor(t)); 269 270 long lastTime = System.nanoTime(); 271 272 // Interleave time checks and calls to execute in case 273 // executor doesn't have any/much parallelism. 274 Iterator<Future<T>> it = futures.iterator(); 275 while (it.hasNext()) { 276 execute((Runnable)(it.next())); 277 long now = System.nanoTime(); 278 nanos -= now - lastTime; 279 lastTime = now; 280 if (nanos <= 0) 281 return futures; 282 } 283 284 for (Future<T> f : futures) { 285 if (!f.isDone()) { 286 if (nanos <= 0) 287 return futures; 288 try { 289 f.get(nanos, TimeUnit.NANOSECONDS); 290 } catch (CancellationException ignore) { 291 } catch (ExecutionException ignore) { 292 } catch (TimeoutException toe) { 293 return futures; 294 } 295 long now = System.nanoTime(); 296 nanos -= now - lastTime; 297 lastTime = now; 298 } 299 } 300 done = true; 301 return futures; 302 } finally { 303 if (!done) 304 for (Future<T> f : futures) 305 f.cancel(true); 306 } 307 } 308 309} 310