/*
 * Copyright (C) 2007 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
17511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallpackage com.google.common.eventbus;
18511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
19511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallimport static com.google.common.base.Preconditions.checkNotNull;
20511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
21511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallimport com.google.common.annotations.Beta;
22511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
23511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallimport java.util.concurrent.ConcurrentLinkedQueue;
24511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallimport java.util.concurrent.Executor;
25511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
26511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall/**
27511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * An {@link EventBus} that takes the Executor of your choice and uses it to
28511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * dispatch events, allowing dispatch to occur asynchronously.
29511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall *
30511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @author Cliff Biffle
31511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall * @since 10.0
32511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall */
33511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall@Beta
34511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrallpublic class AsyncEventBus extends EventBus {
35511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  private final Executor executor;
36511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
37511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  /** the queue of events is shared across all threads */
38511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch =
39511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall      new ConcurrentLinkedQueue<EventWithSubscriber>();
40511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
41511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  /**
42511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
43511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * events.  Assigns {@code identifier} as the bus's name for logging purposes.
44511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *
45511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * @param identifier short name for the bus, for logging purposes.
46511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * @param executor   Executor to use to dispatch events. It is the caller's
47511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *        responsibility to shut down the executor after the last event has
48511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *        been posted to this event bus.
49511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   */
50511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  public AsyncEventBus(String identifier, Executor executor) {
51511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    super(identifier);
52511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    this.executor = checkNotNull(executor);
53511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  }
54511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
55511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  /**
56511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
57511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * events.
58511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *
59511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * @param executor Executor to use to dispatch events. It is the caller's
60511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *        responsibility to shut down the executor after the last event has
61511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *        been posted to this event bus.
62511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
63511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *    See {@link SubscriberExceptionHandler} for more information.
64511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * @since 16.0
65511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   */
66511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
67511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    super(subscriberExceptionHandler);
68511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    this.executor = checkNotNull(executor);
69511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  }
70511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
71511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  /**
72511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
73511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * events.
74511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *
75511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * @param executor Executor to use to dispatch events. It is the caller's
76511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *        responsibility to shut down the executor after the last event has
77511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   *        been posted to this event bus.
78511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   */
79511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  public AsyncEventBus(Executor executor) {
80511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    super("default");
81511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    this.executor = checkNotNull(executor);
82511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  }
83511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
84511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  @Override
85511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  void enqueueEvent(Object event, EventSubscriber subscriber) {
86511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    eventsToDispatch.offer(new EventWithSubscriber(event, subscriber));
87511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  }
88511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
89511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  /**
90511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * Dispatch {@code events} in the order they were posted, regardless of
91511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * the posting thread.
92511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   */
93511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  @SuppressWarnings("deprecation") // only deprecated for external subclasses
94511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  @Override
95511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  protected void dispatchQueuedEvents() {
96511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    while (true) {
97511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall      EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll();
98511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall      if (eventWithSubscriber == null) {
99511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall        break;
100511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall      }
101511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
102511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall      dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
103511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    }
104511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  }
105511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall
106511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  /**
107511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   * Calls the {@link #executor} to dispatch {@code event} to {@code subscriber}.
108511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall   */
109511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  @Override
110511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  void dispatch(final Object event, final EventSubscriber subscriber) {
111511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    checkNotNull(event);
112511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    checkNotNull(subscriber);
113511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall    executor.execute(
114511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall        new Runnable() {
115511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall          @Override
116511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall          public void run() {
117511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall            AsyncEventBus.super.dispatch(event, subscriber);
118511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall          }
119511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall        });
120511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall  }
121511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall}
122511eca30a483e912c274e1d8ba3a0f8f081e2227JP Abgrall