1/*
2 * Copyright (C) 2014 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.common.util.concurrent;
18
19import static com.google.common.base.Preconditions.checkNotNull;
20
21import com.google.common.base.Preconditions;
22import com.google.common.collect.Queues;
23
24import java.util.Queue;
25import java.util.concurrent.Executor;
26import java.util.logging.Level;
27import java.util.logging.Logger;
28
29import javax.annotation.concurrent.GuardedBy;
30
31/**
32 * A special purpose queue/executor that executes listener callbacks serially on a configured
33 * executor.  Each callback task can be enqueued and executed as separate phases.
34 *
35 * <p>This class is very similar to {@link SerializingExecutor} with the exception that tasks can
36 * be enqueued without necessarily executing immediately.
37 */
38final class ListenerCallQueue<L> implements Runnable {
39  // TODO(cpovirk): consider using the logger associated with listener.getClass().
40  private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName());
41
42  abstract static class Callback<L> {
43    private final String methodCall;
44
45    Callback(String methodCall) {
46      this.methodCall = methodCall;
47    }
48
49    abstract void call(L listener);
50
51    /** Helper method to add this callback to all the queues. */
52    void enqueueOn(Iterable<ListenerCallQueue<L>> queues) {
53      for (ListenerCallQueue<L> queue : queues) {
54        queue.add(this);
55      }
56    }
57  }
58
59  private final L listener;
60  private final Executor executor;
61
62  @GuardedBy("this") private final Queue<Callback<L>> waitQueue = Queues.newArrayDeque();
63  @GuardedBy("this") private boolean isThreadScheduled;
64
65  ListenerCallQueue(L listener, Executor executor) {
66    this.listener = checkNotNull(listener);
67    this.executor = checkNotNull(executor);
68  }
69
70  /** Enqueues a task to be run. */
71  synchronized void add(Callback<L> callback) {
72    waitQueue.add(callback);
73  }
74
75  /** Executes all listeners {@linkplain #add added} prior to this call, serially and in order.*/
76  void execute() {
77    boolean scheduleTaskRunner = false;
78    synchronized (this) {
79      if (!isThreadScheduled) {
80        isThreadScheduled = true;
81        scheduleTaskRunner = true;
82      }
83    }
84    if (scheduleTaskRunner) {
85      try {
86        executor.execute(this);
87      } catch (RuntimeException e) {
88        // reset state in case of an error so that later calls to execute will actually do something
89        synchronized (this) {
90          isThreadScheduled = false;
91        }
92        // Log it and keep going.
93        logger.log(Level.SEVERE,
94            "Exception while running callbacks for " + listener + " on " + executor,
95            e);
96        throw e;
97      }
98    }
99  }
100
101  @Override public void run() {
102    boolean stillRunning = true;
103    try {
104      while (true) {
105        Callback<L> nextToRun;
106        synchronized (ListenerCallQueue.this) {
107          Preconditions.checkState(isThreadScheduled);
108          nextToRun = waitQueue.poll();
109          if (nextToRun == null) {
110            isThreadScheduled = false;
111            stillRunning = false;
112            break;
113          }
114        }
115
116        // Always run while _not_ holding the lock, to avoid deadlocks.
117        try {
118          nextToRun.call(listener);
119        } catch (RuntimeException e) {
120          // Log it and keep going.
121          logger.log(Level.SEVERE,
122              "Exception while executing callback: " + listener + "." + nextToRun.methodCall,
123              e);
124        }
125      }
126    } finally {
127      if (stillRunning) {
128        // An Error is bubbling up, we should mark ourselves as no longer
129        // running, that way if anyone tries to keep using us we won't be
130        // corrupted.
131        synchronized (ListenerCallQueue.this) {
132          isThreadScheduled = false;
133        }
134      }
135    }
136  }
137}
138