ipc_channel_proxy.cc revision 4e180b6a0b4720a9b8e9e959a882386f690f08ff
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "base/bind.h" 6#include "base/compiler_specific.h" 7#include "base/debug/trace_event.h" 8#include "base/location.h" 9#include "base/memory/ref_counted.h" 10#include "base/memory/scoped_ptr.h" 11#include "base/single_thread_task_runner.h" 12#include "base/thread_task_runner_handle.h" 13#include "ipc/ipc_channel_proxy.h" 14#include "ipc/ipc_listener.h" 15#include "ipc/ipc_logging.h" 16#include "ipc/ipc_message_macros.h" 17#include "ipc/ipc_message_utils.h" 18 19namespace IPC { 20 21//------------------------------------------------------------------------------ 22 23ChannelProxy::MessageFilter::MessageFilter() {} 24 25void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {} 26 27void ChannelProxy::MessageFilter::OnFilterRemoved() {} 28 29void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {} 30 31void ChannelProxy::MessageFilter::OnChannelError() {} 32 33void ChannelProxy::MessageFilter::OnChannelClosing() {} 34 35bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) { 36 return false; 37} 38 39ChannelProxy::MessageFilter::~MessageFilter() {} 40 41//------------------------------------------------------------------------------ 42 43ChannelProxy::Context::Context(Listener* listener, 44 base::SingleThreadTaskRunner* ipc_task_runner) 45 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 46 listener_(listener), 47 ipc_task_runner_(ipc_task_runner), 48 channel_connected_called_(false), 49 peer_pid_(base::kNullProcessId) { 50 DCHECK(ipc_task_runner_.get()); 51} 52 53ChannelProxy::Context::~Context() { 54} 55 56void ChannelProxy::Context::ClearIPCTaskRunner() { 57 ipc_task_runner_ = NULL; 58} 59 60void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle, 61 const Channel::Mode& mode) { 62 DCHECK(channel_.get() == NULL); 63 channel_id_ = handle.name; 64 channel_.reset(new Channel(handle, mode, this)); 65} 66 67bool ChannelProxy::Context::TryFilters(const Message& message) { 68#ifdef IPC_MESSAGE_LOG_ENABLED 69 Logging* logger = Logging::GetInstance(); 70 if (logger->Enabled()) 71 logger->OnPreDispatchMessage(message); 72#endif 73 74 for (size_t i = 0; i < filters_.size(); ++i) { 75 if (filters_[i]->OnMessageReceived(message)) { 76#ifdef IPC_MESSAGE_LOG_ENABLED 77 if (logger->Enabled()) 78 logger->OnPostDispatchMessage(message, channel_id_); 79#endif 80 return true; 81 } 82 } 83 return false; 84} 85 86// Called on the IPC::Channel thread 87bool ChannelProxy::Context::OnMessageReceived(const Message& message) { 88 // First give a chance to the filters to process this message. 89 if (!TryFilters(message)) 90 OnMessageReceivedNoFilter(message); 91 return true; 92} 93 94// Called on the IPC::Channel thread 95bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { 96 // NOTE: This code relies on the listener's message loop not going away while 97 // this thread is active. That should be a reasonable assumption, but it 98 // feels risky. We may want to invent some more indirect way of referring to 99 // a MessageLoop if this becomes a problem. 100 listener_task_runner_->PostTask( 101 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message)); 102 return true; 103} 104 105// Called on the IPC::Channel thread 106void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { 107 // Add any pending filters. This avoids a race condition where someone 108 // creates a ChannelProxy, calls AddFilter, and then right after starts the 109 // peer process. The IO thread could receive a message before the task to add 110 // the filter is run on the IO thread. 111 OnAddFilter(); 112 113 // We cache off the peer_pid so it can be safely accessed from both threads. 114 peer_pid_ = channel_->peer_pid(); 115 for (size_t i = 0; i < filters_.size(); ++i) 116 filters_[i]->OnChannelConnected(peer_pid); 117 118 // See above comment about using listener_task_runner_ here. 119 listener_task_runner_->PostTask( 120 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this)); 121} 122 123// Called on the IPC::Channel thread 124void ChannelProxy::Context::OnChannelError() { 125 for (size_t i = 0; i < filters_.size(); ++i) 126 filters_[i]->OnChannelError(); 127 128 // See above comment about using listener_task_runner_ here. 129 listener_task_runner_->PostTask( 130 FROM_HERE, base::Bind(&Context::OnDispatchError, this)); 131} 132 133// Called on the IPC::Channel thread 134void ChannelProxy::Context::OnChannelOpened() { 135 DCHECK(channel_ != NULL); 136 137 // Assume a reference to ourselves on behalf of this thread. This reference 138 // will be released when we are closed. 139 AddRef(); 140 141 if (!channel_->Connect()) { 142 OnChannelError(); 143 return; 144 } 145 146 for (size_t i = 0; i < filters_.size(); ++i) 147 filters_[i]->OnFilterAdded(channel_.get()); 148} 149 150// Called on the IPC::Channel thread 151void ChannelProxy::Context::OnChannelClosed() { 152 // It's okay for IPC::ChannelProxy::Close to be called more than once, which 153 // would result in this branch being taken. 154 if (!channel_.get()) 155 return; 156 157 for (size_t i = 0; i < filters_.size(); ++i) { 158 filters_[i]->OnChannelClosing(); 159 filters_[i]->OnFilterRemoved(); 160 } 161 162 // We don't need the filters anymore. 163 filters_.clear(); 164 165 channel_.reset(); 166 167 // Balance with the reference taken during startup. This may result in 168 // self-destruction. 169 Release(); 170} 171 172void ChannelProxy::Context::Clear() { 173 listener_ = NULL; 174} 175 176// Called on the IPC::Channel thread 177void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) { 178 if (!channel_.get()) { 179 OnChannelClosed(); 180 return; 181 } 182 if (!channel_->Send(message.release())) 183 OnChannelError(); 184} 185 186// Called on the IPC::Channel thread 187void ChannelProxy::Context::OnAddFilter() { 188 std::vector<scoped_refptr<MessageFilter> > new_filters; 189 { 190 base::AutoLock auto_lock(pending_filters_lock_); 191 new_filters.swap(pending_filters_); 192 } 193 194 for (size_t i = 0; i < new_filters.size(); ++i) { 195 filters_.push_back(new_filters[i]); 196 197 // If the channel has already been created, then we need to send this 198 // message so that the filter gets access to the Channel. 199 if (channel_.get()) 200 new_filters[i]->OnFilterAdded(channel_.get()); 201 // Ditto for if the channel has been connected. 202 if (peer_pid_) 203 new_filters[i]->OnChannelConnected(peer_pid_); 204 } 205} 206 207// Called on the IPC::Channel thread 208void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { 209 if (!channel_.get()) 210 return; // The filters have already been deleted. 211 212 for (size_t i = 0; i < filters_.size(); ++i) { 213 if (filters_[i].get() == filter) { 214 filter->OnFilterRemoved(); 215 filters_.erase(filters_.begin() + i); 216 return; 217 } 218 } 219 220 NOTREACHED() << "filter to be removed not found"; 221} 222 223// Called on the listener's thread 224void ChannelProxy::Context::AddFilter(MessageFilter* filter) { 225 base::AutoLock auto_lock(pending_filters_lock_); 226 pending_filters_.push_back(make_scoped_refptr(filter)); 227 ipc_task_runner_->PostTask( 228 FROM_HERE, base::Bind(&Context::OnAddFilter, this)); 229} 230 231// Called on the listener's thread 232void ChannelProxy::Context::OnDispatchMessage(const Message& message) { 233#ifdef IPC_MESSAGE_LOG_ENABLED 234 Logging* logger = Logging::GetInstance(); 235 std::string name; 236 logger->GetMessageText(message.type(), &name, &message, NULL); 237 TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage", 238 "name", name); 239#else 240 TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage", 241 "class", IPC_MESSAGE_ID_CLASS(message.type()), 242 "line", IPC_MESSAGE_ID_LINE(message.type())); 243#endif 244 245 if (!listener_) 246 return; 247 248 OnDispatchConnected(); 249 250#ifdef IPC_MESSAGE_LOG_ENABLED 251 if (message.type() == IPC_LOGGING_ID) { 252 logger->OnReceivedLoggingMessage(message); 253 return; 254 } 255 256 if (logger->Enabled()) 257 logger->OnPreDispatchMessage(message); 258#endif 259 260 listener_->OnMessageReceived(message); 261 262#ifdef IPC_MESSAGE_LOG_ENABLED 263 if (logger->Enabled()) 264 logger->OnPostDispatchMessage(message, channel_id_); 265#endif 266} 267 268// Called on the listener's thread 269void ChannelProxy::Context::OnDispatchConnected() { 270 if (channel_connected_called_) 271 return; 272 273 channel_connected_called_ = true; 274 if (listener_) 275 listener_->OnChannelConnected(peer_pid_); 276} 277 278// Called on the listener's thread 279void ChannelProxy::Context::OnDispatchError() { 280 if (listener_) 281 listener_->OnChannelError(); 282} 283 284//----------------------------------------------------------------------------- 285 286ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle, 287 Channel::Mode mode, 288 Listener* listener, 289 base::SingleThreadTaskRunner* ipc_task_runner) 290 : context_(new Context(listener, ipc_task_runner)), 291 outgoing_message_filter_(NULL), 292 did_init_(false) { 293 Init(channel_handle, mode, true); 294} 295 296ChannelProxy::ChannelProxy(Context* context) 297 : context_(context), 298 outgoing_message_filter_(NULL), 299 did_init_(false) { 300} 301 302ChannelProxy::~ChannelProxy() { 303 DCHECK(CalledOnValidThread()); 304 305 Close(); 306} 307 308void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, 309 Channel::Mode mode, 310 bool create_pipe_now) { 311 DCHECK(CalledOnValidThread()); 312 DCHECK(!did_init_); 313#if defined(OS_POSIX) 314 // When we are creating a server on POSIX, we need its file descriptor 315 // to be created immediately so that it can be accessed and passed 316 // to other processes. Forcing it to be created immediately avoids 317 // race conditions that may otherwise arise. 318 if (mode & Channel::MODE_SERVER_FLAG) { 319 create_pipe_now = true; 320 } 321#endif // defined(OS_POSIX) 322 323 if (create_pipe_now) { 324 // Create the channel immediately. This effectively sets up the 325 // low-level pipe so that the client can connect. Without creating 326 // the pipe immediately, it is possible for a listener to attempt 327 // to connect and get an error since the pipe doesn't exist yet. 328 context_->CreateChannel(channel_handle, mode); 329 } else { 330 context_->ipc_task_runner()->PostTask( 331 FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(), 332 channel_handle, mode)); 333 } 334 335 // complete initialization on the background thread 336 context_->ipc_task_runner()->PostTask( 337 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get())); 338 339 did_init_ = true; 340} 341 342void ChannelProxy::Close() { 343 DCHECK(CalledOnValidThread()); 344 345 // Clear the backpointer to the listener so that any pending calls to 346 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 347 // possible that the channel could be closed while it is receiving messages! 348 context_->Clear(); 349 350 if (context_->ipc_task_runner()) { 351 context_->ipc_task_runner()->PostTask( 352 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get())); 353 } 354} 355 356bool ChannelProxy::Send(Message* message) { 357 DCHECK(did_init_); 358 359 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are 360 // tests that call Send() from a wrong thread. See http://crbug.com/163523. 361 if (outgoing_message_filter()) 362 message = outgoing_message_filter()->Rewrite(message); 363 364#ifdef IPC_MESSAGE_LOG_ENABLED 365 Logging::GetInstance()->OnSendMessage(message, context_->channel_id()); 366#endif 367 368 context_->ipc_task_runner()->PostTask( 369 FROM_HERE, 370 base::Bind(&ChannelProxy::Context::OnSendMessage, 371 context_, base::Passed(scoped_ptr<Message>(message)))); 372 return true; 373} 374 375void ChannelProxy::AddFilter(MessageFilter* filter) { 376 DCHECK(CalledOnValidThread()); 377 378 context_->AddFilter(filter); 379} 380 381void ChannelProxy::RemoveFilter(MessageFilter* filter) { 382 DCHECK(CalledOnValidThread()); 383 384 context_->ipc_task_runner()->PostTask( 385 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(), 386 make_scoped_refptr(filter))); 387} 388 389void ChannelProxy::ClearIPCTaskRunner() { 390 DCHECK(CalledOnValidThread()); 391 392 context()->ClearIPCTaskRunner(); 393} 394 395#if defined(OS_POSIX) && !defined(OS_NACL) 396// See the TODO regarding lazy initialization of the channel in 397// ChannelProxy::Init(). 398int ChannelProxy::GetClientFileDescriptor() { 399 DCHECK(CalledOnValidThread()); 400 401 Channel* channel = context_.get()->channel_.get(); 402 // Channel must have been created first. 403 DCHECK(channel) << context_.get()->channel_id_; 404 return channel->GetClientFileDescriptor(); 405} 406 407int ChannelProxy::TakeClientFileDescriptor() { 408 DCHECK(CalledOnValidThread()); 409 410 Channel* channel = context_.get()->channel_.get(); 411 // Channel must have been created first. 412 DCHECK(channel) << context_.get()->channel_id_; 413 return channel->TakeClientFileDescriptor(); 414} 415 416bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const { 417 DCHECK(CalledOnValidThread()); 418 419 Channel* channel = context_.get()->channel_.get(); 420 // Channel must have been created first. 421 DCHECK(channel) << context_.get()->channel_id_; 422 return channel->GetPeerEuid(peer_euid); 423} 424#endif 425 426//----------------------------------------------------------------------------- 427 428} // namespace IPC 429