AbstractListeningExecutorService.java revision dbd967a6e5c96cc1a97c5521f88dc1564ba2f81b
1/*
2 * This file is a modified version of
3 * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java?revision=1.35
4 * which contained the following notice:
5 *
6 * Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to the
7 * public domain, as explained at http://creativecommons.org/publicdomain/zero/1.0/
8 *
9 * Rationale for copying:
10 * Guava targets JDK5, whose AbstractExecutorService class lacks the newTaskFor protected
11 * customization methods needed by MoreExecutors.listeningDecorator. This class is a copy of
12 * AbstractExecutorService from the JSR166 CVS repository. It contains the desired methods.
13 */
14
15package com.google.common.util.concurrent;
16
17import static com.google.common.base.Preconditions.checkArgument;
18
19import java.util.ArrayList;
20import java.util.Collection;
21import java.util.Iterator;
22import java.util.List;
23import java.util.concurrent.Callable;
24import java.util.concurrent.CancellationException;
25import java.util.concurrent.ExecutionException;
26import java.util.concurrent.ExecutorCompletionService;
27import java.util.concurrent.Future;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.TimeoutException;
30
31/**
32 * Implements {@link ListeningExecutorService} execution methods atop the abstract {@link #execute}
33 * method. More concretely, the {@code submit}, {@code invokeAny} and {@code invokeAll} methods
34 * create {@link ListenableFutureTask} instances and pass them to {@link #execute}.
35 *
36 * <p>In addition to {@link #execute}, subclasses must implement all methods related to shutdown and
37 * termination.
38 *
39 * @author Doug Lea
40 */
41abstract class AbstractListeningExecutorService implements ListeningExecutorService {
42  @Override public ListenableFuture<?> submit(Runnable task) {
43    ListenableFutureTask<Void> ftask = ListenableFutureTask.create(task, null);
44    execute(ftask);
45    return ftask;
46  }
47
48  @Override public <T> ListenableFuture<T> submit(Runnable task, T result) {
49    ListenableFutureTask<T> ftask = ListenableFutureTask.create(task, result);
50    execute(ftask);
51    return ftask;
52  }
53
54  @Override public <T> ListenableFuture<T> submit(Callable<T> task) {
55    ListenableFutureTask<T> ftask = ListenableFutureTask.create(task);
56    execute(ftask);
57    return ftask;
58  }
59
60  /**
61   * the main mechanics of invokeAny.
62   */
63  private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
64      throws InterruptedException, ExecutionException, TimeoutException {
65    int ntasks = tasks.size();
66    checkArgument(ntasks > 0);
67    List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
68    ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
69
70    // For efficiency, especially in executors with limited
71    // parallelism, check to see if previously submitted tasks are
72    // done before submitting more of them. This interleaving
73    // plus the exception mechanics account for messiness of main
74    // loop.
75
76    try {
77      // Record exceptions so that if we fail to obtain any
78      // result, we can throw the last exception we got.
79      ExecutionException ee = null;
80      long lastTime = timed ? System.nanoTime() : 0;
81      Iterator<? extends Callable<T>> it = tasks.iterator();
82
83      // Start one task for sure; the rest incrementally
84      futures.add(ecs.submit(it.next()));
85      --ntasks;
86      int active = 1;
87
88      for (;;) {
89        Future<T> f = ecs.poll();
90        if (f == null) {
91          if (ntasks > 0) {
92            --ntasks;
93            futures.add(ecs.submit(it.next()));
94            ++active;
95          } else if (active == 0) {
96            break;
97          } else if (timed) {
98            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
99            if (f == null) {
100              throw new TimeoutException();
101            }
102            long now = System.nanoTime();
103            nanos -= now - lastTime;
104            lastTime = now;
105          } else {
106            f = ecs.take();
107          }
108        }
109        if (f != null) {
110          --active;
111          try {
112            return f.get();
113          } catch (ExecutionException eex) {
114            ee = eex;
115          } catch (RuntimeException rex) {
116            ee = new ExecutionException(rex);
117          }
118        }
119      }
120
121      if (ee == null) {
122        ee = new ExecutionException(null);
123      }
124      throw ee;
125    } finally {
126      for (Future<T> f : futures) {
127        f.cancel(true);
128      }
129    }
130  }
131
132  @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
133      throws InterruptedException, ExecutionException {
134    try {
135      return doInvokeAny(tasks, false, 0);
136    } catch (TimeoutException cannotHappen) {
137      throw new AssertionError();
138    }
139  }
140
141  @Override public <T> T invokeAny(
142      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
143      throws InterruptedException, ExecutionException, TimeoutException {
144    return doInvokeAny(tasks, true, unit.toNanos(timeout));
145  }
146
147  @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
148      throws InterruptedException {
149    if (tasks == null) {
150      throw new NullPointerException();
151    }
152    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
153    boolean done = false;
154    try {
155      for (Callable<T> t : tasks) {
156        ListenableFutureTask<T> f = ListenableFutureTask.create(t);
157        futures.add(f);
158        execute(f);
159      }
160      for (Future<T> f : futures) {
161        if (!f.isDone()) {
162          try {
163            f.get();
164          } catch (CancellationException ignore) {
165          } catch (ExecutionException ignore) {
166          }
167        }
168      }
169      done = true;
170      return futures;
171    } finally {
172      if (!done) {
173        for (Future<T> f : futures) {
174          f.cancel(true);
175        }
176      }
177    }
178  }
179
180  @Override public <T> List<Future<T>> invokeAll(
181      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
182      throws InterruptedException {
183    if (tasks == null || unit == null) {
184      throw new NullPointerException();
185    }
186    long nanos = unit.toNanos(timeout);
187    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
188    boolean done = false;
189    try {
190      for (Callable<T> t : tasks) {
191        futures.add(ListenableFutureTask.create(t));
192      }
193
194      long lastTime = System.nanoTime();
195
196      // Interleave time checks and calls to execute in case
197      // executor doesn't have any/much parallelism.
198      Iterator<Future<T>> it = futures.iterator();
199      while (it.hasNext()) {
200        execute((Runnable) (it.next()));
201        long now = System.nanoTime();
202        nanos -= now - lastTime;
203        lastTime = now;
204        if (nanos <= 0) {
205          return futures;
206        }
207      }
208
209      for (Future<T> f : futures) {
210        if (!f.isDone()) {
211          if (nanos <= 0) {
212            return futures;
213          }
214          try {
215            f.get(nanos, TimeUnit.NANOSECONDS);
216          } catch (CancellationException ignore) {
217          } catch (ExecutionException ignore) {
218          } catch (TimeoutException toe) {
219            return futures;
220          }
221          long now = System.nanoTime();
222          nanos -= now - lastTime;
223          lastTime = now;
224        }
225      }
226      done = true;
227      return futures;
228    } finally {
229      if (!done) {
230        for (Future<T> f : futures) {
231          f.cancel(true);
232        }
233      }
234    }
235  }
236}
237