ipc_channel_proxy.cc revision 868fa2fe829687343ffae624259930155e16dbd8
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "base/bind.h" 6#include "base/compiler_specific.h" 7#include "base/debug/trace_event.h" 8#include "base/location.h" 9#include "base/memory/ref_counted.h" 10#include "base/memory/scoped_ptr.h" 11#include "base/single_thread_task_runner.h" 12#include "base/thread_task_runner_handle.h" 13#include "ipc/ipc_channel_proxy.h" 14#include "ipc/ipc_listener.h" 15#include "ipc/ipc_logging.h" 16#include "ipc/ipc_message_macros.h" 17#include "ipc/ipc_message_utils.h" 18 19namespace IPC { 20 21//------------------------------------------------------------------------------ 22 23ChannelProxy::MessageFilter::MessageFilter() {} 24 25void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {} 26 27void ChannelProxy::MessageFilter::OnFilterRemoved() {} 28 29void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {} 30 31void ChannelProxy::MessageFilter::OnChannelError() {} 32 33void ChannelProxy::MessageFilter::OnChannelClosing() {} 34 35bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) { 36 return false; 37} 38 39void ChannelProxy::MessageFilter::OnDestruct() const { 40 delete this; 41} 42 43ChannelProxy::MessageFilter::~MessageFilter() {} 44 45//------------------------------------------------------------------------------ 46 47ChannelProxy::Context::Context(Listener* listener, 48 base::SingleThreadTaskRunner* ipc_task_runner) 49 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 50 listener_(listener), 51 ipc_task_runner_(ipc_task_runner), 52 channel_connected_called_(false), 53 peer_pid_(base::kNullProcessId) { 54 DCHECK(ipc_task_runner_.get()); 55} 56 57ChannelProxy::Context::~Context() { 58} 59 60void ChannelProxy::Context::ClearIPCTaskRunner() { 61 ipc_task_runner_ = NULL; 62} 63 64void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle, 65 const Channel::Mode& mode) { 66 DCHECK(channel_.get() == NULL); 67 channel_id_ = handle.name; 68 channel_.reset(new Channel(handle, mode, this)); 69} 70 71bool ChannelProxy::Context::TryFilters(const Message& message) { 72#ifdef IPC_MESSAGE_LOG_ENABLED 73 Logging* logger = Logging::GetInstance(); 74 if (logger->Enabled()) 75 logger->OnPreDispatchMessage(message); 76#endif 77 78 for (size_t i = 0; i < filters_.size(); ++i) { 79 if (filters_[i]->OnMessageReceived(message)) { 80#ifdef IPC_MESSAGE_LOG_ENABLED 81 if (logger->Enabled()) 82 logger->OnPostDispatchMessage(message, channel_id_); 83#endif 84 return true; 85 } 86 } 87 return false; 88} 89 90// Called on the IPC::Channel thread 91bool ChannelProxy::Context::OnMessageReceived(const Message& message) { 92 // First give a chance to the filters to process this message. 93 if (!TryFilters(message)) 94 OnMessageReceivedNoFilter(message); 95 return true; 96} 97 98// Called on the IPC::Channel thread 99bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { 100 // NOTE: This code relies on the listener's message loop not going away while 101 // this thread is active. That should be a reasonable assumption, but it 102 // feels risky. We may want to invent some more indirect way of referring to 103 // a MessageLoop if this becomes a problem. 104 listener_task_runner_->PostTask( 105 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message)); 106 return true; 107} 108 109// Called on the IPC::Channel thread 110void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { 111 // Add any pending filters. This avoids a race condition where someone 112 // creates a ChannelProxy, calls AddFilter, and then right after starts the 113 // peer process. The IO thread could receive a message before the task to add 114 // the filter is run on the IO thread. 115 OnAddFilter(); 116 117 // We cache off the peer_pid so it can be safely accessed from both threads. 118 peer_pid_ = channel_->peer_pid(); 119 for (size_t i = 0; i < filters_.size(); ++i) 120 filters_[i]->OnChannelConnected(peer_pid); 121 122 // See above comment about using listener_task_runner_ here. 123 listener_task_runner_->PostTask( 124 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this)); 125} 126 127// Called on the IPC::Channel thread 128void ChannelProxy::Context::OnChannelError() { 129 for (size_t i = 0; i < filters_.size(); ++i) 130 filters_[i]->OnChannelError(); 131 132 // See above comment about using listener_task_runner_ here. 133 listener_task_runner_->PostTask( 134 FROM_HERE, base::Bind(&Context::OnDispatchError, this)); 135} 136 137// Called on the IPC::Channel thread 138void ChannelProxy::Context::OnChannelOpened() { 139 DCHECK(channel_ != NULL); 140 141 // Assume a reference to ourselves on behalf of this thread. This reference 142 // will be released when we are closed. 143 AddRef(); 144 145 if (!channel_->Connect()) { 146 OnChannelError(); 147 return; 148 } 149 150 for (size_t i = 0; i < filters_.size(); ++i) 151 filters_[i]->OnFilterAdded(channel_.get()); 152} 153 154// Called on the IPC::Channel thread 155void ChannelProxy::Context::OnChannelClosed() { 156 // It's okay for IPC::ChannelProxy::Close to be called more than once, which 157 // would result in this branch being taken. 158 if (!channel_.get()) 159 return; 160 161 for (size_t i = 0; i < filters_.size(); ++i) { 162 filters_[i]->OnChannelClosing(); 163 filters_[i]->OnFilterRemoved(); 164 } 165 166 // We don't need the filters anymore. 167 filters_.clear(); 168 169 channel_.reset(); 170 171 // Balance with the reference taken during startup. This may result in 172 // self-destruction. 173 Release(); 174} 175 176void ChannelProxy::Context::Clear() { 177 listener_ = NULL; 178} 179 180// Called on the IPC::Channel thread 181void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) { 182 if (!channel_.get()) { 183 OnChannelClosed(); 184 return; 185 } 186 if (!channel_->Send(message.release())) 187 OnChannelError(); 188} 189 190// Called on the IPC::Channel thread 191void ChannelProxy::Context::OnAddFilter() { 192 std::vector<scoped_refptr<MessageFilter> > new_filters; 193 { 194 base::AutoLock auto_lock(pending_filters_lock_); 195 new_filters.swap(pending_filters_); 196 } 197 198 for (size_t i = 0; i < new_filters.size(); ++i) { 199 filters_.push_back(new_filters[i]); 200 201 // If the channel has already been created, then we need to send this 202 // message so that the filter gets access to the Channel. 203 if (channel_.get()) 204 new_filters[i]->OnFilterAdded(channel_.get()); 205 // Ditto for if the channel has been connected. 206 if (peer_pid_) 207 new_filters[i]->OnChannelConnected(peer_pid_); 208 } 209} 210 211// Called on the IPC::Channel thread 212void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { 213 for (size_t i = 0; i < filters_.size(); ++i) { 214 if (filters_[i].get() == filter) { 215 filter->OnFilterRemoved(); 216 filters_.erase(filters_.begin() + i); 217 return; 218 } 219 } 220 221 NOTREACHED() << "filter to be removed not found"; 222} 223 224// Called on the listener's thread 225void ChannelProxy::Context::AddFilter(MessageFilter* filter) { 226 base::AutoLock auto_lock(pending_filters_lock_); 227 pending_filters_.push_back(make_scoped_refptr(filter)); 228 ipc_task_runner_->PostTask( 229 FROM_HERE, base::Bind(&Context::OnAddFilter, this)); 230} 231 232// Called on the listener's thread 233void ChannelProxy::Context::OnDispatchMessage(const Message& message) { 234#ifdef IPC_MESSAGE_LOG_ENABLED 235 Logging* logger = Logging::GetInstance(); 236 std::string name; 237 logger->GetMessageText(message.type(), &name, &message, NULL); 238 TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage", 239 "name", name); 240#else 241 TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage", 242 "class", IPC_MESSAGE_ID_CLASS(message.type()), 243 "line", IPC_MESSAGE_ID_LINE(message.type())); 244#endif 245 246 if (!listener_) 247 return; 248 249 OnDispatchConnected(); 250 251#ifdef IPC_MESSAGE_LOG_ENABLED 252 if (message.type() == IPC_LOGGING_ID) { 253 logger->OnReceivedLoggingMessage(message); 254 return; 255 } 256 257 if (logger->Enabled()) 258 logger->OnPreDispatchMessage(message); 259#endif 260 261 listener_->OnMessageReceived(message); 262 263#ifdef IPC_MESSAGE_LOG_ENABLED 264 if (logger->Enabled()) 265 logger->OnPostDispatchMessage(message, channel_id_); 266#endif 267} 268 269// Called on the listener's thread 270void ChannelProxy::Context::OnDispatchConnected() { 271 if (channel_connected_called_) 272 return; 273 274 channel_connected_called_ = true; 275 if (listener_) 276 listener_->OnChannelConnected(peer_pid_); 277} 278 279// Called on the listener's thread 280void ChannelProxy::Context::OnDispatchError() { 281 if (listener_) 282 listener_->OnChannelError(); 283} 284 285//----------------------------------------------------------------------------- 286 287ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle, 288 Channel::Mode mode, 289 Listener* listener, 290 base::SingleThreadTaskRunner* ipc_task_runner) 291 : context_(new Context(listener, ipc_task_runner)), 292 outgoing_message_filter_(NULL), 293 did_init_(false) { 294 Init(channel_handle, mode, true); 295} 296 297ChannelProxy::ChannelProxy(Context* context) 298 : context_(context), 299 outgoing_message_filter_(NULL), 300 did_init_(false) { 301} 302 303ChannelProxy::~ChannelProxy() { 304 DCHECK(CalledOnValidThread()); 305 306 Close(); 307} 308 309void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, 310 Channel::Mode mode, 311 bool create_pipe_now) { 312 DCHECK(CalledOnValidThread()); 313 DCHECK(!did_init_); 314#if defined(OS_POSIX) 315 // When we are creating a server on POSIX, we need its file descriptor 316 // to be created immediately so that it can be accessed and passed 317 // to other processes. Forcing it to be created immediately avoids 318 // race conditions that may otherwise arise. 319 if (mode & Channel::MODE_SERVER_FLAG) { 320 create_pipe_now = true; 321 } 322#endif // defined(OS_POSIX) 323 324 if (create_pipe_now) { 325 // Create the channel immediately. This effectively sets up the 326 // low-level pipe so that the client can connect. Without creating 327 // the pipe immediately, it is possible for a listener to attempt 328 // to connect and get an error since the pipe doesn't exist yet. 329 context_->CreateChannel(channel_handle, mode); 330 } else { 331 context_->ipc_task_runner()->PostTask( 332 FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(), 333 channel_handle, mode)); 334 } 335 336 // complete initialization on the background thread 337 context_->ipc_task_runner()->PostTask( 338 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get())); 339 340 did_init_ = true; 341} 342 343void ChannelProxy::Close() { 344 DCHECK(CalledOnValidThread()); 345 346 // Clear the backpointer to the listener so that any pending calls to 347 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 348 // possible that the channel could be closed while it is receiving messages! 349 context_->Clear(); 350 351 if (context_->ipc_task_runner()) { 352 context_->ipc_task_runner()->PostTask( 353 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get())); 354 } 355} 356 357bool ChannelProxy::Send(Message* message) { 358 DCHECK(did_init_); 359 360 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are 361 // tests that call Send() from a wrong thread. See http://crbug.com/163523. 362 if (outgoing_message_filter()) 363 message = outgoing_message_filter()->Rewrite(message); 364 365#ifdef IPC_MESSAGE_LOG_ENABLED 366 Logging::GetInstance()->OnSendMessage(message, context_->channel_id()); 367#endif 368 369 context_->ipc_task_runner()->PostTask( 370 FROM_HERE, 371 base::Bind(&ChannelProxy::Context::OnSendMessage, 372 context_, base::Passed(scoped_ptr<Message>(message)))); 373 return true; 374} 375 376void ChannelProxy::AddFilter(MessageFilter* filter) { 377 DCHECK(CalledOnValidThread()); 378 379 context_->AddFilter(filter); 380} 381 382void ChannelProxy::RemoveFilter(MessageFilter* filter) { 383 DCHECK(CalledOnValidThread()); 384 385 context_->ipc_task_runner()->PostTask( 386 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(), 387 make_scoped_refptr(filter))); 388} 389 390void ChannelProxy::ClearIPCTaskRunner() { 391 DCHECK(CalledOnValidThread()); 392 393 context()->ClearIPCTaskRunner(); 394} 395 396#if defined(OS_POSIX) && !defined(OS_NACL) 397// See the TODO regarding lazy initialization of the channel in 398// ChannelProxy::Init(). 399int ChannelProxy::GetClientFileDescriptor() { 400 DCHECK(CalledOnValidThread()); 401 402 Channel* channel = context_.get()->channel_.get(); 403 // Channel must have been created first. 404 DCHECK(channel) << context_.get()->channel_id_; 405 return channel->GetClientFileDescriptor(); 406} 407 408int ChannelProxy::TakeClientFileDescriptor() { 409 DCHECK(CalledOnValidThread()); 410 411 Channel* channel = context_.get()->channel_.get(); 412 // Channel must have been created first. 413 DCHECK(channel) << context_.get()->channel_id_; 414 return channel->TakeClientFileDescriptor(); 415} 416 417bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const { 418 DCHECK(CalledOnValidThread()); 419 420 Channel* channel = context_.get()->channel_.get(); 421 // Channel must have been created first. 422 DCHECK(channel) << context_.get()->channel_id_; 423 return channel->GetPeerEuid(peer_euid); 424} 425#endif 426 427//----------------------------------------------------------------------------- 428 429} // namespace IPC 430