ipc_channel_proxy.cc revision 868fa2fe829687343ffae624259930155e16dbd8
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/bind.h"
6#include "base/compiler_specific.h"
7#include "base/debug/trace_event.h"
8#include "base/location.h"
9#include "base/memory/ref_counted.h"
10#include "base/memory/scoped_ptr.h"
11#include "base/single_thread_task_runner.h"
12#include "base/thread_task_runner_handle.h"
13#include "ipc/ipc_channel_proxy.h"
14#include "ipc/ipc_listener.h"
15#include "ipc/ipc_logging.h"
16#include "ipc/ipc_message_macros.h"
17#include "ipc/ipc_message_utils.h"
18
19namespace IPC {
20
21//------------------------------------------------------------------------------
22
23ChannelProxy::MessageFilter::MessageFilter() {}
24
25void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {}
26
27void ChannelProxy::MessageFilter::OnFilterRemoved() {}
28
29void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {}
30
31void ChannelProxy::MessageFilter::OnChannelError() {}
32
33void ChannelProxy::MessageFilter::OnChannelClosing() {}
34
35bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) {
36  return false;
37}
38
39void ChannelProxy::MessageFilter::OnDestruct() const {
40  delete this;
41}
42
43ChannelProxy::MessageFilter::~MessageFilter() {}
44
45//------------------------------------------------------------------------------
46
47ChannelProxy::Context::Context(Listener* listener,
48                               base::SingleThreadTaskRunner* ipc_task_runner)
49    : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
50      listener_(listener),
51      ipc_task_runner_(ipc_task_runner),
52      channel_connected_called_(false),
53      peer_pid_(base::kNullProcessId) {
54  DCHECK(ipc_task_runner_.get());
55}
56
57ChannelProxy::Context::~Context() {
58}
59
60void ChannelProxy::Context::ClearIPCTaskRunner() {
61  ipc_task_runner_ = NULL;
62}
63
64void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
65                                          const Channel::Mode& mode) {
66  DCHECK(channel_.get() == NULL);
67  channel_id_ = handle.name;
68  channel_.reset(new Channel(handle, mode, this));
69}
70
71bool ChannelProxy::Context::TryFilters(const Message& message) {
72#ifdef IPC_MESSAGE_LOG_ENABLED
73  Logging* logger = Logging::GetInstance();
74  if (logger->Enabled())
75    logger->OnPreDispatchMessage(message);
76#endif
77
78  for (size_t i = 0; i < filters_.size(); ++i) {
79    if (filters_[i]->OnMessageReceived(message)) {
80#ifdef IPC_MESSAGE_LOG_ENABLED
81      if (logger->Enabled())
82        logger->OnPostDispatchMessage(message, channel_id_);
83#endif
84      return true;
85    }
86  }
87  return false;
88}
89
90// Called on the IPC::Channel thread
91bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
92  // First give a chance to the filters to process this message.
93  if (!TryFilters(message))
94    OnMessageReceivedNoFilter(message);
95  return true;
96}
97
98// Called on the IPC::Channel thread
99bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
100  // NOTE: This code relies on the listener's message loop not going away while
101  // this thread is active.  That should be a reasonable assumption, but it
102  // feels risky.  We may want to invent some more indirect way of referring to
103  // a MessageLoop if this becomes a problem.
104  listener_task_runner_->PostTask(
105      FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
106  return true;
107}
108
109// Called on the IPC::Channel thread
110void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
111  // Add any pending filters.  This avoids a race condition where someone
112  // creates a ChannelProxy, calls AddFilter, and then right after starts the
113  // peer process.  The IO thread could receive a message before the task to add
114  // the filter is run on the IO thread.
115  OnAddFilter();
116
117  // We cache off the peer_pid so it can be safely accessed from both threads.
118  peer_pid_ = channel_->peer_pid();
119  for (size_t i = 0; i < filters_.size(); ++i)
120    filters_[i]->OnChannelConnected(peer_pid);
121
122  // See above comment about using listener_task_runner_ here.
123  listener_task_runner_->PostTask(
124      FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
125}
126
127// Called on the IPC::Channel thread
128void ChannelProxy::Context::OnChannelError() {
129  for (size_t i = 0; i < filters_.size(); ++i)
130    filters_[i]->OnChannelError();
131
132  // See above comment about using listener_task_runner_ here.
133  listener_task_runner_->PostTask(
134      FROM_HERE, base::Bind(&Context::OnDispatchError, this));
135}
136
137// Called on the IPC::Channel thread
138void ChannelProxy::Context::OnChannelOpened() {
139  DCHECK(channel_ != NULL);
140
141  // Assume a reference to ourselves on behalf of this thread.  This reference
142  // will be released when we are closed.
143  AddRef();
144
145  if (!channel_->Connect()) {
146    OnChannelError();
147    return;
148  }
149
150  for (size_t i = 0; i < filters_.size(); ++i)
151    filters_[i]->OnFilterAdded(channel_.get());
152}
153
154// Called on the IPC::Channel thread
155void ChannelProxy::Context::OnChannelClosed() {
156  // It's okay for IPC::ChannelProxy::Close to be called more than once, which
157  // would result in this branch being taken.
158  if (!channel_.get())
159    return;
160
161  for (size_t i = 0; i < filters_.size(); ++i) {
162    filters_[i]->OnChannelClosing();
163    filters_[i]->OnFilterRemoved();
164  }
165
166  // We don't need the filters anymore.
167  filters_.clear();
168
169  channel_.reset();
170
171  // Balance with the reference taken during startup.  This may result in
172  // self-destruction.
173  Release();
174}
175
176void ChannelProxy::Context::Clear() {
177  listener_ = NULL;
178}
179
180// Called on the IPC::Channel thread
181void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
182  if (!channel_.get()) {
183    OnChannelClosed();
184    return;
185  }
186  if (!channel_->Send(message.release()))
187    OnChannelError();
188}
189
190// Called on the IPC::Channel thread
191void ChannelProxy::Context::OnAddFilter() {
192  std::vector<scoped_refptr<MessageFilter> > new_filters;
193  {
194    base::AutoLock auto_lock(pending_filters_lock_);
195    new_filters.swap(pending_filters_);
196  }
197
198  for (size_t i = 0; i < new_filters.size(); ++i) {
199    filters_.push_back(new_filters[i]);
200
201    // If the channel has already been created, then we need to send this
202    // message so that the filter gets access to the Channel.
203    if (channel_.get())
204      new_filters[i]->OnFilterAdded(channel_.get());
205    // Ditto for if the channel has been connected.
206    if (peer_pid_)
207      new_filters[i]->OnChannelConnected(peer_pid_);
208  }
209}
210
211// Called on the IPC::Channel thread
212void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
213  for (size_t i = 0; i < filters_.size(); ++i) {
214    if (filters_[i].get() == filter) {
215      filter->OnFilterRemoved();
216      filters_.erase(filters_.begin() + i);
217      return;
218    }
219  }
220
221  NOTREACHED() << "filter to be removed not found";
222}
223
224// Called on the listener's thread
225void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
226  base::AutoLock auto_lock(pending_filters_lock_);
227  pending_filters_.push_back(make_scoped_refptr(filter));
228  ipc_task_runner_->PostTask(
229      FROM_HERE, base::Bind(&Context::OnAddFilter, this));
230}
231
232// Called on the listener's thread
233void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
234#ifdef IPC_MESSAGE_LOG_ENABLED
235  Logging* logger = Logging::GetInstance();
236  std::string name;
237  logger->GetMessageText(message.type(), &name, &message, NULL);
238  TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage",
239               "name", name);
240#else
241  TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage",
242               "class", IPC_MESSAGE_ID_CLASS(message.type()),
243               "line", IPC_MESSAGE_ID_LINE(message.type()));
244#endif
245
246  if (!listener_)
247    return;
248
249  OnDispatchConnected();
250
251#ifdef IPC_MESSAGE_LOG_ENABLED
252  if (message.type() == IPC_LOGGING_ID) {
253    logger->OnReceivedLoggingMessage(message);
254    return;
255  }
256
257  if (logger->Enabled())
258    logger->OnPreDispatchMessage(message);
259#endif
260
261  listener_->OnMessageReceived(message);
262
263#ifdef IPC_MESSAGE_LOG_ENABLED
264  if (logger->Enabled())
265    logger->OnPostDispatchMessage(message, channel_id_);
266#endif
267}
268
269// Called on the listener's thread
270void ChannelProxy::Context::OnDispatchConnected() {
271  if (channel_connected_called_)
272    return;
273
274  channel_connected_called_ = true;
275  if (listener_)
276    listener_->OnChannelConnected(peer_pid_);
277}
278
279// Called on the listener's thread
280void ChannelProxy::Context::OnDispatchError() {
281  if (listener_)
282    listener_->OnChannelError();
283}
284
285//-----------------------------------------------------------------------------
286
287ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
288                           Channel::Mode mode,
289                           Listener* listener,
290                           base::SingleThreadTaskRunner* ipc_task_runner)
291    : context_(new Context(listener, ipc_task_runner)),
292      outgoing_message_filter_(NULL),
293      did_init_(false) {
294  Init(channel_handle, mode, true);
295}
296
297ChannelProxy::ChannelProxy(Context* context)
298    : context_(context),
299      outgoing_message_filter_(NULL),
300      did_init_(false) {
301}
302
303ChannelProxy::~ChannelProxy() {
304  DCHECK(CalledOnValidThread());
305
306  Close();
307}
308
309void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
310                        Channel::Mode mode,
311                        bool create_pipe_now) {
312  DCHECK(CalledOnValidThread());
313  DCHECK(!did_init_);
314#if defined(OS_POSIX)
315  // When we are creating a server on POSIX, we need its file descriptor
316  // to be created immediately so that it can be accessed and passed
317  // to other processes. Forcing it to be created immediately avoids
318  // race conditions that may otherwise arise.
319  if (mode & Channel::MODE_SERVER_FLAG) {
320    create_pipe_now = true;
321  }
322#endif  // defined(OS_POSIX)
323
324  if (create_pipe_now) {
325    // Create the channel immediately.  This effectively sets up the
326    // low-level pipe so that the client can connect.  Without creating
327    // the pipe immediately, it is possible for a listener to attempt
328    // to connect and get an error since the pipe doesn't exist yet.
329    context_->CreateChannel(channel_handle, mode);
330  } else {
331    context_->ipc_task_runner()->PostTask(
332        FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
333                              channel_handle, mode));
334  }
335
336  // complete initialization on the background thread
337  context_->ipc_task_runner()->PostTask(
338      FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
339
340  did_init_ = true;
341}
342
343void ChannelProxy::Close() {
344  DCHECK(CalledOnValidThread());
345
346  // Clear the backpointer to the listener so that any pending calls to
347  // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
348  // possible that the channel could be closed while it is receiving messages!
349  context_->Clear();
350
351  if (context_->ipc_task_runner()) {
352    context_->ipc_task_runner()->PostTask(
353        FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
354  }
355}
356
357bool ChannelProxy::Send(Message* message) {
358  DCHECK(did_init_);
359
360  // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
361  // tests that call Send() from a wrong thread. See http://crbug.com/163523.
362  if (outgoing_message_filter())
363    message = outgoing_message_filter()->Rewrite(message);
364
365#ifdef IPC_MESSAGE_LOG_ENABLED
366  Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
367#endif
368
369  context_->ipc_task_runner()->PostTask(
370      FROM_HERE,
371      base::Bind(&ChannelProxy::Context::OnSendMessage,
372                 context_, base::Passed(scoped_ptr<Message>(message))));
373  return true;
374}
375
376void ChannelProxy::AddFilter(MessageFilter* filter) {
377  DCHECK(CalledOnValidThread());
378
379  context_->AddFilter(filter);
380}
381
382void ChannelProxy::RemoveFilter(MessageFilter* filter) {
383  DCHECK(CalledOnValidThread());
384
385  context_->ipc_task_runner()->PostTask(
386      FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
387                            make_scoped_refptr(filter)));
388}
389
390void ChannelProxy::ClearIPCTaskRunner() {
391  DCHECK(CalledOnValidThread());
392
393  context()->ClearIPCTaskRunner();
394}
395
396#if defined(OS_POSIX) && !defined(OS_NACL)
397// See the TODO regarding lazy initialization of the channel in
398// ChannelProxy::Init().
399int ChannelProxy::GetClientFileDescriptor() {
400  DCHECK(CalledOnValidThread());
401
402  Channel* channel = context_.get()->channel_.get();
403  // Channel must have been created first.
404  DCHECK(channel) << context_.get()->channel_id_;
405  return channel->GetClientFileDescriptor();
406}
407
408int ChannelProxy::TakeClientFileDescriptor() {
409  DCHECK(CalledOnValidThread());
410
411  Channel* channel = context_.get()->channel_.get();
412  // Channel must have been created first.
413  DCHECK(channel) << context_.get()->channel_id_;
414  return channel->TakeClientFileDescriptor();
415}
416
417bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const {
418  DCHECK(CalledOnValidThread());
419
420  Channel* channel = context_.get()->channel_.get();
421  // Channel must have been created first.
422  DCHECK(channel) << context_.get()->channel_id_;
423  return channel->GetPeerEuid(peer_euid);
424}
425#endif
426
427//-----------------------------------------------------------------------------
428
429}  // namespace IPC
430