1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/common/handle_watcher.h"
6
7#include <map>
8
9#include "base/atomic_sequence_num.h"
10#include "base/bind.h"
11#include "base/lazy_instance.h"
12#include "base/memory/weak_ptr.h"
13#include "base/message_loop/message_loop.h"
14#include "base/message_loop/message_loop_proxy.h"
15#include "base/threading/thread.h"
16#include "base/time/tick_clock.h"
17#include "base/time/time.h"
18#include "mojo/common/message_pump_mojo.h"
19#include "mojo/common/message_pump_mojo_handler.h"
20
21namespace mojo {
22namespace common {
23
// Identifier assigned to each watch request; it is what callers pass back to
// stop watching.
typedef int WatcherID;
25
26namespace {
27
// Name given to the background thread owned by WatcherThreadManager.
const char kWatcherThreadName[] = "handle-watcher-thread";
29
30// TODO(sky): this should be unnecessary once MessageLoop has been refactored.
31MessagePumpMojo* message_pump_mojo = NULL;
32
33scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
34  message_pump_mojo = new MessagePumpMojo;
35  return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
36}
37
38// Tracks the data for a single call to Start().
39struct WatchData {
40  WatchData()
41      : id(0),
42        wait_flags(MOJO_WAIT_FLAG_NONE),
43        message_loop(NULL) {}
44
45  WatcherID id;
46  Handle handle;
47  MojoWaitFlags wait_flags;
48  base::TimeTicks deadline;
49  base::Callback<void(MojoResult)> callback;
50  scoped_refptr<base::MessageLoopProxy> message_loop;
51};
52
53// WatcherBackend --------------------------------------------------------------
54
55// WatcherBackend is responsible for managing the requests and interacting with
56// MessagePumpMojo. All access (outside of creation/destruction) is done on the
57// thread WatcherThreadManager creates.
58class WatcherBackend : public MessagePumpMojoHandler {
59 public:
60  WatcherBackend();
61  virtual ~WatcherBackend();
62
63  void StartWatching(const WatchData& data);
64  void StopWatching(WatcherID watcher_id);
65
66 private:
67  typedef std::map<Handle, WatchData> HandleToWatchDataMap;
68
69  // Invoked when a handle needs to be removed and notified.
70  void RemoveAndNotify(const Handle& handle, MojoResult result);
71
72  // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
73  // and sets |handle| to the Handle. Returns false if not a known id.
74  bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
75
76  // MessagePumpMojoHandler overrides:
77  virtual void OnHandleReady(const Handle& handle) OVERRIDE;
78  virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
79
80  // Maps from assigned id to WatchData.
81  HandleToWatchDataMap handle_to_data_;
82
83  DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
84};
85
86WatcherBackend::WatcherBackend() {
87}
88
89WatcherBackend::~WatcherBackend() {
90}
91
92void WatcherBackend::StartWatching(const WatchData& data) {
93  RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
94
95  DCHECK_EQ(0u, handle_to_data_.count(data.handle));
96
97  handle_to_data_[data.handle] = data;
98  message_pump_mojo->AddHandler(this, data.handle,
99                                data.wait_flags,
100                                data.deadline);
101}
102
103void WatcherBackend::StopWatching(WatcherID watcher_id) {
104  // Because of the thread hop it is entirely possible to get here and not
105  // have a valid handle registered for |watcher_id|.
106  Handle handle;
107  if (!GetMojoHandleByWatcherID(watcher_id, &handle))
108    return;
109
110  handle_to_data_.erase(handle);
111  message_pump_mojo->RemoveHandler(handle);
112}
113
114void WatcherBackend::RemoveAndNotify(const Handle& handle,
115                                     MojoResult result) {
116  if (handle_to_data_.count(handle) == 0)
117    return;
118
119  const WatchData data(handle_to_data_[handle]);
120  handle_to_data_.erase(handle);
121  message_pump_mojo->RemoveHandler(handle);
122  data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
123}
124
125bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
126                                              Handle* handle) const {
127  for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
128       i != handle_to_data_.end(); ++i) {
129    if (i->second.id == watcher_id) {
130      *handle = i->second.handle;
131      return true;
132    }
133  }
134  return false;
135}
136
137void WatcherBackend::OnHandleReady(const Handle& handle) {
138  RemoveAndNotify(handle, MOJO_RESULT_OK);
139}
140
141void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
142  RemoveAndNotify(handle, result);
143}
144
145// WatcherThreadManager --------------------------------------------------------
146
147// WatcherThreadManager manages the background thread that listens for handles
148// to be ready. All requests are handled by WatcherBackend.
149class WatcherThreadManager {
150 public:
151  // Returns the shared instance.
152  static WatcherThreadManager* GetInstance();
153
154  // Starts watching the requested handle. Returns a unique ID that is used to
155  // stop watching the handle. When the handle is ready |callback| is notified
156  // on the thread StartWatching() was invoked on.
157  // This may be invoked on any thread.
158  WatcherID StartWatching(const Handle& handle,
159                          MojoWaitFlags wait_flags,
160                          base::TimeTicks deadline,
161                          const base::Callback<void(MojoResult)>& callback);
162
163  // Stops watching a handle.
164  // This may be invoked on any thread.
165  void StopWatching(WatcherID watcher_id);
166
167 private:
168  friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
169
170  WatcherThreadManager();
171  ~WatcherThreadManager();
172
173  base::Thread thread_;
174
175  base::AtomicSequenceNumber watcher_id_generator_;
176
177  WatcherBackend backend_;
178
179  DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
180};
181
182WatcherThreadManager* WatcherThreadManager::GetInstance() {
183  static base::LazyInstance<WatcherThreadManager> instance =
184      LAZY_INSTANCE_INITIALIZER;
185  return &instance.Get();
186}
187
188WatcherID WatcherThreadManager::StartWatching(
189    const Handle& handle,
190    MojoWaitFlags wait_flags,
191    base::TimeTicks deadline,
192    const base::Callback<void(MojoResult)>& callback) {
193  WatchData data;
194  data.id = watcher_id_generator_.GetNext();
195  data.handle = handle;
196  data.callback = callback;
197  data.wait_flags = wait_flags;
198  data.deadline = deadline;
199  data.message_loop = base::MessageLoopProxy::current();
200  // We outlive |thread_|, so it's safe to use Unretained() here.
201  thread_.message_loop()->PostTask(
202      FROM_HERE,
203      base::Bind(&WatcherBackend::StartWatching,
204                 base::Unretained(&backend_),
205                 data));
206  return data.id;
207}
208
209void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
210  // We outlive |thread_|, so it's safe to use Unretained() here.
211  thread_.message_loop()->PostTask(
212      FROM_HERE,
213      base::Bind(&WatcherBackend::StopWatching,
214                 base::Unretained(&backend_),
215                 watcher_id));
216}
217
218WatcherThreadManager::WatcherThreadManager()
219    : thread_(kWatcherThreadName) {
220  base::Thread::Options thread_options;
221  thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
222  thread_.StartWithOptions(thread_options);
223}
224
225WatcherThreadManager::~WatcherThreadManager() {
226  thread_.Stop();
227}
228
229}  // namespace
230
231// HandleWatcher::StartState ---------------------------------------------------
232
233// Contains the information passed to Start().
234struct HandleWatcher::StartState {
235  explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
236  }
237
238  ~StartState() {
239  }
240
241  // ID assigned by WatcherThreadManager.
242  WatcherID watcher_id;
243
244  // Callback to notify when done.
245  base::Callback<void(MojoResult)> callback;
246
247  // When Start() is invoked a callback is passed to WatcherThreadManager
248  // using a WeakRef from |weak_refactory_|. The callback invokes
249  // OnHandleReady() (on the thread Start() is invoked from) which in turn
250  // notifies |callback_|. Doing this allows us to reset state when the handle
251  // is ready, and then notify the callback. Doing this also means Stop()
252  // cancels any pending callbacks that may be inflight.
253  base::WeakPtrFactory<HandleWatcher> weak_factory;
254};
255
256// HandleWatcher ---------------------------------------------------------------
257
258// static
259base::TickClock* HandleWatcher::tick_clock_ = NULL;
260
261HandleWatcher::HandleWatcher() {
262}
263
264HandleWatcher::~HandleWatcher() {
265  Stop();
266}
267
268void HandleWatcher::Start(const Handle& handle,
269                          MojoWaitFlags wait_flags,
270                          MojoDeadline deadline,
271                          const base::Callback<void(MojoResult)>& callback) {
272  DCHECK(handle.is_valid());
273  DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags);
274
275  Stop();
276
277  start_state_.reset(new StartState(this));
278  start_state_->callback = callback;
279  start_state_->watcher_id =
280      WatcherThreadManager::GetInstance()->StartWatching(
281          handle,
282          wait_flags,
283          MojoDeadlineToTimeTicks(deadline),
284          base::Bind(&HandleWatcher::OnHandleReady,
285                     start_state_->weak_factory.GetWeakPtr()));
286}
287
288void HandleWatcher::Stop() {
289  if (!start_state_.get())
290    return;
291
292  scoped_ptr<StartState> old_state(start_state_.Pass());
293  WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id);
294}
295
296void HandleWatcher::OnHandleReady(MojoResult result) {
297  DCHECK(start_state_.get());
298  scoped_ptr<StartState> old_state(start_state_.Pass());
299  old_state->callback.Run(result);
300
301  // NOTE: We may have been deleted during callback execution.
302}
303
304// static
305base::TimeTicks HandleWatcher::NowTicks() {
306  return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now();
307}
308
309// static
310base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) {
311  return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
312      NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
313}
314
315}  // namespace common
316}  // namespace mojo
317