MoreExecutors.java revision 1d580d0f6ee4f21eb309ba7b509d2c6d671c4044
1/*
2 * Copyright (C) 2007 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import static com.google.common.base.Preconditions.checkNotNull;
20
21import com.google.common.annotations.Beta;
22
23import java.util.Collections;
24import java.util.List;
25import java.util.concurrent.Callable;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.concurrent.RejectedExecutionException;
29import java.util.concurrent.ScheduledExecutorService;
30import java.util.concurrent.ScheduledFuture;
31import java.util.concurrent.ScheduledThreadPoolExecutor;
32import java.util.concurrent.ThreadFactory;
33import java.util.concurrent.ThreadPoolExecutor;
34import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
35import java.util.concurrent.TimeUnit;
36import java.util.concurrent.locks.Condition;
37import java.util.concurrent.locks.Lock;
38import java.util.concurrent.locks.ReentrantLock;
39
40/**
41 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
42 * ExecutorService}, and {@link ThreadFactory}.
43 *
44 * @author Eric Fellheimer
45 * @author Kyle Littlefield
46 * @author Justin Mahoney
47 * @since 3.0
48 */
49public final class MoreExecutors {
50  private MoreExecutors() {}
51
52  /**
53   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
54   * when the application is complete.  It does so by using daemon threads and
55   * adding a shutdown hook to wait for their completion.
56   *
57   * <p>This is mainly for fixed thread pools.
58   * See {@link Executors#newFixedThreadPool(int)}.
59   *
60   * @param executor the executor to modify to make sure it exits when the
61   *        application is finished
62   * @param terminationTimeout how long to wait for the executor to
63   *        finish before terminating the JVM
64   * @param timeUnit unit of time for the time parameter
65   * @return an unmodifiable version of the input which will not hang the JVM
66   */
67  @Beta
68  public static ExecutorService getExitingExecutorService(
69      ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
70    executor.setThreadFactory(new ThreadFactoryBuilder()
71        .setDaemon(true)
72        .setThreadFactory(executor.getThreadFactory())
73        .build());
74
75    ExecutorService service = Executors.unconfigurableExecutorService(executor);
76
77    addDelayedShutdownHook(service, terminationTimeout, timeUnit);
78
79    return service;
80  }
81
82  /**
83   * Converts the given ScheduledThreadPoolExecutor into a
84   * ScheduledExecutorService that exits when the application is complete.  It
85   * does so by using daemon threads and adding a shutdown hook to wait for
86   * their completion.
87   *
88   * <p>This is mainly for fixed thread pools.
89   * See {@link Executors#newScheduledThreadPool(int)}.
90   *
91   * @param executor the executor to modify to make sure it exits when the
92   *        application is finished
93   * @param terminationTimeout how long to wait for the executor to
94   *        finish before terminating the JVM
95   * @param timeUnit unit of time for the time parameter
96   * @return an unmodifiable version of the input which will not hang the JVM
97   */
98  @Beta
99  public static ScheduledExecutorService getExitingScheduledExecutorService(
100      ScheduledThreadPoolExecutor executor, long terminationTimeout,
101      TimeUnit timeUnit) {
102    executor.setThreadFactory(new ThreadFactoryBuilder()
103        .setDaemon(true)
104        .setThreadFactory(executor.getThreadFactory())
105        .build());
106
107    ScheduledExecutorService service =
108        Executors.unconfigurableScheduledExecutorService(executor);
109
110    addDelayedShutdownHook(service, terminationTimeout, timeUnit);
111
112    return service;
113  }
114
115  /**
116   * Add a shutdown hook to wait for thread completion in the given
117   * {@link ExecutorService service}.  This is useful if the given service uses
118   * daemon threads, and we want to keep the JVM from exiting immediately on
119   * shutdown, instead giving these daemon threads a chance to terminate
120   * normally.
121   * @param service ExecutorService which uses daemon threads
122   * @param terminationTimeout how long to wait for the executor to finish
123   *        before terminating the JVM
124   * @param timeUnit unit of time for the time parameter
125   */
126  @Beta
127  public static void addDelayedShutdownHook(
128      final ExecutorService service, final long terminationTimeout,
129      final TimeUnit timeUnit) {
130    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
131      @Override
132      public void run() {
133        try {
134          // We'd like to log progress and failures that may arise in the
135          // following code, but unfortunately the behavior of logging
136          // is undefined in shutdown hooks.
137          // This is because the logging code installs a shutdown hook of its
138          // own. See Cleaner class inside {@link LogManager}.
139          service.shutdown();
140          service.awaitTermination(terminationTimeout, timeUnit);
141        } catch (InterruptedException ignored) {
142          // We're shutting down anyway, so just ignore.
143        }
144      }
145    }));
146  }
147
148  /**
149   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
150   * when the application is complete.  It does so by using daemon threads and
151   * adding a shutdown hook to wait for their completion.
152   *
153   * <p>This method waits 120 seconds before continuing with JVM termination,
154   * even if the executor has not finished its work.
155   *
156   * <p>This is mainly for fixed thread pools.
157   * See {@link Executors#newFixedThreadPool(int)}.
158   *
159   * @param executor the executor to modify to make sure it exits when the
160   *        application is finished
161   * @return an unmodifiable version of the input which will not hang the JVM
162   */
163  @Beta
164  public static ExecutorService getExitingExecutorService(
165      ThreadPoolExecutor executor) {
166    return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
167  }
168
169  /**
170   * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
171   * exits when the application is complete.  It does so by using daemon threads
172   * and adding a shutdown hook to wait for their completion.
173   *
174   * <p>This method waits 120 seconds before continuing with JVM termination,
175   * even if the executor has not finished its work.
176   *
177   * <p>This is mainly for fixed thread pools.
178   * See {@link Executors#newScheduledThreadPool(int)}.
179   *
180   * @param executor the executor to modify to make sure it exits when the
181   *        application is finished
182   * @return an unmodifiable version of the input which will not hang the JVM
183   */
184  @Beta
185  public static ScheduledExecutorService getExitingScheduledExecutorService(
186      ScheduledThreadPoolExecutor executor) {
187    return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
188  }
189
190  /**
191   * Creates an executor service that runs each task in the thread
192   * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
193   * applies both to individually submitted tasks and to collections of tasks
194   * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
195   * tasks will run serially on the calling thread.  Tasks are run to
196   * completion before a {@code Future} is returned to the caller (unless the
197   * executor has been shutdown).
198   *
199   * <p>Although all tasks are immediately executed in the thread that
200   * submitted the task, this {@code ExecutorService} imposes a small
201   * locking overhead on each task submission in order to implement shutdown
202   * and termination behavior.
203   *
204   * <p>The implementation deviates from the {@code ExecutorService}
205   * specification with regards to the {@code shutdownNow} method.  First,
206   * "best-effort" with regards to canceling running tasks is implemented
207   * as "no-effort".  No interrupts or other attempts are made to stop
208   * threads executing tasks.  Second, the returned list will always be empty,
209   * as any submitted task is considered to have started execution.
210   * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
211   * which are pending serial execution, even the subset of the tasks that
212   * have not yet started execution.  It is unclear from the
213   * {@code ExecutorService} specification if these should be included, and
214   * it's much easier to implement the interpretation that they not be.
215   * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
216   * in concurrent calls to {@code invokeAll/invokeAny} throwing
217   * RejectedExecutionException, although a subset of the tasks may already
218   * have been executed.
219   *
220   * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
221   *        >mostly source-compatible</a> since 3.0)
222   */
223  public static ListeningExecutorService sameThreadExecutor() {
224    return new SameThreadExecutorService();
225  }
226
227  // See sameThreadExecutor javadoc for behavioral notes.
228  private static class SameThreadExecutorService
229      extends AbstractListeningExecutorService {
230    /**
231     * Lock used whenever accessing the state variables
232     * (runningTasks, shutdown, terminationCondition) of the executor
233     */
234    private final Lock lock = new ReentrantLock();
235
236    /** Signaled after the executor is shutdown and running tasks are done */
237    private final Condition termination = lock.newCondition();
238
239    /*
240     * Conceptually, these two variables describe the executor being in
241     * one of three states:
242     *   - Active: shutdown == false
243     *   - Shutdown: runningTasks > 0 and shutdown == true
244     *   - Terminated: runningTasks == 0 and shutdown == true
245     */
246    private int runningTasks = 0;
247    private boolean shutdown = false;
248
249    @Override
250    public void execute(Runnable command) {
251      startTask();
252      try {
253        command.run();
254      } finally {
255        endTask();
256      }
257    }
258
259    @Override
260    public boolean isShutdown() {
261      lock.lock();
262      try {
263        return shutdown;
264      } finally {
265        lock.unlock();
266      }
267    }
268
269    @Override
270    public void shutdown() {
271      lock.lock();
272      try {
273        shutdown = true;
274      } finally {
275        lock.unlock();
276      }
277    }
278
279    // See sameThreadExecutor javadoc for unusual behavior of this method.
280    @Override
281    public List<Runnable> shutdownNow() {
282      shutdown();
283      return Collections.emptyList();
284    }
285
286    @Override
287    public boolean isTerminated() {
288      lock.lock();
289      try {
290        return shutdown && runningTasks == 0;
291      } finally {
292        lock.unlock();
293      }
294    }
295
296    @Override
297    public boolean awaitTermination(long timeout, TimeUnit unit)
298        throws InterruptedException {
299      long nanos = unit.toNanos(timeout);
300      lock.lock();
301      try {
302        for (;;) {
303          if (isTerminated()) {
304            return true;
305          } else if (nanos <= 0) {
306            return false;
307          } else {
308            nanos = termination.awaitNanos(nanos);
309          }
310        }
311      } finally {
312        lock.unlock();
313      }
314    }
315
316    /**
317     * Checks if the executor has been shut down and increments the running
318     * task count.
319     *
320     * @throws RejectedExecutionException if the executor has been previously
321     *         shutdown
322     */
323    private void startTask() {
324      lock.lock();
325      try {
326        if (isShutdown()) {
327          throw new RejectedExecutionException("Executor already shutdown");
328        }
329        runningTasks++;
330      } finally {
331        lock.unlock();
332      }
333    }
334
335    /**
336     * Decrements the running task count.
337     */
338    private void endTask() {
339      lock.lock();
340      try {
341        runningTasks--;
342        if (isTerminated()) {
343          termination.signalAll();
344        }
345      } finally {
346        lock.unlock();
347      }
348    }
349  }
350
351  /**
352   * Creates an {@link ExecutorService} whose {@code submit} and {@code
353   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
354   * given delegate executor. Those methods, as well as {@code execute} and
355   * {@code invokeAny}, are implemented in terms of calls to {@code
356   * delegate.execute}. All other methods are forwarded unchanged to the
357   * delegate. This implies that the returned {@code ListeningExecutorService}
358   * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
359   * invokeAny} methods, so any special handling of tasks must be implemented in
360   * the delegate's {@code execute} method or by wrapping the returned {@code
361   * ListeningExecutorService}.
362   *
363   * <p>If the delegate executor was already an instance of {@code
364   * ListeningExecutorService}, it is returned untouched, and the rest of this
365   * documentation does not apply.
366   *
367   * @since 10.0
368   */
369  public static ListeningExecutorService listeningDecorator(
370      ExecutorService delegate) {
371    return (delegate instanceof ListeningExecutorService)
372        ? (ListeningExecutorService) delegate
373        : (delegate instanceof ScheduledExecutorService)
374        ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
375        : new ListeningDecorator(delegate);
376  }
377
378  /**
379   * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
380   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
381   * given delegate executor. Those methods, as well as {@code execute} and
382   * {@code invokeAny}, are implemented in terms of calls to {@code
383   * delegate.execute}. All other methods are forwarded unchanged to the
384   * delegate. This implies that the returned {@code
385   * SchedulingListeningExecutorService} never calls the delegate's {@code
386   * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
387   * handling of tasks must be implemented in the delegate's {@code execute}
388   * method or by wrapping the returned {@code
389   * SchedulingListeningExecutorService}.
390   *
391   * <p>If the delegate executor was already an instance of {@code
392   * ListeningScheduledExecutorService}, it is returned untouched, and the rest
393   * of this documentation does not apply.
394   *
395   * @since 10.0
396   */
397  public static ListeningScheduledExecutorService listeningDecorator(
398      ScheduledExecutorService delegate) {
399    return (delegate instanceof ListeningScheduledExecutorService)
400        ? (ListeningScheduledExecutorService) delegate
401        : new ScheduledListeningDecorator(delegate);
402  }
403
404  private static class ListeningDecorator
405      extends AbstractListeningExecutorService {
406    final ExecutorService delegate;
407
408    ListeningDecorator(ExecutorService delegate) {
409      this.delegate = checkNotNull(delegate);
410    }
411
412    @Override
413    public boolean awaitTermination(long timeout, TimeUnit unit)
414        throws InterruptedException {
415      return delegate.awaitTermination(timeout, unit);
416    }
417
418    @Override
419    public boolean isShutdown() {
420      return delegate.isShutdown();
421    }
422
423    @Override
424    public boolean isTerminated() {
425      return delegate.isTerminated();
426    }
427
428    @Override
429    public void shutdown() {
430      delegate.shutdown();
431    }
432
433    @Override
434    public List<Runnable> shutdownNow() {
435      return delegate.shutdownNow();
436    }
437
438    @Override
439    public void execute(Runnable command) {
440      delegate.execute(command);
441    }
442  }
443
444  private static class ScheduledListeningDecorator
445      extends ListeningDecorator implements ListeningScheduledExecutorService {
446    final ScheduledExecutorService delegate;
447
448    ScheduledListeningDecorator(ScheduledExecutorService delegate) {
449      super(delegate);
450      this.delegate = checkNotNull(delegate);
451    }
452
453    @Override
454    public ScheduledFuture<?> schedule(
455        Runnable command, long delay, TimeUnit unit) {
456      return delegate.schedule(command, delay, unit);
457    }
458
459    @Override
460    public <V> ScheduledFuture<V> schedule(
461        Callable<V> callable, long delay, TimeUnit unit) {
462      return delegate.schedule(callable, delay, unit);
463    }
464
465    @Override
466    public ScheduledFuture<?> scheduleAtFixedRate(
467        Runnable command, long initialDelay, long period, TimeUnit unit) {
468      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
469    }
470
471    @Override
472    public ScheduledFuture<?> scheduleWithFixedDelay(
473        Runnable command, long initialDelay, long delay, TimeUnit unit) {
474      return delegate.scheduleWithFixedDelay(
475          command, initialDelay, delay, unit);
476    }
477  }
478}
479