ipc_channel_proxy.cc revision 4e180b6a0b4720a9b8e9e959a882386f690f08ff
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/bind.h"
6#include "base/compiler_specific.h"
7#include "base/debug/trace_event.h"
8#include "base/location.h"
9#include "base/memory/ref_counted.h"
10#include "base/memory/scoped_ptr.h"
11#include "base/single_thread_task_runner.h"
12#include "base/thread_task_runner_handle.h"
13#include "ipc/ipc_channel_proxy.h"
14#include "ipc/ipc_listener.h"
15#include "ipc/ipc_logging.h"
16#include "ipc/ipc_message_macros.h"
17#include "ipc/ipc_message_utils.h"
18
19namespace IPC {
20
21//------------------------------------------------------------------------------
22
23ChannelProxy::MessageFilter::MessageFilter() {}
24
25void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {}
26
27void ChannelProxy::MessageFilter::OnFilterRemoved() {}
28
29void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {}
30
31void ChannelProxy::MessageFilter::OnChannelError() {}
32
33void ChannelProxy::MessageFilter::OnChannelClosing() {}
34
35bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) {
36  return false;
37}
38
39ChannelProxy::MessageFilter::~MessageFilter() {}
40
41//------------------------------------------------------------------------------
42
43ChannelProxy::Context::Context(Listener* listener,
44                               base::SingleThreadTaskRunner* ipc_task_runner)
45    : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
46      listener_(listener),
47      ipc_task_runner_(ipc_task_runner),
48      channel_connected_called_(false),
49      peer_pid_(base::kNullProcessId) {
50  DCHECK(ipc_task_runner_.get());
51}
52
53ChannelProxy::Context::~Context() {
54}
55
56void ChannelProxy::Context::ClearIPCTaskRunner() {
57  ipc_task_runner_ = NULL;
58}
59
60void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
61                                          const Channel::Mode& mode) {
62  DCHECK(channel_.get() == NULL);
63  channel_id_ = handle.name;
64  channel_.reset(new Channel(handle, mode, this));
65}
66
67bool ChannelProxy::Context::TryFilters(const Message& message) {
68#ifdef IPC_MESSAGE_LOG_ENABLED
69  Logging* logger = Logging::GetInstance();
70  if (logger->Enabled())
71    logger->OnPreDispatchMessage(message);
72#endif
73
74  for (size_t i = 0; i < filters_.size(); ++i) {
75    if (filters_[i]->OnMessageReceived(message)) {
76#ifdef IPC_MESSAGE_LOG_ENABLED
77      if (logger->Enabled())
78        logger->OnPostDispatchMessage(message, channel_id_);
79#endif
80      return true;
81    }
82  }
83  return false;
84}
85
86// Called on the IPC::Channel thread
87bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
88  // First give a chance to the filters to process this message.
89  if (!TryFilters(message))
90    OnMessageReceivedNoFilter(message);
91  return true;
92}
93
94// Called on the IPC::Channel thread
95bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
96  // NOTE: This code relies on the listener's message loop not going away while
97  // this thread is active.  That should be a reasonable assumption, but it
98  // feels risky.  We may want to invent some more indirect way of referring to
99  // a MessageLoop if this becomes a problem.
100  listener_task_runner_->PostTask(
101      FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
102  return true;
103}
104
105// Called on the IPC::Channel thread
106void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
107  // Add any pending filters.  This avoids a race condition where someone
108  // creates a ChannelProxy, calls AddFilter, and then right after starts the
109  // peer process.  The IO thread could receive a message before the task to add
110  // the filter is run on the IO thread.
111  OnAddFilter();
112
113  // We cache off the peer_pid so it can be safely accessed from both threads.
114  peer_pid_ = channel_->peer_pid();
115  for (size_t i = 0; i < filters_.size(); ++i)
116    filters_[i]->OnChannelConnected(peer_pid);
117
118  // See above comment about using listener_task_runner_ here.
119  listener_task_runner_->PostTask(
120      FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
121}
122
123// Called on the IPC::Channel thread
124void ChannelProxy::Context::OnChannelError() {
125  for (size_t i = 0; i < filters_.size(); ++i)
126    filters_[i]->OnChannelError();
127
128  // See above comment about using listener_task_runner_ here.
129  listener_task_runner_->PostTask(
130      FROM_HERE, base::Bind(&Context::OnDispatchError, this));
131}
132
133// Called on the IPC::Channel thread
134void ChannelProxy::Context::OnChannelOpened() {
135  DCHECK(channel_ != NULL);
136
137  // Assume a reference to ourselves on behalf of this thread.  This reference
138  // will be released when we are closed.
139  AddRef();
140
141  if (!channel_->Connect()) {
142    OnChannelError();
143    return;
144  }
145
146  for (size_t i = 0; i < filters_.size(); ++i)
147    filters_[i]->OnFilterAdded(channel_.get());
148}
149
150// Called on the IPC::Channel thread
151void ChannelProxy::Context::OnChannelClosed() {
152  // It's okay for IPC::ChannelProxy::Close to be called more than once, which
153  // would result in this branch being taken.
154  if (!channel_.get())
155    return;
156
157  for (size_t i = 0; i < filters_.size(); ++i) {
158    filters_[i]->OnChannelClosing();
159    filters_[i]->OnFilterRemoved();
160  }
161
162  // We don't need the filters anymore.
163  filters_.clear();
164
165  channel_.reset();
166
167  // Balance with the reference taken during startup.  This may result in
168  // self-destruction.
169  Release();
170}
171
172void ChannelProxy::Context::Clear() {
173  listener_ = NULL;
174}
175
176// Called on the IPC::Channel thread
177void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
178  if (!channel_.get()) {
179    OnChannelClosed();
180    return;
181  }
182  if (!channel_->Send(message.release()))
183    OnChannelError();
184}
185
186// Called on the IPC::Channel thread
187void ChannelProxy::Context::OnAddFilter() {
188  std::vector<scoped_refptr<MessageFilter> > new_filters;
189  {
190    base::AutoLock auto_lock(pending_filters_lock_);
191    new_filters.swap(pending_filters_);
192  }
193
194  for (size_t i = 0; i < new_filters.size(); ++i) {
195    filters_.push_back(new_filters[i]);
196
197    // If the channel has already been created, then we need to send this
198    // message so that the filter gets access to the Channel.
199    if (channel_.get())
200      new_filters[i]->OnFilterAdded(channel_.get());
201    // Ditto for if the channel has been connected.
202    if (peer_pid_)
203      new_filters[i]->OnChannelConnected(peer_pid_);
204  }
205}
206
207// Called on the IPC::Channel thread
208void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
209  if (!channel_.get())
210    return;  // The filters have already been deleted.
211
212  for (size_t i = 0; i < filters_.size(); ++i) {
213    if (filters_[i].get() == filter) {
214      filter->OnFilterRemoved();
215      filters_.erase(filters_.begin() + i);
216      return;
217    }
218  }
219
220  NOTREACHED() << "filter to be removed not found";
221}
222
223// Called on the listener's thread
224void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
225  base::AutoLock auto_lock(pending_filters_lock_);
226  pending_filters_.push_back(make_scoped_refptr(filter));
227  ipc_task_runner_->PostTask(
228      FROM_HERE, base::Bind(&Context::OnAddFilter, this));
229}
230
231// Called on the listener's thread
232void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
233#ifdef IPC_MESSAGE_LOG_ENABLED
234  Logging* logger = Logging::GetInstance();
235  std::string name;
236  logger->GetMessageText(message.type(), &name, &message, NULL);
237  TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage",
238               "name", name);
239#else
240  TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage",
241               "class", IPC_MESSAGE_ID_CLASS(message.type()),
242               "line", IPC_MESSAGE_ID_LINE(message.type()));
243#endif
244
245  if (!listener_)
246    return;
247
248  OnDispatchConnected();
249
250#ifdef IPC_MESSAGE_LOG_ENABLED
251  if (message.type() == IPC_LOGGING_ID) {
252    logger->OnReceivedLoggingMessage(message);
253    return;
254  }
255
256  if (logger->Enabled())
257    logger->OnPreDispatchMessage(message);
258#endif
259
260  listener_->OnMessageReceived(message);
261
262#ifdef IPC_MESSAGE_LOG_ENABLED
263  if (logger->Enabled())
264    logger->OnPostDispatchMessage(message, channel_id_);
265#endif
266}
267
268// Called on the listener's thread
269void ChannelProxy::Context::OnDispatchConnected() {
270  if (channel_connected_called_)
271    return;
272
273  channel_connected_called_ = true;
274  if (listener_)
275    listener_->OnChannelConnected(peer_pid_);
276}
277
278// Called on the listener's thread
279void ChannelProxy::Context::OnDispatchError() {
280  if (listener_)
281    listener_->OnChannelError();
282}
283
284//-----------------------------------------------------------------------------
285
286ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
287                           Channel::Mode mode,
288                           Listener* listener,
289                           base::SingleThreadTaskRunner* ipc_task_runner)
290    : context_(new Context(listener, ipc_task_runner)),
291      outgoing_message_filter_(NULL),
292      did_init_(false) {
293  Init(channel_handle, mode, true);
294}
295
296ChannelProxy::ChannelProxy(Context* context)
297    : context_(context),
298      outgoing_message_filter_(NULL),
299      did_init_(false) {
300}
301
302ChannelProxy::~ChannelProxy() {
303  DCHECK(CalledOnValidThread());
304
305  Close();
306}
307
308void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
309                        Channel::Mode mode,
310                        bool create_pipe_now) {
311  DCHECK(CalledOnValidThread());
312  DCHECK(!did_init_);
313#if defined(OS_POSIX)
314  // When we are creating a server on POSIX, we need its file descriptor
315  // to be created immediately so that it can be accessed and passed
316  // to other processes. Forcing it to be created immediately avoids
317  // race conditions that may otherwise arise.
318  if (mode & Channel::MODE_SERVER_FLAG) {
319    create_pipe_now = true;
320  }
321#endif  // defined(OS_POSIX)
322
323  if (create_pipe_now) {
324    // Create the channel immediately.  This effectively sets up the
325    // low-level pipe so that the client can connect.  Without creating
326    // the pipe immediately, it is possible for a listener to attempt
327    // to connect and get an error since the pipe doesn't exist yet.
328    context_->CreateChannel(channel_handle, mode);
329  } else {
330    context_->ipc_task_runner()->PostTask(
331        FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
332                              channel_handle, mode));
333  }
334
335  // complete initialization on the background thread
336  context_->ipc_task_runner()->PostTask(
337      FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
338
339  did_init_ = true;
340}
341
342void ChannelProxy::Close() {
343  DCHECK(CalledOnValidThread());
344
345  // Clear the backpointer to the listener so that any pending calls to
346  // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
347  // possible that the channel could be closed while it is receiving messages!
348  context_->Clear();
349
350  if (context_->ipc_task_runner()) {
351    context_->ipc_task_runner()->PostTask(
352        FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
353  }
354}
355
356bool ChannelProxy::Send(Message* message) {
357  DCHECK(did_init_);
358
359  // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
360  // tests that call Send() from a wrong thread. See http://crbug.com/163523.
361  if (outgoing_message_filter())
362    message = outgoing_message_filter()->Rewrite(message);
363
364#ifdef IPC_MESSAGE_LOG_ENABLED
365  Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
366#endif
367
368  context_->ipc_task_runner()->PostTask(
369      FROM_HERE,
370      base::Bind(&ChannelProxy::Context::OnSendMessage,
371                 context_, base::Passed(scoped_ptr<Message>(message))));
372  return true;
373}
374
375void ChannelProxy::AddFilter(MessageFilter* filter) {
376  DCHECK(CalledOnValidThread());
377
378  context_->AddFilter(filter);
379}
380
381void ChannelProxy::RemoveFilter(MessageFilter* filter) {
382  DCHECK(CalledOnValidThread());
383
384  context_->ipc_task_runner()->PostTask(
385      FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
386                            make_scoped_refptr(filter)));
387}
388
389void ChannelProxy::ClearIPCTaskRunner() {
390  DCHECK(CalledOnValidThread());
391
392  context()->ClearIPCTaskRunner();
393}
394
395#if defined(OS_POSIX) && !defined(OS_NACL)
396// See the TODO regarding lazy initialization of the channel in
397// ChannelProxy::Init().
398int ChannelProxy::GetClientFileDescriptor() {
399  DCHECK(CalledOnValidThread());
400
401  Channel* channel = context_.get()->channel_.get();
402  // Channel must have been created first.
403  DCHECK(channel) << context_.get()->channel_id_;
404  return channel->GetClientFileDescriptor();
405}
406
407int ChannelProxy::TakeClientFileDescriptor() {
408  DCHECK(CalledOnValidThread());
409
410  Channel* channel = context_.get()->channel_.get();
411  // Channel must have been created first.
412  DCHECK(channel) << context_.get()->channel_id_;
413  return channel->TakeClientFileDescriptor();
414}
415
416bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const {
417  DCHECK(CalledOnValidThread());
418
419  Channel* channel = context_.get()->channel_.get();
420  // Channel must have been created first.
421  DCHECK(channel) << context_.get()->channel_id_;
422  return channel->GetPeerEuid(peer_euid);
423}
424#endif
425
426//-----------------------------------------------------------------------------
427
428}  // namespace IPC
429