10888a09821a98ac0680fad765217302858e70fa4Paul Duffin/* 20888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Copyright (C) 2014 The Guava Authors 30888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 40888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Licensed under the Apache License, Version 2.0 (the "License"); 50888a09821a98ac0680fad765217302858e70fa4Paul Duffin * you may not use this file except in compliance with the License. 60888a09821a98ac0680fad765217302858e70fa4Paul Duffin * You may obtain a copy of the License at 70888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 80888a09821a98ac0680fad765217302858e70fa4Paul Duffin * http://www.apache.org/licenses/LICENSE-2.0 90888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 100888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Unless required by applicable law or agreed to in writing, software 110888a09821a98ac0680fad765217302858e70fa4Paul Duffin * distributed under the License is distributed on an "AS IS" BASIS, 120888a09821a98ac0680fad765217302858e70fa4Paul Duffin * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 130888a09821a98ac0680fad765217302858e70fa4Paul Duffin * See the License for the specific language governing permissions and 140888a09821a98ac0680fad765217302858e70fa4Paul Duffin * limitations under the License. 150888a09821a98ac0680fad765217302858e70fa4Paul Duffin */ 160888a09821a98ac0680fad765217302858e70fa4Paul Duffin 170888a09821a98ac0680fad765217302858e70fa4Paul Duffinpackage com.google.common.util.concurrent; 180888a09821a98ac0680fad765217302858e70fa4Paul Duffin 190888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport static com.google.common.base.Preconditions.checkNotNull; 200888a09821a98ac0680fad765217302858e70fa4Paul Duffin 210888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport com.google.common.base.Preconditions; 223ecfa412eddc4b084663f38d562537b86b9734d5Paul Duffinimport com.google.common.collect.Queues; 230888a09821a98ac0680fad765217302858e70fa4Paul Duffin 240888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport java.util.Queue; 250888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport java.util.concurrent.Executor; 260888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport java.util.logging.Level; 270888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport java.util.logging.Logger; 280888a09821a98ac0680fad765217302858e70fa4Paul Duffin 290888a09821a98ac0680fad765217302858e70fa4Paul Duffinimport javax.annotation.concurrent.GuardedBy; 300888a09821a98ac0680fad765217302858e70fa4Paul Duffin 310888a09821a98ac0680fad765217302858e70fa4Paul Duffin/** 320888a09821a98ac0680fad765217302858e70fa4Paul Duffin * A special purpose queue/executor that executes listener callbacks serially on a configured 330888a09821a98ac0680fad765217302858e70fa4Paul Duffin * executor. Each callback task can be enqueued and executed as separate phases. 340888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 350888a09821a98ac0680fad765217302858e70fa4Paul Duffin * <p>This class is very similar to {@link SerializingExecutor} with the exception that tasks can 360888a09821a98ac0680fad765217302858e70fa4Paul Duffin * be enqueued without necessarily executing immediately. 370888a09821a98ac0680fad765217302858e70fa4Paul Duffin */ 380888a09821a98ac0680fad765217302858e70fa4Paul Duffinfinal class ListenerCallQueue<L> implements Runnable { 390888a09821a98ac0680fad765217302858e70fa4Paul Duffin // TODO(cpovirk): consider using the logger associated with listener.getClass(). 400888a09821a98ac0680fad765217302858e70fa4Paul Duffin private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName()); 410888a09821a98ac0680fad765217302858e70fa4Paul Duffin 420888a09821a98ac0680fad765217302858e70fa4Paul Duffin abstract static class Callback<L> { 430888a09821a98ac0680fad765217302858e70fa4Paul Duffin private final String methodCall; 440888a09821a98ac0680fad765217302858e70fa4Paul Duffin 450888a09821a98ac0680fad765217302858e70fa4Paul Duffin Callback(String methodCall) { 460888a09821a98ac0680fad765217302858e70fa4Paul Duffin this.methodCall = methodCall; 470888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 480888a09821a98ac0680fad765217302858e70fa4Paul Duffin 490888a09821a98ac0680fad765217302858e70fa4Paul Duffin abstract void call(L listener); 500888a09821a98ac0680fad765217302858e70fa4Paul Duffin 510888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** Helper method to add this callback to all the queues. */ 520888a09821a98ac0680fad765217302858e70fa4Paul Duffin void enqueueOn(Iterable<ListenerCallQueue<L>> queues) { 530888a09821a98ac0680fad765217302858e70fa4Paul Duffin for (ListenerCallQueue<L> queue : queues) { 540888a09821a98ac0680fad765217302858e70fa4Paul Duffin queue.add(this); 550888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 560888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 570888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 580888a09821a98ac0680fad765217302858e70fa4Paul Duffin 590888a09821a98ac0680fad765217302858e70fa4Paul Duffin private final L listener; 600888a09821a98ac0680fad765217302858e70fa4Paul Duffin private final Executor executor; 610888a09821a98ac0680fad765217302858e70fa4Paul Duffin 623ecfa412eddc4b084663f38d562537b86b9734d5Paul Duffin @GuardedBy("this") private final Queue<Callback<L>> waitQueue = Queues.newArrayDeque(); 630888a09821a98ac0680fad765217302858e70fa4Paul Duffin @GuardedBy("this") private boolean isThreadScheduled; 640888a09821a98ac0680fad765217302858e70fa4Paul Duffin 650888a09821a98ac0680fad765217302858e70fa4Paul Duffin ListenerCallQueue(L listener, Executor executor) { 660888a09821a98ac0680fad765217302858e70fa4Paul Duffin this.listener = checkNotNull(listener); 670888a09821a98ac0680fad765217302858e70fa4Paul Duffin this.executor = checkNotNull(executor); 680888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 690888a09821a98ac0680fad765217302858e70fa4Paul Duffin 700888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** Enqueues a task to be run. */ 710888a09821a98ac0680fad765217302858e70fa4Paul Duffin synchronized void add(Callback<L> callback) { 720888a09821a98ac0680fad765217302858e70fa4Paul Duffin waitQueue.add(callback); 730888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 740888a09821a98ac0680fad765217302858e70fa4Paul Duffin 750888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** Executes all listeners {@linkplain #add added} prior to this call, serially and in order.*/ 760888a09821a98ac0680fad765217302858e70fa4Paul Duffin void execute() { 770888a09821a98ac0680fad765217302858e70fa4Paul Duffin boolean scheduleTaskRunner = false; 780888a09821a98ac0680fad765217302858e70fa4Paul Duffin synchronized (this) { 790888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (!isThreadScheduled) { 800888a09821a98ac0680fad765217302858e70fa4Paul Duffin isThreadScheduled = true; 810888a09821a98ac0680fad765217302858e70fa4Paul Duffin scheduleTaskRunner = true; 820888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 830888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 840888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (scheduleTaskRunner) { 850888a09821a98ac0680fad765217302858e70fa4Paul Duffin try { 860888a09821a98ac0680fad765217302858e70fa4Paul Duffin executor.execute(this); 870888a09821a98ac0680fad765217302858e70fa4Paul Duffin } catch (RuntimeException e) { 880888a09821a98ac0680fad765217302858e70fa4Paul Duffin // reset state in case of an error so that later calls to execute will actually do something 890888a09821a98ac0680fad765217302858e70fa4Paul Duffin synchronized (this) { 900888a09821a98ac0680fad765217302858e70fa4Paul Duffin isThreadScheduled = false; 910888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 920888a09821a98ac0680fad765217302858e70fa4Paul Duffin // Log it and keep going. 930888a09821a98ac0680fad765217302858e70fa4Paul Duffin logger.log(Level.SEVERE, 940888a09821a98ac0680fad765217302858e70fa4Paul Duffin "Exception while running callbacks for " + listener + " on " + executor, 950888a09821a98ac0680fad765217302858e70fa4Paul Duffin e); 960888a09821a98ac0680fad765217302858e70fa4Paul Duffin throw e; 970888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 980888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 990888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1000888a09821a98ac0680fad765217302858e70fa4Paul Duffin 1010888a09821a98ac0680fad765217302858e70fa4Paul Duffin @Override public void run() { 1020888a09821a98ac0680fad765217302858e70fa4Paul Duffin boolean stillRunning = true; 1030888a09821a98ac0680fad765217302858e70fa4Paul Duffin try { 1040888a09821a98ac0680fad765217302858e70fa4Paul Duffin while (true) { 1050888a09821a98ac0680fad765217302858e70fa4Paul Duffin Callback<L> nextToRun; 1060888a09821a98ac0680fad765217302858e70fa4Paul Duffin synchronized (ListenerCallQueue.this) { 1070888a09821a98ac0680fad765217302858e70fa4Paul Duffin Preconditions.checkState(isThreadScheduled); 1080888a09821a98ac0680fad765217302858e70fa4Paul Duffin nextToRun = waitQueue.poll(); 1090888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (nextToRun == null) { 1100888a09821a98ac0680fad765217302858e70fa4Paul Duffin isThreadScheduled = false; 1110888a09821a98ac0680fad765217302858e70fa4Paul Duffin stillRunning = false; 1120888a09821a98ac0680fad765217302858e70fa4Paul Duffin break; 1130888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1140888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1150888a09821a98ac0680fad765217302858e70fa4Paul Duffin 1160888a09821a98ac0680fad765217302858e70fa4Paul Duffin // Always run while _not_ holding the lock, to avoid deadlocks. 1170888a09821a98ac0680fad765217302858e70fa4Paul Duffin try { 1180888a09821a98ac0680fad765217302858e70fa4Paul Duffin nextToRun.call(listener); 1190888a09821a98ac0680fad765217302858e70fa4Paul Duffin } catch (RuntimeException e) { 1200888a09821a98ac0680fad765217302858e70fa4Paul Duffin // Log it and keep going. 1210888a09821a98ac0680fad765217302858e70fa4Paul Duffin logger.log(Level.SEVERE, 1220888a09821a98ac0680fad765217302858e70fa4Paul Duffin "Exception while executing callback: " + listener + "." + nextToRun.methodCall, 1230888a09821a98ac0680fad765217302858e70fa4Paul Duffin e); 1240888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1250888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1260888a09821a98ac0680fad765217302858e70fa4Paul Duffin } finally { 1270888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (stillRunning) { 1280888a09821a98ac0680fad765217302858e70fa4Paul Duffin // An Error is bubbling up, we should mark ourselves as no longer 1290888a09821a98ac0680fad765217302858e70fa4Paul Duffin // running, that way if anyone tries to keep using us we won't be 1300888a09821a98ac0680fad765217302858e70fa4Paul Duffin // corrupted. 1310888a09821a98ac0680fad765217302858e70fa4Paul Duffin synchronized (ListenerCallQueue.this) { 1320888a09821a98ac0680fad765217302858e70fa4Paul Duffin isThreadScheduled = false; 1330888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1340888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1350888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1360888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1370888a09821a98ac0680fad765217302858e70fa4Paul Duffin} 138