1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/common/handle_watcher.h" 6 7#include <map> 8 9#include "base/atomic_sequence_num.h" 10#include "base/bind.h" 11#include "base/lazy_instance.h" 12#include "base/memory/weak_ptr.h" 13#include "base/message_loop/message_loop.h" 14#include "base/message_loop/message_loop_proxy.h" 15#include "base/threading/thread.h" 16#include "base/time/tick_clock.h" 17#include "base/time/time.h" 18#include "mojo/common/message_pump_mojo.h" 19#include "mojo/common/message_pump_mojo_handler.h" 20 21namespace mojo { 22namespace common { 23 24typedef int WatcherID; 25 26namespace { 27 28const char kWatcherThreadName[] = "handle-watcher-thread"; 29 30// TODO(sky): this should be unnecessary once MessageLoop has been refactored. 31MessagePumpMojo* message_pump_mojo = NULL; 32 33scoped_ptr<base::MessagePump> CreateMessagePumpMojo() { 34 message_pump_mojo = new MessagePumpMojo; 35 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass(); 36} 37 38// Tracks the data for a single call to Start(). 39struct WatchData { 40 WatchData() 41 : id(0), 42 wait_flags(MOJO_WAIT_FLAG_NONE), 43 message_loop(NULL) {} 44 45 WatcherID id; 46 Handle handle; 47 MojoWaitFlags wait_flags; 48 base::TimeTicks deadline; 49 base::Callback<void(MojoResult)> callback; 50 scoped_refptr<base::MessageLoopProxy> message_loop; 51}; 52 53// WatcherBackend -------------------------------------------------------------- 54 55// WatcherBackend is responsible for managing the requests and interacting with 56// MessagePumpMojo. All access (outside of creation/destruction) is done on the 57// thread WatcherThreadManager creates. 58class WatcherBackend : public MessagePumpMojoHandler { 59 public: 60 WatcherBackend(); 61 virtual ~WatcherBackend(); 62 63 void StartWatching(const WatchData& data); 64 void StopWatching(WatcherID watcher_id); 65 66 private: 67 typedef std::map<Handle, WatchData> HandleToWatchDataMap; 68 69 // Invoked when a handle needs to be removed and notified. 70 void RemoveAndNotify(const Handle& handle, MojoResult result); 71 72 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found 73 // and sets |handle| to the Handle. Returns false if not a known id. 74 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; 75 76 // MessagePumpMojoHandler overrides: 77 virtual void OnHandleReady(const Handle& handle) OVERRIDE; 78 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE; 79 80 // Maps from assigned id to WatchData. 81 HandleToWatchDataMap handle_to_data_; 82 83 DISALLOW_COPY_AND_ASSIGN(WatcherBackend); 84}; 85 86WatcherBackend::WatcherBackend() { 87} 88 89WatcherBackend::~WatcherBackend() { 90} 91 92void WatcherBackend::StartWatching(const WatchData& data) { 93 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); 94 95 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); 96 97 handle_to_data_[data.handle] = data; 98 message_pump_mojo->AddHandler(this, data.handle, 99 data.wait_flags, 100 data.deadline); 101} 102 103void WatcherBackend::StopWatching(WatcherID watcher_id) { 104 // Because of the thread hop it is entirely possible to get here and not 105 // have a valid handle registered for |watcher_id|. 106 Handle handle; 107 if (!GetMojoHandleByWatcherID(watcher_id, &handle)) 108 return; 109 110 handle_to_data_.erase(handle); 111 message_pump_mojo->RemoveHandler(handle); 112} 113 114void WatcherBackend::RemoveAndNotify(const Handle& handle, 115 MojoResult result) { 116 if (handle_to_data_.count(handle) == 0) 117 return; 118 119 const WatchData data(handle_to_data_[handle]); 120 handle_to_data_.erase(handle); 121 message_pump_mojo->RemoveHandler(handle); 122 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); 123} 124 125bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, 126 Handle* handle) const { 127 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); 128 i != handle_to_data_.end(); ++i) { 129 if (i->second.id == watcher_id) { 130 *handle = i->second.handle; 131 return true; 132 } 133 } 134 return false; 135} 136 137void WatcherBackend::OnHandleReady(const Handle& handle) { 138 RemoveAndNotify(handle, MOJO_RESULT_OK); 139} 140 141void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { 142 RemoveAndNotify(handle, result); 143} 144 145// WatcherThreadManager -------------------------------------------------------- 146 147// WatcherThreadManager manages the background thread that listens for handles 148// to be ready. All requests are handled by WatcherBackend. 149class WatcherThreadManager { 150 public: 151 // Returns the shared instance. 152 static WatcherThreadManager* GetInstance(); 153 154 // Starts watching the requested handle. Returns a unique ID that is used to 155 // stop watching the handle. When the handle is ready |callback| is notified 156 // on the thread StartWatching() was invoked on. 157 // This may be invoked on any thread. 158 WatcherID StartWatching(const Handle& handle, 159 MojoWaitFlags wait_flags, 160 base::TimeTicks deadline, 161 const base::Callback<void(MojoResult)>& callback); 162 163 // Stops watching a handle. 164 // This may be invoked on any thread. 165 void StopWatching(WatcherID watcher_id); 166 167 private: 168 friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>; 169 170 WatcherThreadManager(); 171 ~WatcherThreadManager(); 172 173 base::Thread thread_; 174 175 base::AtomicSequenceNumber watcher_id_generator_; 176 177 WatcherBackend backend_; 178 179 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); 180}; 181 182WatcherThreadManager* WatcherThreadManager::GetInstance() { 183 static base::LazyInstance<WatcherThreadManager> instance = 184 LAZY_INSTANCE_INITIALIZER; 185 return &instance.Get(); 186} 187 188WatcherID WatcherThreadManager::StartWatching( 189 const Handle& handle, 190 MojoWaitFlags wait_flags, 191 base::TimeTicks deadline, 192 const base::Callback<void(MojoResult)>& callback) { 193 WatchData data; 194 data.id = watcher_id_generator_.GetNext(); 195 data.handle = handle; 196 data.callback = callback; 197 data.wait_flags = wait_flags; 198 data.deadline = deadline; 199 data.message_loop = base::MessageLoopProxy::current(); 200 // We outlive |thread_|, so it's safe to use Unretained() here. 201 thread_.message_loop()->PostTask( 202 FROM_HERE, 203 base::Bind(&WatcherBackend::StartWatching, 204 base::Unretained(&backend_), 205 data)); 206 return data.id; 207} 208 209void WatcherThreadManager::StopWatching(WatcherID watcher_id) { 210 // We outlive |thread_|, so it's safe to use Unretained() here. 211 thread_.message_loop()->PostTask( 212 FROM_HERE, 213 base::Bind(&WatcherBackend::StopWatching, 214 base::Unretained(&backend_), 215 watcher_id)); 216} 217 218WatcherThreadManager::WatcherThreadManager() 219 : thread_(kWatcherThreadName) { 220 base::Thread::Options thread_options; 221 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); 222 thread_.StartWithOptions(thread_options); 223} 224 225WatcherThreadManager::~WatcherThreadManager() { 226 thread_.Stop(); 227} 228 229} // namespace 230 231// HandleWatcher::StartState --------------------------------------------------- 232 233// Contains the information passed to Start(). 234struct HandleWatcher::StartState { 235 explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) { 236 } 237 238 ~StartState() { 239 } 240 241 // ID assigned by WatcherThreadManager. 242 WatcherID watcher_id; 243 244 // Callback to notify when done. 245 base::Callback<void(MojoResult)> callback; 246 247 // When Start() is invoked a callback is passed to WatcherThreadManager 248 // using a WeakRef from |weak_refactory_|. The callback invokes 249 // OnHandleReady() (on the thread Start() is invoked from) which in turn 250 // notifies |callback_|. Doing this allows us to reset state when the handle 251 // is ready, and then notify the callback. Doing this also means Stop() 252 // cancels any pending callbacks that may be inflight. 253 base::WeakPtrFactory<HandleWatcher> weak_factory; 254}; 255 256// HandleWatcher --------------------------------------------------------------- 257 258// static 259base::TickClock* HandleWatcher::tick_clock_ = NULL; 260 261HandleWatcher::HandleWatcher() { 262} 263 264HandleWatcher::~HandleWatcher() { 265 Stop(); 266} 267 268void HandleWatcher::Start(const Handle& handle, 269 MojoWaitFlags wait_flags, 270 MojoDeadline deadline, 271 const base::Callback<void(MojoResult)>& callback) { 272 DCHECK(handle.is_valid()); 273 DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags); 274 275 Stop(); 276 277 start_state_.reset(new StartState(this)); 278 start_state_->callback = callback; 279 start_state_->watcher_id = 280 WatcherThreadManager::GetInstance()->StartWatching( 281 handle, 282 wait_flags, 283 MojoDeadlineToTimeTicks(deadline), 284 base::Bind(&HandleWatcher::OnHandleReady, 285 start_state_->weak_factory.GetWeakPtr())); 286} 287 288void HandleWatcher::Stop() { 289 if (!start_state_.get()) 290 return; 291 292 scoped_ptr<StartState> old_state(start_state_.Pass()); 293 WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id); 294} 295 296void HandleWatcher::OnHandleReady(MojoResult result) { 297 DCHECK(start_state_.get()); 298 scoped_ptr<StartState> old_state(start_state_.Pass()); 299 old_state->callback.Run(result); 300 301 // NOTE: We may have been deleted during callback execution. 302} 303 304// static 305base::TimeTicks HandleWatcher::NowTicks() { 306 return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now(); 307} 308 309// static 310base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) { 311 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : 312 NowTicks() + base::TimeDelta::FromMicroseconds(deadline); 313} 314 315} // namespace common 316} // namespace mojo 317