1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/common/message_pump_mojo.h"
6
7#include <algorithm>
8#include <vector>
9
10#include "base/debug/alias.h"
11#include "base/lazy_instance.h"
12#include "base/logging.h"
13#include "base/threading/thread_local.h"
14#include "base/time/time.h"
15#include "mojo/common/message_pump_mojo_handler.h"
16#include "mojo/common/time_helper.h"
17
18namespace mojo {
19namespace common {
20namespace {
21
22base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky
23    g_tls_current_pump = LAZY_INSTANCE_INITIALIZER;
24
25MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
26                                     base::TimeTicks now) {
27  // The is_null() check matches that of HandleWatcher as well as how
28  // |delayed_work_time| is used.
29  if (time_ticks.is_null())
30    return MOJO_DEADLINE_INDEFINITE;
31  const int64_t delta = (time_ticks - now).InMicroseconds();
32  return delta < 0 ? static_cast<MojoDeadline>(0) :
33                     static_cast<MojoDeadline>(delta);
34}
35
36}  // namespace
37
38// State needed for one iteration of WaitMany. The first handle and flags
39// corresponds to that of the control pipe.
40struct MessagePumpMojo::WaitState {
41  std::vector<Handle> handles;
42  std::vector<MojoHandleSignals> wait_signals;
43};
44
45struct MessagePumpMojo::RunState {
46  RunState() : should_quit(false) {
47    CreateMessagePipe(NULL, &read_handle, &write_handle);
48  }
49
50  base::TimeTicks delayed_work_time;
51
52  // Used to wake up WaitForWork().
53  ScopedMessagePipeHandle read_handle;
54  ScopedMessagePipeHandle write_handle;
55
56  bool should_quit;
57};
58
59MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
60  DCHECK(!current())
61      << "There is already a MessagePumpMojo instance on this thread.";
62  g_tls_current_pump.Pointer()->Set(this);
63}
64
65MessagePumpMojo::~MessagePumpMojo() {
66  DCHECK_EQ(this, current());
67  g_tls_current_pump.Pointer()->Set(NULL);
68}
69
70// static
71scoped_ptr<base::MessagePump> MessagePumpMojo::Create() {
72  return scoped_ptr<MessagePump>(new MessagePumpMojo());
73}
74
75// static
76MessagePumpMojo* MessagePumpMojo::current() {
77  return g_tls_current_pump.Pointer()->Get();
78}
79
80void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
81                                 const Handle& handle,
82                                 MojoHandleSignals wait_signals,
83                                 base::TimeTicks deadline) {
84  CHECK(handler);
85  DCHECK(handle.is_valid());
86  // Assume it's an error if someone tries to reregister an existing handle.
87  CHECK_EQ(0u, handlers_.count(handle));
88  Handler handler_data;
89  handler_data.handler = handler;
90  handler_data.wait_signals = wait_signals;
91  handler_data.deadline = deadline;
92  handler_data.id = next_handler_id_++;
93  handlers_[handle] = handler_data;
94}
95
96void MessagePumpMojo::RemoveHandler(const Handle& handle) {
97  handlers_.erase(handle);
98}
99
100void MessagePumpMojo::Run(Delegate* delegate) {
101  RunState run_state;
102  // TODO: better deal with error handling.
103  CHECK(run_state.read_handle.is_valid());
104  CHECK(run_state.write_handle.is_valid());
105  RunState* old_state = NULL;
106  {
107    base::AutoLock auto_lock(run_state_lock_);
108    old_state = run_state_;
109    run_state_ = &run_state;
110  }
111  DoRunLoop(&run_state, delegate);
112  {
113    base::AutoLock auto_lock(run_state_lock_);
114    run_state_ = old_state;
115  }
116}
117
118void MessagePumpMojo::Quit() {
119  base::AutoLock auto_lock(run_state_lock_);
120  if (run_state_)
121    run_state_->should_quit = true;
122}
123
124void MessagePumpMojo::ScheduleWork() {
125  base::AutoLock auto_lock(run_state_lock_);
126  if (run_state_)
127    SignalControlPipe(*run_state_);
128}
129
130void MessagePumpMojo::ScheduleDelayedWork(
131    const base::TimeTicks& delayed_work_time) {
132  base::AutoLock auto_lock(run_state_lock_);
133  if (!run_state_)
134    return;
135  run_state_->delayed_work_time = delayed_work_time;
136}
137
138void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
139  bool more_work_is_plausible = true;
140  for (;;) {
141    const bool block = !more_work_is_plausible;
142    DoInternalWork(*run_state, block);
143
144    // There isn't a good way to know if there are more handles ready, we assume
145    // not.
146    more_work_is_plausible = false;
147
148    if (run_state->should_quit)
149      break;
150
151    more_work_is_plausible |= delegate->DoWork();
152    if (run_state->should_quit)
153      break;
154
155    more_work_is_plausible |= delegate->DoDelayedWork(
156        &run_state->delayed_work_time);
157    if (run_state->should_quit)
158      break;
159
160    if (more_work_is_plausible)
161      continue;
162
163    more_work_is_plausible = delegate->DoIdleWork();
164    if (run_state->should_quit)
165      break;
166  }
167}
168
169void MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
170  const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
171  const WaitState wait_state = GetWaitState(run_state);
172  const MojoResult result =
173      WaitMany(wait_state.handles, wait_state.wait_signals, deadline);
174  if (result == 0) {
175    // Control pipe was written to.
176    uint32_t num_bytes = 0;
177    ReadMessageRaw(run_state.read_handle.get(), NULL, &num_bytes, NULL, NULL,
178                   MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
179  } else if (result > 0) {
180    const size_t index = static_cast<size_t>(result);
181    DCHECK(handlers_.find(wait_state.handles[index]) != handlers_.end());
182    handlers_[wait_state.handles[index]].handler->OnHandleReady(
183        wait_state.handles[index]);
184  } else {
185    switch (result) {
186      case MOJO_RESULT_CANCELLED:
187      case MOJO_RESULT_FAILED_PRECONDITION:
188        RemoveFirstInvalidHandle(wait_state);
189        break;
190      case MOJO_RESULT_DEADLINE_EXCEEDED:
191        break;
192      default:
193        base::debug::Alias(&result);
194        // Unexpected result is likely fatal, crash so we can determine cause.
195        CHECK(false);
196    }
197  }
198
199  // Notify and remove any handlers whose time has expired. Make a copy in case
200  // someone tries to add/remove new handlers from notification.
201  const HandleToHandler cloned_handlers(handlers_);
202  const base::TimeTicks now(internal::NowTicks());
203  for (HandleToHandler::const_iterator i = cloned_handlers.begin();
204       i != cloned_handlers.end(); ++i) {
205    // Since we're iterating over a clone of the handlers, verify the handler is
206    // still valid before notifying.
207    if (!i->second.deadline.is_null() && i->second.deadline < now &&
208        handlers_.find(i->first) != handlers_.end() &&
209        handlers_[i->first].id == i->second.id) {
210      i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED);
211    }
212  }
213}
214
215void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) {
216  // TODO(sky): deal with control pipe going bad.
217  for (size_t i = 0; i < wait_state.handles.size(); ++i) {
218    const MojoResult result =
219        Wait(wait_state.handles[i], wait_state.wait_signals[i], 0);
220    if (result == MOJO_RESULT_INVALID_ARGUMENT) {
221      // We should never have an invalid argument. If we do it indicates
222      // RemoveHandler() was not invoked and is likely to cause problems else
223      // where in the stack if we ignore it.
224      CHECK(false);
225    } else if (result == MOJO_RESULT_FAILED_PRECONDITION ||
226               result == MOJO_RESULT_CANCELLED) {
227      CHECK_NE(i, 0u);  // Indicates the control pipe went bad.
228
229      // Remove the handle first, this way if OnHandleError() tries to remove
230      // the handle our iterator isn't invalidated.
231      CHECK(handlers_.find(wait_state.handles[i]) != handlers_.end());
232      MessagePumpMojoHandler* handler =
233          handlers_[wait_state.handles[i]].handler;
234      handlers_.erase(wait_state.handles[i]);
235      handler->OnHandleError(wait_state.handles[i], result);
236      return;
237    }
238  }
239}
240
241void MessagePumpMojo::SignalControlPipe(const RunState& run_state) {
242  const MojoResult result =
243      WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0,
244                      MOJO_WRITE_MESSAGE_FLAG_NONE);
245  // If we can't write we likely won't wake up the thread and there is a strong
246  // chance we'll deadlock.
247  CHECK_EQ(MOJO_RESULT_OK, result);
248}
249
250MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState(
251    const RunState& run_state) const {
252  WaitState wait_state;
253  wait_state.handles.push_back(run_state.read_handle.get());
254  wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
255
256  for (HandleToHandler::const_iterator i = handlers_.begin();
257       i != handlers_.end(); ++i) {
258    wait_state.handles.push_back(i->first);
259    wait_state.wait_signals.push_back(i->second.wait_signals);
260  }
261  return wait_state;
262}
263
264MojoDeadline MessagePumpMojo::GetDeadlineForWait(
265    const RunState& run_state) const {
266  const base::TimeTicks now(internal::NowTicks());
267  MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time,
268                                                  now);
269  for (HandleToHandler::const_iterator i = handlers_.begin();
270       i != handlers_.end(); ++i) {
271    deadline = std::min(
272        TimeTicksToMojoDeadline(i->second.deadline, now), deadline);
273  }
274  return deadline;
275}
276
277}  // namespace common
278}  // namespace mojo
279