1/*
2 * Copyright (C) 2007 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.eventbus;
18
19import com.google.common.annotations.Beta;
20import java.util.concurrent.ConcurrentLinkedQueue;
21import java.util.concurrent.Executor;
22
23/**
24 * An {@link EventBus} that takes the Executor of your choice and uses it to
25 * dispatch events, allowing dispatch to occur asynchronously.
26 *
27 * @author Cliff Biffle
28 * @since 10.0
29 */
30@Beta
31public class AsyncEventBus extends EventBus {
32  private final Executor executor;
33
34  /** the queue of events is shared across all threads */
35  private final ConcurrentLinkedQueue<EventWithHandler> eventsToDispatch =
36      new ConcurrentLinkedQueue<EventWithHandler>();
37
38  /**
39   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
40   * events.  Assigns {@code identifier} as the bus's name for logging purposes.
41   *
42   * @param identifier short name for the bus, for logging purposes.
43   * @param executor   Executor to use to dispatch events. It is the caller's
44   *        responsibility to shut down the executor after the last event has
45   *        been posted to this event bus.
46   */
47  public AsyncEventBus(String identifier, Executor executor) {
48    super(identifier);
49    this.executor = executor;
50  }
51
52  /**
53   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
54   * events.
55   *
56   * @param executor Executor to use to dispatch events. It is the caller's
57   *        responsibility to shut down the executor after the last event has
58   *        been posted to this event bus.
59   */
60  public AsyncEventBus(Executor executor) {
61    this.executor = executor;
62  }
63
64  @Override
65  protected void enqueueEvent(Object event, EventHandler handler) {
66    eventsToDispatch.offer(new EventWithHandler(event, handler));
67  }
68
69  /**
70   * Dispatch {@code events} in the order they were posted, regardless of
71   * the posting thread.
72   */
73  @Override
74  protected void dispatchQueuedEvents() {
75    while (true) {
76      EventWithHandler eventWithHandler = eventsToDispatch.poll();
77      if (eventWithHandler == null) {
78        break;
79      }
80
81      dispatch(eventWithHandler.event, eventWithHandler.handler);
82    }
83  }
84
85  /**
86   * Calls the {@link #executor} to dispatch {@code event} to {@code handler}.
87   */
88  @Override
89  protected void dispatch(final Object event, final EventHandler handler) {
90    executor.execute(new Runnable() {
91          @Override
92          @SuppressWarnings("synthetic-access")
93          public void run() {
94            AsyncEventBus.super.dispatch(event, handler);
95          }
96        });
97  }
98
99}
100