AbstractListeningExecutorService.java revision 1d580d0f6ee4f21eb309ba7b509d2c6d671c4044
1/* 2 * This file is a modified version of 3 * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java?revision=1.35 4 * which contained the following notice: 5 * 6 * Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to the 7 * public domain, as explained at http://creativecommons.org/publicdomain/zero/1.0/ 8 * 9 * Rationale for copying: 10 * Guava targets JDK5, whose AbstractExecutorService class lacks the newTaskFor protected 11 * customization methods needed by MoreExecutors.listeningDecorator. This class is a copy of 12 * AbstractExecutorService from the JSR166 CVS repository. It contains the desired methods. 13 */ 14 15package com.google.common.util.concurrent; 16 17import static com.google.common.base.Preconditions.checkArgument; 18 19import java.util.ArrayList; 20import java.util.Collection; 21import java.util.Iterator; 22import java.util.List; 23import java.util.concurrent.Callable; 24import java.util.concurrent.CancellationException; 25import java.util.concurrent.ExecutionException; 26import java.util.concurrent.ExecutorCompletionService; 27import java.util.concurrent.Future; 28import java.util.concurrent.TimeUnit; 29import java.util.concurrent.TimeoutException; 30 31/** 32 * Implements {@link ListeningExecutorService} execution methods atop the abstract {@link #execute} 33 * method. More concretely, the {@code submit}, {@code invokeAny} and {@code invokeAll} methods 34 * create {@link ListenableFutureTask} instances and pass them to {@link #execute}. 35 * 36 * <p>In addition to {@link #execute}, subclasses must implement all methods related to shutdown and 37 * termination. 38 * 39 * @author Doug Lea 40 */ 41abstract class AbstractListeningExecutorService implements ListeningExecutorService { 42 @Override public ListenableFuture<?> submit(Runnable task) { 43 ListenableFutureTask<Void> ftask = ListenableFutureTask.create(task, null); 44 execute(ftask); 45 return ftask; 46 } 47 48 @Override public <T> ListenableFuture<T> submit(Runnable task, T result) { 49 ListenableFutureTask<T> ftask = ListenableFutureTask.create(task, result); 50 execute(ftask); 51 return ftask; 52 } 53 54 @Override public <T> ListenableFuture<T> submit(Callable<T> task) { 55 ListenableFutureTask<T> ftask = ListenableFutureTask.create(task); 56 execute(ftask); 57 return ftask; 58 } 59 60 /** 61 * the main mechanics of invokeAny. 62 */ 63 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) 64 throws InterruptedException, ExecutionException, TimeoutException { 65 int ntasks = tasks.size(); 66 checkArgument(ntasks > 0); 67 List<Future<T>> futures = new ArrayList<Future<T>>(ntasks); 68 ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); 69 70 // For efficiency, especially in executors with limited 71 // parallelism, check to see if previously submitted tasks are 72 // done before submitting more of them. This interleaving 73 // plus the exception mechanics account for messiness of main 74 // loop. 75 76 try { 77 // Record exceptions so that if we fail to obtain any 78 // result, we can throw the last exception we got. 79 ExecutionException ee = null; 80 long lastTime = timed ? System.nanoTime() : 0; 81 Iterator<? extends Callable<T>> it = tasks.iterator(); 82 83 // Start one task for sure; the rest incrementally 84 futures.add(ecs.submit(it.next())); 85 --ntasks; 86 int active = 1; 87 88 for (;;) { 89 Future<T> f = ecs.poll(); 90 if (f == null) { 91 if (ntasks > 0) { 92 --ntasks; 93 futures.add(ecs.submit(it.next())); 94 ++active; 95 } else if (active == 0) { 96 break; 97 } else if (timed) { 98 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 99 if (f == null) { 100 throw new TimeoutException(); 101 } 102 long now = System.nanoTime(); 103 nanos -= now - lastTime; 104 lastTime = now; 105 } else { 106 f = ecs.take(); 107 } 108 } 109 if (f != null) { 110 --active; 111 try { 112 return f.get(); 113 } catch (ExecutionException eex) { 114 ee = eex; 115 } catch (RuntimeException rex) { 116 ee = new ExecutionException(rex); 117 } 118 } 119 } 120 121 if (ee == null) { 122 ee = new ExecutionException(null); 123 } 124 throw ee; 125 } finally { 126 for (Future<T> f : futures) { 127 f.cancel(true); 128 } 129 } 130 } 131 132 @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 133 throws InterruptedException, ExecutionException { 134 try { 135 return doInvokeAny(tasks, false, 0); 136 } catch (TimeoutException cannotHappen) { 137 throw new AssertionError(); 138 } 139 } 140 141 @Override public <T> T invokeAny( 142 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 143 throws InterruptedException, ExecutionException, TimeoutException { 144 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 145 } 146 147 @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 148 throws InterruptedException { 149 if (tasks == null) { 150 throw new NullPointerException(); 151 } 152 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 153 boolean done = false; 154 try { 155 for (Callable<T> t : tasks) { 156 ListenableFutureTask<T> f = ListenableFutureTask.create(t); 157 futures.add(f); 158 execute(f); 159 } 160 for (Future<T> f : futures) { 161 if (!f.isDone()) { 162 try { 163 f.get(); 164 } catch (CancellationException ignore) { 165 } catch (ExecutionException ignore) { 166 } 167 } 168 } 169 done = true; 170 return futures; 171 } finally { 172 if (!done) { 173 for (Future<T> f : futures) { 174 f.cancel(true); 175 } 176 } 177 } 178 } 179 180 @Override public <T> List<Future<T>> invokeAll( 181 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 182 throws InterruptedException { 183 if (tasks == null || unit == null) { 184 throw new NullPointerException(); 185 } 186 long nanos = unit.toNanos(timeout); 187 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 188 boolean done = false; 189 try { 190 for (Callable<T> t : tasks) { 191 futures.add(ListenableFutureTask.create(t)); 192 } 193 194 long lastTime = System.nanoTime(); 195 196 // Interleave time checks and calls to execute in case 197 // executor doesn't have any/much parallelism. 198 Iterator<Future<T>> it = futures.iterator(); 199 while (it.hasNext()) { 200 execute((Runnable) (it.next())); 201 long now = System.nanoTime(); 202 nanos -= now - lastTime; 203 lastTime = now; 204 if (nanos <= 0) { 205 return futures; 206 } 207 } 208 209 for (Future<T> f : futures) { 210 if (!f.isDone()) { 211 if (nanos <= 0) { 212 return futures; 213 } 214 try { 215 f.get(nanos, TimeUnit.NANOSECONDS); 216 } catch (CancellationException ignore) { 217 } catch (ExecutionException ignore) { 218 } catch (TimeoutException toe) { 219 return futures; 220 } 221 long now = System.nanoTime(); 222 nanos -= now - lastTime; 223 lastTime = now; 224 } 225 } 226 done = true; 227 return futures; 228 } finally { 229 if (!done) { 230 for (Future<T> f : futures) { 231 f.cancel(true); 232 } 233 } 234 } 235 } 236} 237