1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "ipc/ipc_channel_proxy.h" 6 7#include "base/bind.h" 8#include "base/compiler_specific.h" 9#include "base/location.h" 10#include "base/memory/ref_counted.h" 11#include "base/memory/scoped_ptr.h" 12#include "base/single_thread_task_runner.h" 13#include "base/thread_task_runner_handle.h" 14#include "ipc/ipc_listener.h" 15#include "ipc/ipc_logging.h" 16#include "ipc/ipc_message_macros.h" 17#include "ipc/message_filter.h" 18#include "ipc/message_filter_router.h" 19 20namespace IPC { 21 22//------------------------------------------------------------------------------ 23 24ChannelProxy::Context::Context(Listener* listener, 25 base::SingleThreadTaskRunner* ipc_task_runner) 26 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 27 listener_(listener), 28 ipc_task_runner_(ipc_task_runner), 29 channel_connected_called_(false), 30 message_filter_router_(new MessageFilterRouter()), 31 peer_pid_(base::kNullProcessId) { 32 DCHECK(ipc_task_runner_.get()); 33 // The Listener thread where Messages are handled must be a separate thread 34 // to avoid oversubscribing the IO thread. If you trigger this error, you 35 // need to either: 36 // 1) Create the ChannelProxy on a different thread, or 37 // 2) Just use Channel 38 // Note, we currently make an exception for a NULL listener. That usage 39 // basically works, but is outside the intent of ChannelProxy. This support 40 // will disappear, so please don't rely on it. See crbug.com/364241 41 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get())); 42} 43 44ChannelProxy::Context::~Context() { 45} 46 47void ChannelProxy::Context::ClearIPCTaskRunner() { 48 ipc_task_runner_ = NULL; 49} 50 51void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle, 52 const Channel::Mode& mode) { 53 DCHECK(!channel_); 54 channel_id_ = handle.name; 55 channel_ = Channel::Create(handle, mode, this); 56} 57 58bool ChannelProxy::Context::TryFilters(const Message& message) { 59 DCHECK(message_filter_router_); 60#ifdef IPC_MESSAGE_LOG_ENABLED 61 Logging* logger = Logging::GetInstance(); 62 if (logger->Enabled()) 63 logger->OnPreDispatchMessage(message); 64#endif 65 66 if (message_filter_router_->TryFilters(message)) { 67 if (message.dispatch_error()) { 68 listener_task_runner_->PostTask( 69 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message)); 70 } 71#ifdef IPC_MESSAGE_LOG_ENABLED 72 if (logger->Enabled()) 73 logger->OnPostDispatchMessage(message, channel_id_); 74#endif 75 return true; 76 } 77 return false; 78} 79 80// Called on the IPC::Channel thread 81bool ChannelProxy::Context::OnMessageReceived(const Message& message) { 82 // First give a chance to the filters to process this message. 83 if (!TryFilters(message)) 84 OnMessageReceivedNoFilter(message); 85 return true; 86} 87 88// Called on the IPC::Channel thread 89bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { 90 listener_task_runner_->PostTask( 91 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message)); 92 return true; 93} 94 95// Called on the IPC::Channel thread 96void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { 97 // We cache off the peer_pid so it can be safely accessed from both threads. 98 peer_pid_ = channel_->GetPeerPID(); 99 100 // Add any pending filters. This avoids a race condition where someone 101 // creates a ChannelProxy, calls AddFilter, and then right after starts the 102 // peer process. The IO thread could receive a message before the task to add 103 // the filter is run on the IO thread. 104 OnAddFilter(); 105 106 // See above comment about using listener_task_runner_ here. 107 listener_task_runner_->PostTask( 108 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this)); 109} 110 111// Called on the IPC::Channel thread 112void ChannelProxy::Context::OnChannelError() { 113 for (size_t i = 0; i < filters_.size(); ++i) 114 filters_[i]->OnChannelError(); 115 116 // See above comment about using listener_task_runner_ here. 117 listener_task_runner_->PostTask( 118 FROM_HERE, base::Bind(&Context::OnDispatchError, this)); 119} 120 121// Called on the IPC::Channel thread 122void ChannelProxy::Context::OnChannelOpened() { 123 DCHECK(channel_ != NULL); 124 125 // Assume a reference to ourselves on behalf of this thread. This reference 126 // will be released when we are closed. 127 AddRef(); 128 129 if (!channel_->Connect()) { 130 OnChannelError(); 131 return; 132 } 133 134 for (size_t i = 0; i < filters_.size(); ++i) 135 filters_[i]->OnFilterAdded(channel_.get()); 136} 137 138// Called on the IPC::Channel thread 139void ChannelProxy::Context::OnChannelClosed() { 140 // It's okay for IPC::ChannelProxy::Close to be called more than once, which 141 // would result in this branch being taken. 142 if (!channel_) 143 return; 144 145 for (size_t i = 0; i < filters_.size(); ++i) { 146 filters_[i]->OnChannelClosing(); 147 filters_[i]->OnFilterRemoved(); 148 } 149 150 // We don't need the filters anymore. 151 message_filter_router_->Clear(); 152 filters_.clear(); 153 // We don't need the lock, because at this point, the listener thread can't 154 // access it any more. 155 pending_filters_.clear(); 156 157 channel_.reset(); 158 159 // Balance with the reference taken during startup. This may result in 160 // self-destruction. 161 Release(); 162} 163 164void ChannelProxy::Context::Clear() { 165 listener_ = NULL; 166} 167 168// Called on the IPC::Channel thread 169void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) { 170 if (!channel_) { 171 OnChannelClosed(); 172 return; 173 } 174 175 if (!channel_->Send(message.release())) 176 OnChannelError(); 177} 178 179// Called on the IPC::Channel thread 180void ChannelProxy::Context::OnAddFilter() { 181 // Our OnChannelConnected method has not yet been called, so we can't be 182 // sure that channel_ is valid yet. When OnChannelConnected *is* called, 183 // it invokes OnAddFilter, so any pending filter(s) will be added at that 184 // time. 185 if (peer_pid_ == base::kNullProcessId) 186 return; 187 188 std::vector<scoped_refptr<MessageFilter> > new_filters; 189 { 190 base::AutoLock auto_lock(pending_filters_lock_); 191 new_filters.swap(pending_filters_); 192 } 193 194 for (size_t i = 0; i < new_filters.size(); ++i) { 195 filters_.push_back(new_filters[i]); 196 197 message_filter_router_->AddFilter(new_filters[i].get()); 198 199 // The channel has already been created and connected, so we need to 200 // inform the filters right now. 201 new_filters[i]->OnFilterAdded(channel_.get()); 202 new_filters[i]->OnChannelConnected(peer_pid_); 203 } 204} 205 206// Called on the IPC::Channel thread 207void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { 208 if (peer_pid_ == base::kNullProcessId) { 209 // The channel is not yet connected, so any filters are still pending. 210 base::AutoLock auto_lock(pending_filters_lock_); 211 for (size_t i = 0; i < pending_filters_.size(); ++i) { 212 if (pending_filters_[i].get() == filter) { 213 filter->OnFilterRemoved(); 214 pending_filters_.erase(pending_filters_.begin() + i); 215 return; 216 } 217 } 218 return; 219 } 220 if (!channel_) 221 return; // The filters have already been deleted. 222 223 message_filter_router_->RemoveFilter(filter); 224 225 for (size_t i = 0; i < filters_.size(); ++i) { 226 if (filters_[i].get() == filter) { 227 filter->OnFilterRemoved(); 228 filters_.erase(filters_.begin() + i); 229 return; 230 } 231 } 232 233 NOTREACHED() << "filter to be removed not found"; 234} 235 236// Called on the listener's thread 237void ChannelProxy::Context::AddFilter(MessageFilter* filter) { 238 base::AutoLock auto_lock(pending_filters_lock_); 239 pending_filters_.push_back(make_scoped_refptr(filter)); 240 ipc_task_runner_->PostTask( 241 FROM_HERE, base::Bind(&Context::OnAddFilter, this)); 242} 243 244// Called on the listener's thread 245void ChannelProxy::Context::OnDispatchMessage(const Message& message) { 246#ifdef IPC_MESSAGE_LOG_ENABLED 247 Logging* logger = Logging::GetInstance(); 248 std::string name; 249 logger->GetMessageText(message.type(), &name, &message, NULL); 250 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage", 251 "name", name); 252#else 253 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage", 254 "class", IPC_MESSAGE_ID_CLASS(message.type()), 255 "line", IPC_MESSAGE_ID_LINE(message.type())); 256#endif 257 258 if (!listener_) 259 return; 260 261 OnDispatchConnected(); 262 263#ifdef IPC_MESSAGE_LOG_ENABLED 264 if (message.type() == IPC_LOGGING_ID) { 265 logger->OnReceivedLoggingMessage(message); 266 return; 267 } 268 269 if (logger->Enabled()) 270 logger->OnPreDispatchMessage(message); 271#endif 272 273 listener_->OnMessageReceived(message); 274 if (message.dispatch_error()) 275 listener_->OnBadMessageReceived(message); 276 277#ifdef IPC_MESSAGE_LOG_ENABLED 278 if (logger->Enabled()) 279 logger->OnPostDispatchMessage(message, channel_id_); 280#endif 281} 282 283// Called on the listener's thread 284void ChannelProxy::Context::OnDispatchConnected() { 285 if (channel_connected_called_) 286 return; 287 288 channel_connected_called_ = true; 289 if (listener_) 290 listener_->OnChannelConnected(peer_pid_); 291} 292 293// Called on the listener's thread 294void ChannelProxy::Context::OnDispatchError() { 295 if (listener_) 296 listener_->OnChannelError(); 297} 298 299// Called on the listener's thread 300void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) { 301 if (listener_) 302 listener_->OnBadMessageReceived(message); 303} 304 305//----------------------------------------------------------------------------- 306 307// static 308scoped_ptr<ChannelProxy> ChannelProxy::Create( 309 const IPC::ChannelHandle& channel_handle, 310 Channel::Mode mode, 311 Listener* listener, 312 base::SingleThreadTaskRunner* ipc_task_runner) { 313 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner)); 314 channel->Init(channel_handle, mode, true); 315 return channel.Pass(); 316} 317 318ChannelProxy::ChannelProxy(Context* context) 319 : context_(context), 320 did_init_(false) { 321} 322 323ChannelProxy::ChannelProxy(Listener* listener, 324 base::SingleThreadTaskRunner* ipc_task_runner) 325 : context_(new Context(listener, ipc_task_runner)), did_init_(false) { 326} 327 328ChannelProxy::~ChannelProxy() { 329 DCHECK(CalledOnValidThread()); 330 331 Close(); 332} 333 334void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, 335 Channel::Mode mode, 336 bool create_pipe_now) { 337 DCHECK(CalledOnValidThread()); 338 DCHECK(!did_init_); 339#if defined(OS_POSIX) 340 // When we are creating a server on POSIX, we need its file descriptor 341 // to be created immediately so that it can be accessed and passed 342 // to other processes. Forcing it to be created immediately avoids 343 // race conditions that may otherwise arise. 344 if (mode & Channel::MODE_SERVER_FLAG) { 345 create_pipe_now = true; 346 } 347#endif // defined(OS_POSIX) 348 349 if (create_pipe_now) { 350 // Create the channel immediately. This effectively sets up the 351 // low-level pipe so that the client can connect. Without creating 352 // the pipe immediately, it is possible for a listener to attempt 353 // to connect and get an error since the pipe doesn't exist yet. 354 context_->CreateChannel(channel_handle, mode); 355 } else { 356 context_->ipc_task_runner()->PostTask( 357 FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(), 358 channel_handle, mode)); 359 } 360 361 // complete initialization on the background thread 362 context_->ipc_task_runner()->PostTask( 363 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get())); 364 365 did_init_ = true; 366} 367 368void ChannelProxy::Close() { 369 DCHECK(CalledOnValidThread()); 370 371 // Clear the backpointer to the listener so that any pending calls to 372 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 373 // possible that the channel could be closed while it is receiving messages! 374 context_->Clear(); 375 376 if (context_->ipc_task_runner()) { 377 context_->ipc_task_runner()->PostTask( 378 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get())); 379 } 380} 381 382bool ChannelProxy::Send(Message* message) { 383 DCHECK(did_init_); 384 385 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are 386 // tests that call Send() from a wrong thread. See http://crbug.com/163523. 387 388#ifdef IPC_MESSAGE_LOG_ENABLED 389 Logging::GetInstance()->OnSendMessage(message, context_->channel_id()); 390#endif 391 392 context_->ipc_task_runner()->PostTask( 393 FROM_HERE, 394 base::Bind(&ChannelProxy::Context::OnSendMessage, 395 context_, base::Passed(scoped_ptr<Message>(message)))); 396 return true; 397} 398 399void ChannelProxy::AddFilter(MessageFilter* filter) { 400 DCHECK(CalledOnValidThread()); 401 402 context_->AddFilter(filter); 403} 404 405void ChannelProxy::RemoveFilter(MessageFilter* filter) { 406 DCHECK(CalledOnValidThread()); 407 408 context_->ipc_task_runner()->PostTask( 409 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(), 410 make_scoped_refptr(filter))); 411} 412 413void ChannelProxy::ClearIPCTaskRunner() { 414 DCHECK(CalledOnValidThread()); 415 416 context()->ClearIPCTaskRunner(); 417} 418 419#if defined(OS_POSIX) && !defined(OS_NACL) 420// See the TODO regarding lazy initialization of the channel in 421// ChannelProxy::Init(). 422int ChannelProxy::GetClientFileDescriptor() { 423 DCHECK(CalledOnValidThread()); 424 425 Channel* channel = context_.get()->channel_.get(); 426 // Channel must have been created first. 427 DCHECK(channel) << context_.get()->channel_id_; 428 return channel->GetClientFileDescriptor(); 429} 430 431int ChannelProxy::TakeClientFileDescriptor() { 432 DCHECK(CalledOnValidThread()); 433 434 Channel* channel = context_.get()->channel_.get(); 435 // Channel must have been created first. 436 DCHECK(channel) << context_.get()->channel_id_; 437 return channel->TakeClientFileDescriptor(); 438} 439#endif 440 441//----------------------------------------------------------------------------- 442 443} // namespace IPC 444