// wait_set_dispatcher.cc revision 645501c2ab19a559ce82a1d5a29ced159a4c30fb
1// Copyright 2015 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/edk/system/wait_set_dispatcher.h" 6 7#include <stdint.h> 8 9#include <algorithm> 10#include <utility> 11 12#include "base/logging.h" 13#include "mojo/edk/system/awakable.h" 14 15namespace mojo { 16namespace edk { 17 18class WaitSetDispatcher::Waiter final : public Awakable { 19 public: 20 explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {} 21 ~Waiter() {} 22 23 // |Awakable| implementation. 24 bool Awake(MojoResult result, uintptr_t context) override { 25 // Note: This is called with various Mojo locks held. 26 dispatcher_->WakeDispatcher(result, context); 27 // Removes |this| from the dispatcher's list of waiters. 28 return false; 29 } 30 31 private: 32 WaitSetDispatcher* const dispatcher_; 33}; 34 35WaitSetDispatcher::WaitState::WaitState() {} 36 37WaitSetDispatcher::WaitState::WaitState(const WaitState& other) = default; 38 39WaitSetDispatcher::WaitState::~WaitState() {} 40 41WaitSetDispatcher::WaitSetDispatcher() 42 : waiter_(new WaitSetDispatcher::Waiter(this)) {} 43 44Dispatcher::Type WaitSetDispatcher::GetType() const { 45 return Type::WAIT_SET; 46} 47 48MojoResult WaitSetDispatcher::Close() { 49 base::AutoLock lock(lock_); 50 51 if (is_closed_) 52 return MOJO_RESULT_INVALID_ARGUMENT; 53 is_closed_ = true; 54 55 { 56 base::AutoLock locker(awakable_lock_); 57 awakable_list_.CancelAll(); 58 } 59 60 for (const auto& entry : waiting_dispatchers_) 61 entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr); 62 waiting_dispatchers_.clear(); 63 64 base::AutoLock locker(awoken_lock_); 65 awoken_queue_.clear(); 66 processed_dispatchers_.clear(); 67 68 return MOJO_RESULT_OK; 69} 70 71MojoResult WaitSetDispatcher::AddWaitingDispatcher( 72 const scoped_refptr<Dispatcher>& dispatcher, 73 MojoHandleSignals signals, 74 uintptr_t context) { 75 if (dispatcher == 
this) 76 return MOJO_RESULT_INVALID_ARGUMENT; 77 78 base::AutoLock lock(lock_); 79 80 if (is_closed_) 81 return MOJO_RESULT_INVALID_ARGUMENT; 82 83 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); 84 auto it = waiting_dispatchers_.find(dispatcher_handle); 85 if (it != waiting_dispatchers_.end()) { 86 return MOJO_RESULT_ALREADY_EXISTS; 87 } 88 89 const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals, 90 dispatcher_handle, nullptr); 91 if (result == MOJO_RESULT_INVALID_ARGUMENT) { 92 // Dispatcher is closed. 93 return result; 94 } else if (result != MOJO_RESULT_OK) { 95 WakeDispatcher(result, dispatcher_handle); 96 } 97 98 WaitState state; 99 state.dispatcher = dispatcher; 100 state.context = context; 101 state.signals = signals; 102 bool inserted = waiting_dispatchers_.insert( 103 std::make_pair(dispatcher_handle, state)).second; 104 DCHECK(inserted); 105 106 return MOJO_RESULT_OK; 107} 108 109MojoResult WaitSetDispatcher::RemoveWaitingDispatcher( 110 const scoped_refptr<Dispatcher>& dispatcher) { 111 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); 112 113 base::AutoLock lock(lock_); 114 if (is_closed_) 115 return MOJO_RESULT_INVALID_ARGUMENT; 116 117 auto it = waiting_dispatchers_.find(dispatcher_handle); 118 if (it == waiting_dispatchers_.end()) 119 return MOJO_RESULT_NOT_FOUND; 120 121 dispatcher->RemoveAwakable(waiter_.get(), nullptr); 122 // At this point, it should not be possible for |waiter_| to be woken with 123 // |dispatcher|. 124 waiting_dispatchers_.erase(it); 125 126 base::AutoLock locker(awoken_lock_); 127 int num_erased = 0; 128 for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) { 129 if (it->first == dispatcher_handle) { 130 it = awoken_queue_.erase(it); 131 num_erased++; 132 } else { 133 ++it; 134 } 135 } 136 // The dispatcher should only exist in the queue once. 
137 DCHECK_LE(num_erased, 1); 138 processed_dispatchers_.erase( 139 std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(), 140 dispatcher_handle), 141 processed_dispatchers_.end()); 142 143 return MOJO_RESULT_OK; 144} 145 146MojoResult WaitSetDispatcher::GetReadyDispatchers( 147 uint32_t* count, 148 DispatcherVector* dispatchers, 149 MojoResult* results, 150 uintptr_t* contexts) { 151 base::AutoLock lock(lock_); 152 153 if (is_closed_) 154 return MOJO_RESULT_INVALID_ARGUMENT; 155 156 dispatchers->clear(); 157 158 // Re-queue any already retrieved dispatchers. These should be the dispatchers 159 // that were returned on the last call to this function. This loop is 160 // necessary to preserve the logically level-triggering behaviour of waiting 161 // in Mojo. In particular, if no action is taken on a signal, that signal 162 // continues to be satisfied, and therefore a |MojoWait()| on that 163 // handle/signal continues to return immediately. 164 std::deque<uintptr_t> pending; 165 { 166 base::AutoLock locker(awoken_lock_); 167 pending.swap(processed_dispatchers_); 168 } 169 for (uintptr_t d : pending) { 170 auto it = waiting_dispatchers_.find(d); 171 // Anything in |processed_dispatchers_| should also be in 172 // |waiting_dispatchers_| since dispatchers are removed from both in 173 // |RemoveWaitingDispatcherImplNoLock()|. 174 DCHECK(it != waiting_dispatchers_.end()); 175 176 // |awoken_mutex_| cannot be held here because 177 // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This 178 // mutex is held while running |WakeDispatcher()| below, which needs to 179 // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in 180 // a deadlock. 181 const MojoResult result = it->second.dispatcher->AddAwakable( 182 waiter_.get(), it->second.signals, d, nullptr); 183 184 if (result == MOJO_RESULT_INVALID_ARGUMENT) { 185 // Dispatcher is closed. 
Implicitly remove it from the wait set since 186 // it may be impossible to remove using |MojoRemoveHandle()|. 187 waiting_dispatchers_.erase(it); 188 } else if (result != MOJO_RESULT_OK) { 189 WakeDispatcher(result, d); 190 } 191 } 192 193 const uint32_t max_woken = *count; 194 uint32_t num_woken = 0; 195 196 base::AutoLock locker(awoken_lock_); 197 while (!awoken_queue_.empty() && num_woken < max_woken) { 198 uintptr_t d = awoken_queue_.front().first; 199 MojoResult result = awoken_queue_.front().second; 200 awoken_queue_.pop_front(); 201 202 auto it = waiting_dispatchers_.find(d); 203 DCHECK(it != waiting_dispatchers_.end()); 204 205 results[num_woken] = result; 206 dispatchers->push_back(it->second.dispatcher); 207 if (contexts) 208 contexts[num_woken] = it->second.context; 209 210 if (result != MOJO_RESULT_CANCELLED) { 211 processed_dispatchers_.push_back(d); 212 } else { 213 // |MOJO_RESULT_CANCELLED| indicates that the dispatcher was closed. 214 // Return it, but also implcitly remove it from the wait set. 
215 waiting_dispatchers_.erase(it); 216 } 217 218 num_woken++; 219 } 220 221 *count = num_woken; 222 if (!num_woken) 223 return MOJO_RESULT_SHOULD_WAIT; 224 225 return MOJO_RESULT_OK; 226} 227 228HandleSignalsState WaitSetDispatcher::GetHandleSignalsState() const { 229 base::AutoLock lock(lock_); 230 return GetHandleSignalsStateNoLock(); 231} 232 233HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateNoLock() const { 234 lock_.AssertAcquired(); 235 if (is_closed_) 236 return HandleSignalsState(); 237 238 HandleSignalsState rv; 239 rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; 240 base::AutoLock locker(awoken_lock_); 241 if (!awoken_queue_.empty() || !processed_dispatchers_.empty()) 242 rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; 243 return rv; 244} 245 246MojoResult WaitSetDispatcher::AddAwakable(Awakable* awakable, 247 MojoHandleSignals signals, 248 uintptr_t context, 249 HandleSignalsState* signals_state) { 250 base::AutoLock lock(lock_); 251 // |awakable_lock_| is acquired here instead of immediately before adding to 252 // |awakable_list_| because we need to check the signals state and add to 253 // |awakable_list_| as an atomic operation. If the pair isn't atomic, it is 254 // possible for the signals state to change after it is checked, but before 255 // the awakable is added. In that case, the added awakable won't be signalled. 
256 base::AutoLock awakable_locker(awakable_lock_); 257 HandleSignalsState state(GetHandleSignalsStateNoLock()); 258 if (state.satisfies(signals)) { 259 if (signals_state) 260 *signals_state = state; 261 return MOJO_RESULT_ALREADY_EXISTS; 262 } 263 if (!state.can_satisfy(signals)) { 264 if (signals_state) 265 *signals_state = state; 266 return MOJO_RESULT_FAILED_PRECONDITION; 267 } 268 269 awakable_list_.Add(awakable, signals, context); 270 return MOJO_RESULT_OK; 271} 272 273void WaitSetDispatcher::RemoveAwakable(Awakable* awakable, 274 HandleSignalsState* signals_state) { 275 { 276 base::AutoLock locker(awakable_lock_); 277 awakable_list_.Remove(awakable); 278 } 279 if (signals_state) 280 *signals_state = GetHandleSignalsState(); 281} 282 283bool WaitSetDispatcher::BeginTransit() { 284 // You can't transfer wait sets! 285 return false; 286} 287 288WaitSetDispatcher::~WaitSetDispatcher() { 289 DCHECK(waiting_dispatchers_.empty()); 290 DCHECK(awoken_queue_.empty()); 291 DCHECK(processed_dispatchers_.empty()); 292} 293 294void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) { 295 { 296 base::AutoLock locker(awoken_lock_); 297 298 if (result == MOJO_RESULT_ALREADY_EXISTS) 299 result = MOJO_RESULT_OK; 300 301 awoken_queue_.push_back(std::make_pair(context, result)); 302 } 303 304 base::AutoLock locker(awakable_lock_); 305 HandleSignalsState signals_state; 306 signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; 307 signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; 308 awakable_list_.AwakeForStateChange(signals_state); 309} 310 311} // namespace edk 312} // namespace mojo 313