1511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall/* 2511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * Copyright (C) 2007 The Guava Authors 3511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * 4511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * Licensed under the Apache License, Version 2.0 (the "License"); 5511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * you may not use this file except in compliance with the License. 6511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * You may obtain a copy of the License at 7511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * 8511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * http://www.apache.org/licenses/LICENSE-2.0 9511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * 10511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * Unless required by applicable law or agreed to in writing, software 11511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * distributed under the License is distributed on an "AS IS" BASIS, 12511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * See the License for the specific language governing permissions and 14511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * limitations under the License. 15511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall */ 16511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 17511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallpackage com.google.common.eventbus; 18511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 19511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallimport static com.google.common.base.Preconditions.checkNotNull; 20511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 21511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallimport com.google.common.annotations.Beta; 22511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 23511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallimport java.util.concurrent.ConcurrentLinkedQueue; 24511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallimport java.util.concurrent.Executor; 25511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 26511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall/** 27511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * An {@link EventBus} that takes the Executor of your choice and uses it to 28511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * dispatch events, allowing dispatch to occur asynchronously. 29511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * 30511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @author Cliff Biffle 31511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @since 10.0 32511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall */ 33511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall@Beta 34511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallpublic class AsyncEventBus extends EventBus { 35511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall private final Executor executor; 36511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 37511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall /** the queue of events is shared across all threads */ 38511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch = 39511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall new ConcurrentLinkedQueue<EventWithSubscriber>(); 40511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 41511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall /** 42511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * Creates a new AsyncEventBus that will use {@code executor} to dispatch 43511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * events. Assigns {@code identifier} as the bus's name for logging purposes. 44511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * 45511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @param identifier short name for the bus, for logging purposes. 46511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @param executor Executor to use to dispatch events. It is the caller's 47511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * responsibility to shut down the executor after the last event has 48511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * been posted to this event bus. 49511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall */ 50511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall public AsyncEventBus(String identifier, Executor executor) { 51511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall super(identifier); 52511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall this.executor = checkNotNull(executor); 53511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall } 54511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 55511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall /** 56511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * Creates a new AsyncEventBus that will use {@code executor} to dispatch 57511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * events. 58511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * 59511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @param executor Executor to use to dispatch events. It is the caller's 60511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * responsibility to shut down the executor after the last event has 61511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * been posted to this event bus. 62511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers. 63511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * See {@link SubscriberExceptionHandler} for more information. 64511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @since 16.0 65511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall */ 66511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) { 67511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall super(subscriberExceptionHandler); 68511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall this.executor = checkNotNull(executor); 69511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall } 70511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 71511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall /** 72511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * Creates a new AsyncEventBus that will use {@code executor} to dispatch 73511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * events. 74511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * 75511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @param executor Executor to use to dispatch events. It is the caller's 76511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * responsibility to shut down the executor after the last event has 77511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * been posted to this event bus. 78511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall */ 79511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall public AsyncEventBus(Executor executor) { 80511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall super("default"); 81511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall this.executor = checkNotNull(executor); 82511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall } 83511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 84511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall @Override 85511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall void enqueueEvent(Object event, EventSubscriber subscriber) { 86511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall eventsToDispatch.offer(new EventWithSubscriber(event, subscriber)); 87511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall } 88511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 89511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall /** 90511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * Dispatch {@code events} in the order they were posted, regardless of 91511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * the posting thread. 92511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall */ 93511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall @SuppressWarnings("deprecation") // only deprecated for external subclasses 94511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall @Override 95511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall protected void dispatchQueuedEvents() { 96511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall while (true) { 97511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll(); 98511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall if (eventWithSubscriber == null) { 99511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall break; 100511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall } 101511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 102511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber); 103511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall } 104511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall } 105511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall 106511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall /** 107511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * Calls the {@link #executor} to dispatch {@code event} to {@code subscriber}. 108511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall */ 109511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall @Override 110511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall void dispatch(final Object event, final EventSubscriber subscriber) { 111511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall checkNotNull(event); 112511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall checkNotNull(subscriber); 113511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall executor.execute( 114511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall new Runnable() { 115511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall @Override 116511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall public void run() { 117511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall AsyncEventBus.super.dispatch(event, subscriber); 118511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall } 119511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall }); 120511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall } 121511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall} 122511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall