1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/common/message_pump_mojo.h" 6 7#include <algorithm> 8#include <vector> 9 10#include "base/debug/alias.h" 11#include "base/lazy_instance.h" 12#include "base/logging.h" 13#include "base/threading/thread_local.h" 14#include "base/time/time.h" 15#include "mojo/common/message_pump_mojo_handler.h" 16#include "mojo/common/time_helper.h" 17 18namespace mojo { 19namespace common { 20namespace { 21 22base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky 23 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER; 24 25MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, 26 base::TimeTicks now) { 27 // The is_null() check matches that of HandleWatcher as well as how 28 // |delayed_work_time| is used. 29 if (time_ticks.is_null()) 30 return MOJO_DEADLINE_INDEFINITE; 31 const int64_t delta = (time_ticks - now).InMicroseconds(); 32 return delta < 0 ? static_cast<MojoDeadline>(0) : 33 static_cast<MojoDeadline>(delta); 34} 35 36} // namespace 37 38// State needed for one iteration of WaitMany. The first handle and flags 39// corresponds to that of the control pipe. 40struct MessagePumpMojo::WaitState { 41 std::vector<Handle> handles; 42 std::vector<MojoHandleSignals> wait_signals; 43}; 44 45struct MessagePumpMojo::RunState { 46 RunState() : should_quit(false) { 47 CreateMessagePipe(NULL, &read_handle, &write_handle); 48 } 49 50 base::TimeTicks delayed_work_time; 51 52 // Used to wake up WaitForWork(). 53 ScopedMessagePipeHandle read_handle; 54 ScopedMessagePipeHandle write_handle; 55 56 bool should_quit; 57}; 58 59MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { 60 DCHECK(!current()) 61 << "There is already a MessagePumpMojo instance on this thread."; 62 g_tls_current_pump.Pointer()->Set(this); 63} 64 65MessagePumpMojo::~MessagePumpMojo() { 66 DCHECK_EQ(this, current()); 67 g_tls_current_pump.Pointer()->Set(NULL); 68} 69 70// static 71scoped_ptr<base::MessagePump> MessagePumpMojo::Create() { 72 return scoped_ptr<MessagePump>(new MessagePumpMojo()); 73} 74 75// static 76MessagePumpMojo* MessagePumpMojo::current() { 77 return g_tls_current_pump.Pointer()->Get(); 78} 79 80void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, 81 const Handle& handle, 82 MojoHandleSignals wait_signals, 83 base::TimeTicks deadline) { 84 CHECK(handler); 85 DCHECK(handle.is_valid()); 86 // Assume it's an error if someone tries to reregister an existing handle. 87 CHECK_EQ(0u, handlers_.count(handle)); 88 Handler handler_data; 89 handler_data.handler = handler; 90 handler_data.wait_signals = wait_signals; 91 handler_data.deadline = deadline; 92 handler_data.id = next_handler_id_++; 93 handlers_[handle] = handler_data; 94} 95 96void MessagePumpMojo::RemoveHandler(const Handle& handle) { 97 handlers_.erase(handle); 98} 99 100void MessagePumpMojo::Run(Delegate* delegate) { 101 RunState run_state; 102 // TODO: better deal with error handling. 103 CHECK(run_state.read_handle.is_valid()); 104 CHECK(run_state.write_handle.is_valid()); 105 RunState* old_state = NULL; 106 { 107 base::AutoLock auto_lock(run_state_lock_); 108 old_state = run_state_; 109 run_state_ = &run_state; 110 } 111 DoRunLoop(&run_state, delegate); 112 { 113 base::AutoLock auto_lock(run_state_lock_); 114 run_state_ = old_state; 115 } 116} 117 118void MessagePumpMojo::Quit() { 119 base::AutoLock auto_lock(run_state_lock_); 120 if (run_state_) 121 run_state_->should_quit = true; 122} 123 124void MessagePumpMojo::ScheduleWork() { 125 base::AutoLock auto_lock(run_state_lock_); 126 if (run_state_) 127 SignalControlPipe(*run_state_); 128} 129 130void MessagePumpMojo::ScheduleDelayedWork( 131 const base::TimeTicks& delayed_work_time) { 132 base::AutoLock auto_lock(run_state_lock_); 133 if (!run_state_) 134 return; 135 run_state_->delayed_work_time = delayed_work_time; 136} 137 138void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) { 139 bool more_work_is_plausible = true; 140 for (;;) { 141 const bool block = !more_work_is_plausible; 142 DoInternalWork(*run_state, block); 143 144 // There isn't a good way to know if there are more handles ready, we assume 145 // not. 146 more_work_is_plausible = false; 147 148 if (run_state->should_quit) 149 break; 150 151 more_work_is_plausible |= delegate->DoWork(); 152 if (run_state->should_quit) 153 break; 154 155 more_work_is_plausible |= delegate->DoDelayedWork( 156 &run_state->delayed_work_time); 157 if (run_state->should_quit) 158 break; 159 160 if (more_work_is_plausible) 161 continue; 162 163 more_work_is_plausible = delegate->DoIdleWork(); 164 if (run_state->should_quit) 165 break; 166 } 167} 168 169void MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { 170 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; 171 const WaitState wait_state = GetWaitState(run_state); 172 const MojoResult result = 173 WaitMany(wait_state.handles, wait_state.wait_signals, deadline); 174 if (result == 0) { 175 // Control pipe was written to. 176 uint32_t num_bytes = 0; 177 ReadMessageRaw(run_state.read_handle.get(), NULL, &num_bytes, NULL, NULL, 178 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); 179 } else if (result > 0) { 180 const size_t index = static_cast<size_t>(result); 181 DCHECK(handlers_.find(wait_state.handles[index]) != handlers_.end()); 182 handlers_[wait_state.handles[index]].handler->OnHandleReady( 183 wait_state.handles[index]); 184 } else { 185 switch (result) { 186 case MOJO_RESULT_CANCELLED: 187 case MOJO_RESULT_FAILED_PRECONDITION: 188 RemoveFirstInvalidHandle(wait_state); 189 break; 190 case MOJO_RESULT_DEADLINE_EXCEEDED: 191 break; 192 default: 193 base::debug::Alias(&result); 194 // Unexpected result is likely fatal, crash so we can determine cause. 195 CHECK(false); 196 } 197 } 198 199 // Notify and remove any handlers whose time has expired. Make a copy in case 200 // someone tries to add/remove new handlers from notification. 201 const HandleToHandler cloned_handlers(handlers_); 202 const base::TimeTicks now(internal::NowTicks()); 203 for (HandleToHandler::const_iterator i = cloned_handlers.begin(); 204 i != cloned_handlers.end(); ++i) { 205 // Since we're iterating over a clone of the handlers, verify the handler is 206 // still valid before notifying. 207 if (!i->second.deadline.is_null() && i->second.deadline < now && 208 handlers_.find(i->first) != handlers_.end() && 209 handlers_[i->first].id == i->second.id) { 210 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED); 211 } 212 } 213} 214 215void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) { 216 // TODO(sky): deal with control pipe going bad. 217 for (size_t i = 0; i < wait_state.handles.size(); ++i) { 218 const MojoResult result = 219 Wait(wait_state.handles[i], wait_state.wait_signals[i], 0); 220 if (result == MOJO_RESULT_INVALID_ARGUMENT) { 221 // We should never have an invalid argument. If we do it indicates 222 // RemoveHandler() was not invoked and is likely to cause problems else 223 // where in the stack if we ignore it. 224 CHECK(false); 225 } else if (result == MOJO_RESULT_FAILED_PRECONDITION || 226 result == MOJO_RESULT_CANCELLED) { 227 CHECK_NE(i, 0u); // Indicates the control pipe went bad. 228 229 // Remove the handle first, this way if OnHandleError() tries to remove 230 // the handle our iterator isn't invalidated. 231 CHECK(handlers_.find(wait_state.handles[i]) != handlers_.end()); 232 MessagePumpMojoHandler* handler = 233 handlers_[wait_state.handles[i]].handler; 234 handlers_.erase(wait_state.handles[i]); 235 handler->OnHandleError(wait_state.handles[i], result); 236 return; 237 } 238 } 239} 240 241void MessagePumpMojo::SignalControlPipe(const RunState& run_state) { 242 const MojoResult result = 243 WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0, 244 MOJO_WRITE_MESSAGE_FLAG_NONE); 245 // If we can't write we likely won't wake up the thread and there is a strong 246 // chance we'll deadlock. 247 CHECK_EQ(MOJO_RESULT_OK, result); 248} 249 250MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState( 251 const RunState& run_state) const { 252 WaitState wait_state; 253 wait_state.handles.push_back(run_state.read_handle.get()); 254 wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE); 255 256 for (HandleToHandler::const_iterator i = handlers_.begin(); 257 i != handlers_.end(); ++i) { 258 wait_state.handles.push_back(i->first); 259 wait_state.wait_signals.push_back(i->second.wait_signals); 260 } 261 return wait_state; 262} 263 264MojoDeadline MessagePumpMojo::GetDeadlineForWait( 265 const RunState& run_state) const { 266 const base::TimeTicks now(internal::NowTicks()); 267 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time, 268 now); 269 for (HandleToHandler::const_iterator i = handlers_.begin(); 270 i != handlers_.end(); ++i) { 271 deadline = std::min( 272 TimeTicksToMojoDeadline(i->second.deadline, now), deadline); 273 } 274 return deadline; 275} 276 277} // namespace common 278} // namespace mojo 279