11d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert/* 21d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * Copyright (C) 2007 The Guava Authors 31d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * 41d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * Licensed under the Apache License, Version 2.0 (the "License"); 51d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * you may not use this file except in compliance with the License. 61d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * You may obtain a copy of the License at 71d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * 81d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * http://www.apache.org/licenses/LICENSE-2.0 91d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * 101d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * Unless required by applicable law or agreed to in writing, software 111d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * distributed under the License is distributed on an "AS IS" BASIS, 121d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 131d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * See the License for the specific language governing permissions and 141d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * limitations under the License. 151d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert */ 161d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 171d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertpackage com.google.common.eventbus; 181d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 197dd252788645e940eada959bdde927426e2531c9Paul Duffinimport static com.google.common.base.Preconditions.checkNotNull; 207dd252788645e940eada959bdde927426e2531c9Paul Duffin 211d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertimport com.google.common.annotations.Beta; 227dd252788645e940eada959bdde927426e2531c9Paul Duffin 231d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertimport java.util.concurrent.ConcurrentLinkedQueue; 241d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertimport java.util.concurrent.Executor; 251d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 261d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert/** 271d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * An {@link EventBus} that takes the Executor of your choice and uses it to 281d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * dispatch events, allowing dispatch to occur asynchronously. 291d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * 301d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * @author Cliff Biffle 311d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * @since 10.0 321d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert */ 331d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert@Beta 341d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertpublic class AsyncEventBus extends EventBus { 351d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert private final Executor executor; 361d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 371d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert /** the queue of events is shared across all threads */ 380888a09821a98ac0680fad765217302858e70fa4Paul Duffin private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch = 390888a09821a98ac0680fad765217302858e70fa4Paul Duffin new ConcurrentLinkedQueue<EventWithSubscriber>(); 401d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 411d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert /** 421d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * Creates a new AsyncEventBus that will use {@code executor} to dispatch 431d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * events. Assigns {@code identifier} as the bus's name for logging purposes. 441d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * 451d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * @param identifier short name for the bus, for logging purposes. 461d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * @param executor Executor to use to dispatch events. It is the caller's 471d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * responsibility to shut down the executor after the last event has 481d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * been posted to this event bus. 491d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert */ 501d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert public AsyncEventBus(String identifier, Executor executor) { 511d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert super(identifier); 527dd252788645e940eada959bdde927426e2531c9Paul Duffin this.executor = checkNotNull(executor); 531d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert } 541d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 551d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert /** 561d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * Creates a new AsyncEventBus that will use {@code executor} to dispatch 571d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * events. 581d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * 591d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * @param executor Executor to use to dispatch events. It is the caller's 601d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * responsibility to shut down the executor after the last event has 611d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * been posted to this event bus. 620888a09821a98ac0680fad765217302858e70fa4Paul Duffin * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers. 630888a09821a98ac0680fad765217302858e70fa4Paul Duffin * See {@link SubscriberExceptionHandler} for more information. 640888a09821a98ac0680fad765217302858e70fa4Paul Duffin * @since 16.0 650888a09821a98ac0680fad765217302858e70fa4Paul Duffin */ 660888a09821a98ac0680fad765217302858e70fa4Paul Duffin public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) { 670888a09821a98ac0680fad765217302858e70fa4Paul Duffin super(subscriberExceptionHandler); 680888a09821a98ac0680fad765217302858e70fa4Paul Duffin this.executor = checkNotNull(executor); 690888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 700888a09821a98ac0680fad765217302858e70fa4Paul Duffin 710888a09821a98ac0680fad765217302858e70fa4Paul Duffin /** 720888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Creates a new AsyncEventBus that will use {@code executor} to dispatch 730888a09821a98ac0680fad765217302858e70fa4Paul Duffin * events. 740888a09821a98ac0680fad765217302858e70fa4Paul Duffin * 750888a09821a98ac0680fad765217302858e70fa4Paul Duffin * @param executor Executor to use to dispatch events. It is the caller's 760888a09821a98ac0680fad765217302858e70fa4Paul Duffin * responsibility to shut down the executor after the last event has 770888a09821a98ac0680fad765217302858e70fa4Paul Duffin * been posted to this event bus. 781d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert */ 791d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert public AsyncEventBus(Executor executor) { 800888a09821a98ac0680fad765217302858e70fa4Paul Duffin super("default"); 817dd252788645e940eada959bdde927426e2531c9Paul Duffin this.executor = checkNotNull(executor); 821d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert } 831d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 841d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert @Override 850888a09821a98ac0680fad765217302858e70fa4Paul Duffin void enqueueEvent(Object event, EventSubscriber subscriber) { 860888a09821a98ac0680fad765217302858e70fa4Paul Duffin eventsToDispatch.offer(new EventWithSubscriber(event, subscriber)); 871d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert } 881d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 891d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert /** 901d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * Dispatch {@code events} in the order they were posted, regardless of 911d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * the posting thread. 921d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert */ 930888a09821a98ac0680fad765217302858e70fa4Paul Duffin @SuppressWarnings("deprecation") // only deprecated for external subclasses 941d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert @Override 951d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert protected void dispatchQueuedEvents() { 961d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert while (true) { 970888a09821a98ac0680fad765217302858e70fa4Paul Duffin EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll(); 980888a09821a98ac0680fad765217302858e70fa4Paul Duffin if (eventWithSubscriber == null) { 991d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert break; 1001d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert } 1011d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 1020888a09821a98ac0680fad765217302858e70fa4Paul Duffin dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber); 1031d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert } 1041d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert } 1051d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert 1061d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert /** 1070888a09821a98ac0680fad765217302858e70fa4Paul Duffin * Calls the {@link #executor} to dispatch {@code event} to {@code subscriber}. 1081d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert */ 1091d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert @Override 1100888a09821a98ac0680fad765217302858e70fa4Paul Duffin void dispatch(final Object event, final EventSubscriber subscriber) { 1117dd252788645e940eada959bdde927426e2531c9Paul Duffin checkNotNull(event); 1120888a09821a98ac0680fad765217302858e70fa4Paul Duffin checkNotNull(subscriber); 1130888a09821a98ac0680fad765217302858e70fa4Paul Duffin executor.execute( 1140888a09821a98ac0680fad765217302858e70fa4Paul Duffin new Runnable() { 1150888a09821a98ac0680fad765217302858e70fa4Paul Duffin @Override 1160888a09821a98ac0680fad765217302858e70fa4Paul Duffin public void run() { 1170888a09821a98ac0680fad765217302858e70fa4Paul Duffin AsyncEventBus.super.dispatch(event, subscriber); 1180888a09821a98ac0680fad765217302858e70fa4Paul Duffin } 1190888a09821a98ac0680fad765217302858e70fa4Paul Duffin }); 1207dd252788645e940eada959bdde927426e2531c9Paul Duffin } 1211d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert} 122