10888a09821a98ac0680fad765217302858e70fa4Paul Duffin/* 20888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Copyright (C) 2008 The Guava Authors 30888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 40888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Licensed under the Apache License, Version 2.0 (the "License"); 50888a09821a98ac0680fad765217302858e70fa4Paul Duffin * you may not use this file except in compliance with the License. 60888a09821a98ac0680fad765217302858e70fa4Paul Duffin * You may obtain a copy of the License at 70888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 80888a09821a98ac0680fad765217302858e70fa4Paul Duffin * http://www.apache.org/licenses/LICENSE-2.0 90888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 100888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Unless required by applicable law or agreed to in writing, software 110888a09821a98ac0680fad765217302858e70fa4Paul Duffin * distributed under the License is distributed on an "AS IS" BASIS, 120888a09821a98ac0680fad765217302858e70fa4Paul Duffin * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 130888a09821a98ac0680fad765217302858e70fa4Paul Duffin * See the License for the specific language governing permissions and 140888a09821a98ac0680fad765217302858e70fa4Paul Duffin * limitations under the License. 150888a09821a98ac0680fad765217302858e70fa4Paul Duffin */ 160888a09821a98ac0680fad765217302858e70fa4Paul Duffin 170888a09821a98ac0680fad765217302858e70fa4Paul Duffinpackage com.google.common.util.concurrent; 180888a09821a98ac0680fad765217302858e70fa4Paul Duffin 190888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport com.google.common.base.Preconditions; 200888a09821a98ac0680fad765217302858e70fa4Paul Duffin 213ecfa412eddc4b084663f38d562537b86b9734d5Paul Duffinimport java.util.ArrayDeque; 220888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport java.util.Queue; 230888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport java.util.concurrent.Executor; 240888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport java.util.logging.Level; 250888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport java.util.logging.Logger; 260888a09821a98ac0680fad765217302858e70fa4Paul Duffin 270888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport javax.annotation.concurrent.GuardedBy; 280888a09821a98ac0680fad765217302858e70fa4Paul Duffin 290888a09821a98ac0680fad765217302858e70fa4Paul Duffin/** 300888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Executor ensuring that all Runnables submitted are executed in order, 310888a09821a98ac0680fad765217302858e70fa4Paul Duffin * using the provided Executor, and serially such that no two will ever 320888a09821a98ac0680fad765217302858e70fa4Paul Duffin * be running at the same time. 330888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 340888a09821a98ac0680fad765217302858e70fa4Paul Duffin * TODO(user): The tasks are given to the underlying executor as a single 350888a09821a98ac0680fad765217302858e70fa4Paul Duffin * task, which means the semantics of the executor may be changed, e.g. the 360888a09821a98ac0680fad765217302858e70fa4Paul Duffin * executor may have an afterExecute method that runs after every task 370888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 380888a09821a98ac0680fad765217302858e70fa4Paul Duffin * TODO(user): What happens in case of shutdown or shutdownNow? Should 390888a09821a98ac0680fad765217302858e70fa4Paul Duffin * TaskRunner check for interruption? 400888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 410888a09821a98ac0680fad765217302858e70fa4Paul Duffin * TODO(user): It would be nice to provide a handle to individual task 420888a09821a98ac0680fad765217302858e70fa4Paul Duffin * results using Future. Maybe SerializingExecutorService? 430888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 440888a09821a98ac0680fad765217302858e70fa4Paul Duffin * @author JJ Furman 450888a09821a98ac0680fad765217302858e70fa4Paul Duffin */ 460888a09821a98ac0680fad765217302858e70fa4Paul Duffinfinal class SerializingExecutor implements Executor { 470888a09821a98ac0680fad765217302858e70fa4Paul Duffin private static final Logger log = 480888a09821a98ac0680fad765217302858e70fa4Paul Duffin Logger.getLogger(SerializingExecutor.class.getName()); 490888a09821a98ac0680fad765217302858e70fa4Paul Duffin 500888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** Underlying executor that all submitted Runnable objects are run on. */ 510888a09821a98ac0680fad765217302858e70fa4Paul Duffin private final Executor executor; 520888a09821a98ac0680fad765217302858e70fa4Paul Duffin 530888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** A list of Runnables to be run in order. */ 540888a09821a98ac0680fad765217302858e70fa4Paul Duffin @GuardedBy("internalLock") 553ecfa412eddc4b084663f38d562537b86b9734d5Paul Duffin private final Queue<Runnable> waitQueue = new ArrayDeque<Runnable>(); 560888a09821a98ac0680fad765217302858e70fa4Paul Duffin 570888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** 580888a09821a98ac0680fad765217302858e70fa4Paul Duffin * We explicitly keep track of if the TaskRunner is currently scheduled to 590888a09821a98ac0680fad765217302858e70fa4Paul Duffin * run. If it isn't, we start it. We can't just use 600888a09821a98ac0680fad765217302858e70fa4Paul Duffin * waitQueue.isEmpty() as a proxy because we need to ensure that only one 610888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Runnable submitted is running at a time so even if waitQueue is empty 620888a09821a98ac0680fad765217302858e70fa4Paul Duffin * the isThreadScheduled isn't set to false until after the Runnable is 630888a09821a98ac0680fad765217302858e70fa4Paul Duffin * finished. 640888a09821a98ac0680fad765217302858e70fa4Paul Duffin */ 650888a09821a98ac0680fad765217302858e70fa4Paul Duffin @GuardedBy("internalLock") 660888a09821a98ac0680fad765217302858e70fa4Paul Duffin private boolean isThreadScheduled = false; 670888a09821a98ac0680fad765217302858e70fa4Paul Duffin 680888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** The object that actually runs the Runnables submitted, reused. */ 690888a09821a98ac0680fad765217302858e70fa4Paul Duffin private final TaskRunner taskRunner = new TaskRunner(); 700888a09821a98ac0680fad765217302858e70fa4Paul Duffin 710888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** 720888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Creates a SerializingExecutor, running tasks using {@code executor}. 730888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 740888a09821a98ac0680fad765217302858e70fa4Paul Duffin * @param executor Executor in which tasks should be run. Must not be null. 750888a09821a98ac0680fad765217302858e70fa4Paul Duffin */ 760888a09821a98ac0680fad765217302858e70fa4Paul Duffin public SerializingExecutor(Executor executor) { 770888a09821a98ac0680fad765217302858e70fa4Paul Duffin Preconditions.checkNotNull(executor, "'executor' must not be null."); 780888a09821a98ac0680fad765217302858e70fa4Paul Duffin this.executor = executor; 790888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 800888a09821a98ac0680fad765217302858e70fa4Paul Duffin 810888a09821a98ac0680fad765217302858e70fa4Paul Duffin private final Object internalLock = new Object() { 820888a09821a98ac0680fad765217302858e70fa4Paul Duffin @Override public String toString() { 830888a09821a98ac0680fad765217302858e70fa4Paul Duffin return "SerializingExecutor lock: " + super.toString(); 840888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 850888a09821a98ac0680fad765217302858e70fa4Paul Duffin }; 860888a09821a98ac0680fad765217302858e70fa4Paul Duffin 870888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** 880888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Runs the given runnable strictly after all Runnables that were submitted 890888a09821a98ac0680fad765217302858e70fa4Paul Duffin * before it, and using the {@code executor} passed to the constructor. . 900888a09821a98ac0680fad765217302858e70fa4Paul Duffin */ 910888a09821a98ac0680fad765217302858e70fa4Paul Duffin @Override 920888a09821a98ac0680fad765217302858e70fa4Paul Duffin public void execute(Runnable r) { 930888a09821a98ac0680fad765217302858e70fa4Paul Duffin Preconditions.checkNotNull(r, "'r' must not be null."); 940888a09821a98ac0680fad765217302858e70fa4Paul Duffin boolean scheduleTaskRunner = false; 950888a09821a98ac0680fad765217302858e70fa4Paul Duffin synchronized (internalLock) { 960888a09821a98ac0680fad765217302858e70fa4Paul Duffin waitQueue.add(r); 970888a09821a98ac0680fad765217302858e70fa4Paul Duffin 980888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (!isThreadScheduled) { 990888a09821a98ac0680fad765217302858e70fa4Paul Duffin isThreadScheduled = true; 1000888a09821a98ac0680fad765217302858e70fa4Paul Duffin scheduleTaskRunner = true; 1010888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1020888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1030888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (scheduleTaskRunner) { 1040888a09821a98ac0680fad765217302858e70fa4Paul Duffin boolean threw = true; 1050888a09821a98ac0680fad765217302858e70fa4Paul Duffin try { 1060888a09821a98ac0680fad765217302858e70fa4Paul Duffin executor.execute(taskRunner); 1070888a09821a98ac0680fad765217302858e70fa4Paul Duffin threw = false; 1080888a09821a98ac0680fad765217302858e70fa4Paul Duffin } finally { 1090888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (threw) { 1100888a09821a98ac0680fad765217302858e70fa4Paul Duffin synchronized (internalLock) { 1110888a09821a98ac0680fad765217302858e70fa4Paul Duffin // It is possible that at this point that there are still tasks in 1120888a09821a98ac0680fad765217302858e70fa4Paul Duffin // the queue, it would be nice to keep trying but the error may not 1130888a09821a98ac0680fad765217302858e70fa4Paul Duffin // be recoverable. So we update our state and propogate so that if 1140888a09821a98ac0680fad765217302858e70fa4Paul Duffin // our caller deems it recoverable we won't be stuck. 1150888a09821a98ac0680fad765217302858e70fa4Paul Duffin isThreadScheduled = false; 1160888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1170888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1180888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1190888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1200888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1210888a09821a98ac0680fad765217302858e70fa4Paul Duffin 1220888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** 1230888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Task that actually runs the Runnables. It takes the Runnables off of the 1240888a09821a98ac0680fad765217302858e70fa4Paul Duffin * queue one by one and runs them. After it is done with all Runnables and 1250888a09821a98ac0680fad765217302858e70fa4Paul Duffin * there are no more to run, puts the SerializingExecutor in the state where 1260888a09821a98ac0680fad765217302858e70fa4Paul Duffin * isThreadScheduled = false and returns. This allows the current worker 1270888a09821a98ac0680fad765217302858e70fa4Paul Duffin * thread to return to the original pool. 1280888a09821a98ac0680fad765217302858e70fa4Paul Duffin */ 1290888a09821a98ac0680fad765217302858e70fa4Paul Duffin private class TaskRunner implements Runnable { 1300888a09821a98ac0680fad765217302858e70fa4Paul Duffin @Override 1310888a09821a98ac0680fad765217302858e70fa4Paul Duffin public void run() { 1320888a09821a98ac0680fad765217302858e70fa4Paul Duffin boolean stillRunning = true; 1330888a09821a98ac0680fad765217302858e70fa4Paul Duffin try { 1340888a09821a98ac0680fad765217302858e70fa4Paul Duffin while (true) { 1350888a09821a98ac0680fad765217302858e70fa4Paul Duffin Preconditions.checkState(isThreadScheduled); 1360888a09821a98ac0680fad765217302858e70fa4Paul Duffin Runnable nextToRun; 1370888a09821a98ac0680fad765217302858e70fa4Paul Duffin synchronized (internalLock) { 1380888a09821a98ac0680fad765217302858e70fa4Paul Duffin nextToRun = waitQueue.poll(); 1390888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (nextToRun == null) { 1400888a09821a98ac0680fad765217302858e70fa4Paul Duffin isThreadScheduled = false; 1410888a09821a98ac0680fad765217302858e70fa4Paul Duffin stillRunning = false; 1420888a09821a98ac0680fad765217302858e70fa4Paul Duffin break; 1430888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1440888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1450888a09821a98ac0680fad765217302858e70fa4Paul Duffin 1460888a09821a98ac0680fad765217302858e70fa4Paul Duffin // Always run while not holding the lock, to avoid deadlocks. 1470888a09821a98ac0680fad765217302858e70fa4Paul Duffin try { 1480888a09821a98ac0680fad765217302858e70fa4Paul Duffin nextToRun.run(); 1490888a09821a98ac0680fad765217302858e70fa4Paul Duffin } catch (RuntimeException e) { 1500888a09821a98ac0680fad765217302858e70fa4Paul Duffin // Log it and keep going. 1510888a09821a98ac0680fad765217302858e70fa4Paul Duffin log.log(Level.SEVERE, "Exception while executing runnable " 1520888a09821a98ac0680fad765217302858e70fa4Paul Duffin + nextToRun, e); 1530888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1540888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1550888a09821a98ac0680fad765217302858e70fa4Paul Duffin } finally { 1560888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (stillRunning) { 1570888a09821a98ac0680fad765217302858e70fa4Paul Duffin // An Error is bubbling up, we should mark ourselves as no longer 1580888a09821a98ac0680fad765217302858e70fa4Paul Duffin // running, that way if anyone tries to keep using us we won't be 1590888a09821a98ac0680fad765217302858e70fa4Paul Duffin // corrupted. 1600888a09821a98ac0680fad765217302858e70fa4Paul Duffin synchronized (internalLock) { 1610888a09821a98ac0680fad765217302858e70fa4Paul Duffin isThreadScheduled = false; 1620888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1630888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1640888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1650888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1660888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1670888a09821a98ac0680fad765217302858e70fa4Paul Duffin} 168