11d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert/*
21d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * Copyright (C) 2007 The Guava Authors
31d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert *
41d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * Licensed under the Apache License, Version 2.0 (the "License");
51d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * you may not use this file except in compliance with the License.
61d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * You may obtain a copy of the License at
71d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert *
81d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * http://www.apache.org/licenses/LICENSE-2.0
91d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert *
101d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * Unless required by applicable law or agreed to in writing, software
111d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * distributed under the License is distributed on an "AS IS" BASIS,
121d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
131d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * See the License for the specific language governing permissions and
141d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * limitations under the License.
151d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert */
161d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
171d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertpackage com.google.common.eventbus;
181d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
197dd252788645e940eada959bdde927426e2531c9Paul Duffinimport static com.google.common.base.Preconditions.checkNotNull;
207dd252788645e940eada959bdde927426e2531c9Paul Duffin
211d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertimport com.google.common.annotations.Beta;
227dd252788645e940eada959bdde927426e2531c9Paul Duffin
231d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertimport java.util.concurrent.ConcurrentLinkedQueue;
241d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertimport java.util.concurrent.Executor;
251d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
261d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert/**
271d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * An {@link EventBus} that takes the Executor of your choice and uses it to
281d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * dispatch events, allowing dispatch to occur asynchronously.
291d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert *
301d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * @author Cliff Biffle
311d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert * @since 10.0
321d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert */
331d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert@Beta
341d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringertpublic class AsyncEventBus extends EventBus {
351d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  private final Executor executor;
361d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
371d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  /** the queue of events is shared across all threads */
380888a09821a98ac0680fad765217302858e70fa4Paul Duffin  private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch =
390888a09821a98ac0680fad765217302858e70fa4Paul Duffin      new ConcurrentLinkedQueue<EventWithSubscriber>();
401d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
411d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  /**
421d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
431d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   * events.  Assigns {@code identifier} as the bus's name for logging purposes.
441d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   *
451d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   * @param identifier short name for the bus, for logging purposes.
461d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   * @param executor   Executor to use to dispatch events. It is the caller's
471d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   *        responsibility to shut down the executor after the last event has
481d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   *        been posted to this event bus.
491d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   */
501d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  public AsyncEventBus(String identifier, Executor executor) {
511d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert    super(identifier);
527dd252788645e940eada959bdde927426e2531c9Paul Duffin    this.executor = checkNotNull(executor);
531d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  }
541d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
551d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  /**
561d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
571d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   * events.
581d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   *
591d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   * @param executor Executor to use to dispatch events. It is the caller's
601d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   *        responsibility to shut down the executor after the last event has
611d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   *        been posted to this event bus.
620888a09821a98ac0680fad765217302858e70fa4Paul Duffin   * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
630888a09821a98ac0680fad765217302858e70fa4Paul Duffin   *    See {@link SubscriberExceptionHandler} for more information.
640888a09821a98ac0680fad765217302858e70fa4Paul Duffin   * @since 16.0
650888a09821a98ac0680fad765217302858e70fa4Paul Duffin   */
660888a09821a98ac0680fad765217302858e70fa4Paul Duffin  public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
670888a09821a98ac0680fad765217302858e70fa4Paul Duffin    super(subscriberExceptionHandler);
680888a09821a98ac0680fad765217302858e70fa4Paul Duffin    this.executor = checkNotNull(executor);
690888a09821a98ac0680fad765217302858e70fa4Paul Duffin  }
700888a09821a98ac0680fad765217302858e70fa4Paul Duffin
710888a09821a98ac0680fad765217302858e70fa4Paul Duffin  /**
720888a09821a98ac0680fad765217302858e70fa4Paul Duffin   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
730888a09821a98ac0680fad765217302858e70fa4Paul Duffin   * events.
740888a09821a98ac0680fad765217302858e70fa4Paul Duffin   *
750888a09821a98ac0680fad765217302858e70fa4Paul Duffin   * @param executor Executor to use to dispatch events. It is the caller's
760888a09821a98ac0680fad765217302858e70fa4Paul Duffin   *        responsibility to shut down the executor after the last event has
770888a09821a98ac0680fad765217302858e70fa4Paul Duffin   *        been posted to this event bus.
781d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   */
791d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  public AsyncEventBus(Executor executor) {
800888a09821a98ac0680fad765217302858e70fa4Paul Duffin    super("default");
817dd252788645e940eada959bdde927426e2531c9Paul Duffin    this.executor = checkNotNull(executor);
821d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  }
831d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
841d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  @Override
850888a09821a98ac0680fad765217302858e70fa4Paul Duffin  void enqueueEvent(Object event, EventSubscriber subscriber) {
860888a09821a98ac0680fad765217302858e70fa4Paul Duffin    eventsToDispatch.offer(new EventWithSubscriber(event, subscriber));
871d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  }
881d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
891d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  /**
901d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   * Dispatch {@code events} in the order they were posted, regardless of
911d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   * the posting thread.
921d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   */
930888a09821a98ac0680fad765217302858e70fa4Paul Duffin  @SuppressWarnings("deprecation") // only deprecated for external subclasses
941d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  @Override
951d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  protected void dispatchQueuedEvents() {
961d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert    while (true) {
970888a09821a98ac0680fad765217302858e70fa4Paul Duffin      EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll();
980888a09821a98ac0680fad765217302858e70fa4Paul Duffin      if (eventWithSubscriber == null) {
991d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert        break;
1001d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert      }
1011d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
1020888a09821a98ac0680fad765217302858e70fa4Paul Duffin      dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
1031d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert    }
1041d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  }
1051d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert
1061d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  /**
1070888a09821a98ac0680fad765217302858e70fa4Paul Duffin   * Calls the {@link #executor} to dispatch {@code event} to {@code subscriber}.
1081d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert   */
1091d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert  @Override
1100888a09821a98ac0680fad765217302858e70fa4Paul Duffin  void dispatch(final Object event, final EventSubscriber subscriber) {
1117dd252788645e940eada959bdde927426e2531c9Paul Duffin    checkNotNull(event);
1120888a09821a98ac0680fad765217302858e70fa4Paul Duffin    checkNotNull(subscriber);
1130888a09821a98ac0680fad765217302858e70fa4Paul Duffin    executor.execute(
1140888a09821a98ac0680fad765217302858e70fa4Paul Duffin        new Runnable() {
1150888a09821a98ac0680fad765217302858e70fa4Paul Duffin          @Override
1160888a09821a98ac0680fad765217302858e70fa4Paul Duffin          public void run() {
1170888a09821a98ac0680fad765217302858e70fa4Paul Duffin            AsyncEventBus.super.dispatch(event, subscriber);
1180888a09821a98ac0680fad765217302858e70fa4Paul Duffin          }
1190888a09821a98ac0680fad765217302858e70fa4Paul Duffin        });
1207dd252788645e940eada959bdde927426e2531c9Paul Duffin  }
1211d580d0f6ee4f21eb309ba7b509d2c6d671c4044Bjorn Bringert}
122