wait_set_dispatcher.cc revision 645501c2ab19a559ce82a1d5a29ced159a4c30fb
1// Copyright 2015 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/edk/system/wait_set_dispatcher.h"
6
7#include <stdint.h>
8
9#include <algorithm>
10#include <utility>
11
12#include "base/logging.h"
13#include "mojo/edk/system/awakable.h"
14
15namespace mojo {
16namespace edk {
17
18class WaitSetDispatcher::Waiter final : public Awakable {
19 public:
20  explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {}
21  ~Waiter() {}
22
23  // |Awakable| implementation.
24  bool Awake(MojoResult result, uintptr_t context) override {
25    // Note: This is called with various Mojo locks held.
26    dispatcher_->WakeDispatcher(result, context);
27    // Removes |this| from the dispatcher's list of waiters.
28    return false;
29  }
30
31 private:
32  WaitSetDispatcher* const dispatcher_;
33};
34
35WaitSetDispatcher::WaitState::WaitState() {}
36
37WaitSetDispatcher::WaitState::WaitState(const WaitState& other) = default;
38
39WaitSetDispatcher::WaitState::~WaitState() {}
40
41WaitSetDispatcher::WaitSetDispatcher()
42    : waiter_(new WaitSetDispatcher::Waiter(this)) {}
43
44Dispatcher::Type WaitSetDispatcher::GetType() const {
45  return Type::WAIT_SET;
46}
47
48MojoResult WaitSetDispatcher::Close() {
49  base::AutoLock lock(lock_);
50
51  if (is_closed_)
52    return MOJO_RESULT_INVALID_ARGUMENT;
53  is_closed_ = true;
54
55  {
56    base::AutoLock locker(awakable_lock_);
57    awakable_list_.CancelAll();
58  }
59
60  for (const auto& entry : waiting_dispatchers_)
61    entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
62  waiting_dispatchers_.clear();
63
64  base::AutoLock locker(awoken_lock_);
65  awoken_queue_.clear();
66  processed_dispatchers_.clear();
67
68  return MOJO_RESULT_OK;
69}
70
71MojoResult WaitSetDispatcher::AddWaitingDispatcher(
72    const scoped_refptr<Dispatcher>& dispatcher,
73    MojoHandleSignals signals,
74    uintptr_t context) {
75  if (dispatcher == this)
76    return MOJO_RESULT_INVALID_ARGUMENT;
77
78  base::AutoLock lock(lock_);
79
80  if (is_closed_)
81    return MOJO_RESULT_INVALID_ARGUMENT;
82
83  uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
84  auto it = waiting_dispatchers_.find(dispatcher_handle);
85  if (it != waiting_dispatchers_.end()) {
86    return MOJO_RESULT_ALREADY_EXISTS;
87  }
88
89  const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals,
90                                                    dispatcher_handle, nullptr);
91  if (result == MOJO_RESULT_INVALID_ARGUMENT) {
92    // Dispatcher is closed.
93    return result;
94  } else if (result != MOJO_RESULT_OK) {
95    WakeDispatcher(result, dispatcher_handle);
96  }
97
98  WaitState state;
99  state.dispatcher = dispatcher;
100  state.context = context;
101  state.signals = signals;
102  bool inserted = waiting_dispatchers_.insert(
103      std::make_pair(dispatcher_handle, state)).second;
104  DCHECK(inserted);
105
106  return MOJO_RESULT_OK;
107}
108
109MojoResult WaitSetDispatcher::RemoveWaitingDispatcher(
110    const scoped_refptr<Dispatcher>& dispatcher) {
111  uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
112
113  base::AutoLock lock(lock_);
114  if (is_closed_)
115    return MOJO_RESULT_INVALID_ARGUMENT;
116
117  auto it = waiting_dispatchers_.find(dispatcher_handle);
118  if (it == waiting_dispatchers_.end())
119    return MOJO_RESULT_NOT_FOUND;
120
121  dispatcher->RemoveAwakable(waiter_.get(), nullptr);
122  // At this point, it should not be possible for |waiter_| to be woken with
123  // |dispatcher|.
124  waiting_dispatchers_.erase(it);
125
126  base::AutoLock locker(awoken_lock_);
127  int num_erased = 0;
128  for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) {
129    if (it->first == dispatcher_handle) {
130      it = awoken_queue_.erase(it);
131      num_erased++;
132    } else {
133      ++it;
134    }
135  }
136  // The dispatcher should only exist in the queue once.
137  DCHECK_LE(num_erased, 1);
138  processed_dispatchers_.erase(
139      std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(),
140                  dispatcher_handle),
141      processed_dispatchers_.end());
142
143  return MOJO_RESULT_OK;
144}
145
146MojoResult WaitSetDispatcher::GetReadyDispatchers(
147    uint32_t* count,
148    DispatcherVector* dispatchers,
149    MojoResult* results,
150    uintptr_t* contexts) {
151  base::AutoLock lock(lock_);
152
153  if (is_closed_)
154    return MOJO_RESULT_INVALID_ARGUMENT;
155
156  dispatchers->clear();
157
158  // Re-queue any already retrieved dispatchers. These should be the dispatchers
159  // that were returned on the last call to this function. This loop is
160  // necessary to preserve the logically level-triggering behaviour of waiting
161  // in Mojo. In particular, if no action is taken on a signal, that signal
162  // continues to be satisfied, and therefore a |MojoWait()| on that
163  // handle/signal continues to return immediately.
164  std::deque<uintptr_t> pending;
165  {
166    base::AutoLock locker(awoken_lock_);
167    pending.swap(processed_dispatchers_);
168  }
169  for (uintptr_t d : pending) {
170    auto it = waiting_dispatchers_.find(d);
171    // Anything in |processed_dispatchers_| should also be in
172    // |waiting_dispatchers_| since dispatchers are removed from both in
173    // |RemoveWaitingDispatcherImplNoLock()|.
174    DCHECK(it != waiting_dispatchers_.end());
175
176    // |awoken_mutex_| cannot be held here because
177    // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This
178    // mutex is held while running |WakeDispatcher()| below, which needs to
179    // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in
180    // a deadlock.
181    const MojoResult result = it->second.dispatcher->AddAwakable(
182        waiter_.get(), it->second.signals, d, nullptr);
183
184    if (result == MOJO_RESULT_INVALID_ARGUMENT) {
185      // Dispatcher is closed. Implicitly remove it from the wait set since
186      // it may be impossible to remove using |MojoRemoveHandle()|.
187      waiting_dispatchers_.erase(it);
188    } else if (result != MOJO_RESULT_OK) {
189      WakeDispatcher(result, d);
190    }
191  }
192
193  const uint32_t max_woken = *count;
194  uint32_t num_woken = 0;
195
196  base::AutoLock locker(awoken_lock_);
197  while (!awoken_queue_.empty() && num_woken < max_woken) {
198    uintptr_t d = awoken_queue_.front().first;
199    MojoResult result = awoken_queue_.front().second;
200    awoken_queue_.pop_front();
201
202    auto it = waiting_dispatchers_.find(d);
203    DCHECK(it != waiting_dispatchers_.end());
204
205    results[num_woken] = result;
206    dispatchers->push_back(it->second.dispatcher);
207    if (contexts)
208      contexts[num_woken] = it->second.context;
209
210    if (result != MOJO_RESULT_CANCELLED) {
211      processed_dispatchers_.push_back(d);
212    } else {
213      // |MOJO_RESULT_CANCELLED| indicates that the dispatcher was closed.
214      // Return it, but also implcitly remove it from the wait set.
215      waiting_dispatchers_.erase(it);
216    }
217
218    num_woken++;
219  }
220
221  *count = num_woken;
222  if (!num_woken)
223    return MOJO_RESULT_SHOULD_WAIT;
224
225  return MOJO_RESULT_OK;
226}
227
228HandleSignalsState WaitSetDispatcher::GetHandleSignalsState() const {
229  base::AutoLock lock(lock_);
230  return GetHandleSignalsStateNoLock();
231}
232
233HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateNoLock() const {
234  lock_.AssertAcquired();
235  if (is_closed_)
236    return HandleSignalsState();
237
238  HandleSignalsState rv;
239  rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
240  base::AutoLock locker(awoken_lock_);
241  if (!awoken_queue_.empty() || !processed_dispatchers_.empty())
242    rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
243  return rv;
244}
245
246MojoResult WaitSetDispatcher::AddAwakable(Awakable* awakable,
247                                          MojoHandleSignals signals,
248                                          uintptr_t context,
249                                          HandleSignalsState* signals_state) {
250  base::AutoLock lock(lock_);
251  // |awakable_lock_| is acquired here instead of immediately before adding to
252  // |awakable_list_| because we need to check the signals state and add to
253  // |awakable_list_| as an atomic operation. If the pair isn't atomic, it is
254  // possible for the signals state to change after it is checked, but before
255  // the awakable is added. In that case, the added awakable won't be signalled.
256  base::AutoLock awakable_locker(awakable_lock_);
257  HandleSignalsState state(GetHandleSignalsStateNoLock());
258  if (state.satisfies(signals)) {
259    if (signals_state)
260      *signals_state = state;
261    return MOJO_RESULT_ALREADY_EXISTS;
262  }
263  if (!state.can_satisfy(signals)) {
264    if (signals_state)
265      *signals_state = state;
266    return MOJO_RESULT_FAILED_PRECONDITION;
267  }
268
269  awakable_list_.Add(awakable, signals, context);
270  return MOJO_RESULT_OK;
271}
272
273void WaitSetDispatcher::RemoveAwakable(Awakable* awakable,
274                                       HandleSignalsState* signals_state) {
275  {
276    base::AutoLock locker(awakable_lock_);
277    awakable_list_.Remove(awakable);
278  }
279  if (signals_state)
280    *signals_state = GetHandleSignalsState();
281}
282
283bool WaitSetDispatcher::BeginTransit() {
284  // You can't transfer wait sets!
285  return false;
286}
287
288WaitSetDispatcher::~WaitSetDispatcher() {
289  DCHECK(waiting_dispatchers_.empty());
290  DCHECK(awoken_queue_.empty());
291  DCHECK(processed_dispatchers_.empty());
292}
293
294void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
295  {
296    base::AutoLock locker(awoken_lock_);
297
298    if (result == MOJO_RESULT_ALREADY_EXISTS)
299      result = MOJO_RESULT_OK;
300
301    awoken_queue_.push_back(std::make_pair(context, result));
302  }
303
304  base::AutoLock locker(awakable_lock_);
305  HandleSignalsState signals_state;
306  signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
307  signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
308  awakable_list_.AwakeForStateChange(signals_state);
309}
310
311}  // namespace edk
312}  // namespace mojo
313