1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "ipc/ipc_channel_proxy.h"
6
7#include "base/bind.h"
8#include "base/compiler_specific.h"
9#include "base/location.h"
10#include "base/memory/ref_counted.h"
11#include "base/memory/scoped_ptr.h"
12#include "base/single_thread_task_runner.h"
13#include "base/thread_task_runner_handle.h"
14#include "ipc/ipc_channel_factory.h"
15#include "ipc/ipc_listener.h"
16#include "ipc/ipc_logging.h"
17#include "ipc/ipc_message_macros.h"
18#include "ipc/message_filter.h"
19#include "ipc/message_filter_router.h"
20
21namespace IPC {
22
23//------------------------------------------------------------------------------
24
25ChannelProxy::Context::Context(
26    Listener* listener,
27    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
28    : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
29      listener_(listener),
30      ipc_task_runner_(ipc_task_runner),
31      channel_connected_called_(false),
32      message_filter_router_(new MessageFilterRouter()),
33      peer_pid_(base::kNullProcessId) {
34  DCHECK(ipc_task_runner_.get());
35  // The Listener thread where Messages are handled must be a separate thread
36  // to avoid oversubscribing the IO thread. If you trigger this error, you
37  // need to either:
38  // 1) Create the ChannelProxy on a different thread, or
39  // 2) Just use Channel
40  // Note, we currently make an exception for a NULL listener. That usage
41  // basically works, but is outside the intent of ChannelProxy. This support
42  // will disappear, so please don't rely on it. See crbug.com/364241
43  DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
44}
45
46ChannelProxy::Context::~Context() {
47}
48
49void ChannelProxy::Context::ClearIPCTaskRunner() {
50  ipc_task_runner_ = NULL;
51}
52
53void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
54  DCHECK(!channel_);
55  channel_id_ = factory->GetName();
56  channel_ = factory->BuildChannel(this);
57}
58
59bool ChannelProxy::Context::TryFilters(const Message& message) {
60  DCHECK(message_filter_router_);
61#ifdef IPC_MESSAGE_LOG_ENABLED
62  Logging* logger = Logging::GetInstance();
63  if (logger->Enabled())
64    logger->OnPreDispatchMessage(message);
65#endif
66
67  if (message_filter_router_->TryFilters(message)) {
68    if (message.dispatch_error()) {
69      listener_task_runner_->PostTask(
70          FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
71    }
72#ifdef IPC_MESSAGE_LOG_ENABLED
73    if (logger->Enabled())
74      logger->OnPostDispatchMessage(message, channel_id_);
75#endif
76    return true;
77  }
78  return false;
79}
80
81// Called on the IPC::Channel thread
82bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
83  // First give a chance to the filters to process this message.
84  if (!TryFilters(message))
85    OnMessageReceivedNoFilter(message);
86  return true;
87}
88
89// Called on the IPC::Channel thread
90bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
91  listener_task_runner_->PostTask(
92      FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
93  return true;
94}
95
96// Called on the IPC::Channel thread
97void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
98  // We cache off the peer_pid so it can be safely accessed from both threads.
99  peer_pid_ = channel_->GetPeerPID();
100
101  // Add any pending filters.  This avoids a race condition where someone
102  // creates a ChannelProxy, calls AddFilter, and then right after starts the
103  // peer process.  The IO thread could receive a message before the task to add
104  // the filter is run on the IO thread.
105  OnAddFilter();
106
107  // See above comment about using listener_task_runner_ here.
108  listener_task_runner_->PostTask(
109      FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
110}
111
112// Called on the IPC::Channel thread
113void ChannelProxy::Context::OnChannelError() {
114  for (size_t i = 0; i < filters_.size(); ++i)
115    filters_[i]->OnChannelError();
116
117  // See above comment about using listener_task_runner_ here.
118  listener_task_runner_->PostTask(
119      FROM_HERE, base::Bind(&Context::OnDispatchError, this));
120}
121
122// Called on the IPC::Channel thread
123void ChannelProxy::Context::OnChannelOpened() {
124  DCHECK(channel_ != NULL);
125
126  // Assume a reference to ourselves on behalf of this thread.  This reference
127  // will be released when we are closed.
128  AddRef();
129
130  if (!channel_->Connect()) {
131    OnChannelError();
132    return;
133  }
134
135  for (size_t i = 0; i < filters_.size(); ++i)
136    filters_[i]->OnFilterAdded(channel_.get());
137}
138
139// Called on the IPC::Channel thread
140void ChannelProxy::Context::OnChannelClosed() {
141  // It's okay for IPC::ChannelProxy::Close to be called more than once, which
142  // would result in this branch being taken.
143  if (!channel_)
144    return;
145
146  for (size_t i = 0; i < filters_.size(); ++i) {
147    filters_[i]->OnChannelClosing();
148    filters_[i]->OnFilterRemoved();
149  }
150
151  // We don't need the filters anymore.
152  message_filter_router_->Clear();
153  filters_.clear();
154  // We don't need the lock, because at this point, the listener thread can't
155  // access it any more.
156  pending_filters_.clear();
157
158  channel_.reset();
159
160  // Balance with the reference taken during startup.  This may result in
161  // self-destruction.
162  Release();
163}
164
165void ChannelProxy::Context::Clear() {
166  listener_ = NULL;
167}
168
169// Called on the IPC::Channel thread
170void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
171  if (!channel_) {
172    OnChannelClosed();
173    return;
174  }
175
176  if (!channel_->Send(message.release()))
177    OnChannelError();
178}
179
180// Called on the IPC::Channel thread
181void ChannelProxy::Context::OnAddFilter() {
182  // Our OnChannelConnected method has not yet been called, so we can't be
183  // sure that channel_ is valid yet. When OnChannelConnected *is* called,
184  // it invokes OnAddFilter, so any pending filter(s) will be added at that
185  // time.
186  if (peer_pid_ == base::kNullProcessId)
187    return;
188
189  std::vector<scoped_refptr<MessageFilter> > new_filters;
190  {
191    base::AutoLock auto_lock(pending_filters_lock_);
192    new_filters.swap(pending_filters_);
193  }
194
195  for (size_t i = 0; i < new_filters.size(); ++i) {
196    filters_.push_back(new_filters[i]);
197
198    message_filter_router_->AddFilter(new_filters[i].get());
199
200    // The channel has already been created and connected, so we need to
201    // inform the filters right now.
202    new_filters[i]->OnFilterAdded(channel_.get());
203    new_filters[i]->OnChannelConnected(peer_pid_);
204  }
205}
206
207// Called on the IPC::Channel thread
208void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
209  if (peer_pid_ == base::kNullProcessId) {
210    // The channel is not yet connected, so any filters are still pending.
211    base::AutoLock auto_lock(pending_filters_lock_);
212    for (size_t i = 0; i < pending_filters_.size(); ++i) {
213      if (pending_filters_[i].get() == filter) {
214        filter->OnFilterRemoved();
215        pending_filters_.erase(pending_filters_.begin() + i);
216        return;
217      }
218    }
219    return;
220  }
221  if (!channel_)
222    return;  // The filters have already been deleted.
223
224  message_filter_router_->RemoveFilter(filter);
225
226  for (size_t i = 0; i < filters_.size(); ++i) {
227    if (filters_[i].get() == filter) {
228      filter->OnFilterRemoved();
229      filters_.erase(filters_.begin() + i);
230      return;
231    }
232  }
233
234  NOTREACHED() << "filter to be removed not found";
235}
236
237// Called on the listener's thread
238void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
239  base::AutoLock auto_lock(pending_filters_lock_);
240  pending_filters_.push_back(make_scoped_refptr(filter));
241  ipc_task_runner_->PostTask(
242      FROM_HERE, base::Bind(&Context::OnAddFilter, this));
243}
244
245// Called on the listener's thread
246void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
247#ifdef IPC_MESSAGE_LOG_ENABLED
248  Logging* logger = Logging::GetInstance();
249  std::string name;
250  logger->GetMessageText(message.type(), &name, &message, NULL);
251  TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
252               "name", name);
253#else
254  TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
255               "class", IPC_MESSAGE_ID_CLASS(message.type()),
256               "line", IPC_MESSAGE_ID_LINE(message.type()));
257#endif
258
259  if (!listener_)
260    return;
261
262  OnDispatchConnected();
263
264#ifdef IPC_MESSAGE_LOG_ENABLED
265  if (message.type() == IPC_LOGGING_ID) {
266    logger->OnReceivedLoggingMessage(message);
267    return;
268  }
269
270  if (logger->Enabled())
271    logger->OnPreDispatchMessage(message);
272#endif
273
274  listener_->OnMessageReceived(message);
275  if (message.dispatch_error())
276    listener_->OnBadMessageReceived(message);
277
278#ifdef IPC_MESSAGE_LOG_ENABLED
279  if (logger->Enabled())
280    logger->OnPostDispatchMessage(message, channel_id_);
281#endif
282}
283
284// Called on the listener's thread
285void ChannelProxy::Context::OnDispatchConnected() {
286  if (channel_connected_called_)
287    return;
288
289  channel_connected_called_ = true;
290  if (listener_)
291    listener_->OnChannelConnected(peer_pid_);
292}
293
294// Called on the listener's thread
295void ChannelProxy::Context::OnDispatchError() {
296  if (listener_)
297    listener_->OnChannelError();
298}
299
300// Called on the listener's thread
301void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
302  if (listener_)
303    listener_->OnBadMessageReceived(message);
304}
305
306//-----------------------------------------------------------------------------
307
308// static
309scoped_ptr<ChannelProxy> ChannelProxy::Create(
310    const IPC::ChannelHandle& channel_handle,
311    Channel::Mode mode,
312    Listener* listener,
313    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
314  scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
315  channel->Init(channel_handle, mode, true);
316  return channel.Pass();
317}
318
319// static
320scoped_ptr<ChannelProxy> ChannelProxy::Create(
321    scoped_ptr<ChannelFactory> factory,
322    Listener* listener,
323    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
324  scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
325  channel->Init(factory.Pass(), true);
326  return channel.Pass();
327}
328
329ChannelProxy::ChannelProxy(Context* context)
330    : context_(context),
331      did_init_(false) {
332}
333
334ChannelProxy::ChannelProxy(
335    Listener* listener,
336    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
337    : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
338}
339
340ChannelProxy::~ChannelProxy() {
341  DCHECK(CalledOnValidThread());
342
343  Close();
344}
345
346void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
347                        Channel::Mode mode,
348                        bool create_pipe_now) {
349#if defined(OS_POSIX)
350  // When we are creating a server on POSIX, we need its file descriptor
351  // to be created immediately so that it can be accessed and passed
352  // to other processes. Forcing it to be created immediately avoids
353  // race conditions that may otherwise arise.
354  if (mode & Channel::MODE_SERVER_FLAG) {
355    create_pipe_now = true;
356  }
357#endif  // defined(OS_POSIX)
358  Init(ChannelFactory::Create(channel_handle, mode),
359       create_pipe_now);
360}
361
362void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
363                        bool create_pipe_now) {
364  DCHECK(CalledOnValidThread());
365  DCHECK(!did_init_);
366
367  if (create_pipe_now) {
368    // Create the channel immediately.  This effectively sets up the
369    // low-level pipe so that the client can connect.  Without creating
370    // the pipe immediately, it is possible for a listener to attempt
371    // to connect and get an error since the pipe doesn't exist yet.
372    context_->CreateChannel(factory.Pass());
373  } else {
374    context_->ipc_task_runner()->PostTask(
375        FROM_HERE, base::Bind(&Context::CreateChannel,
376                              context_.get(), Passed(factory.Pass())));
377  }
378
379  // complete initialization on the background thread
380  context_->ipc_task_runner()->PostTask(
381      FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
382
383  did_init_ = true;
384}
385
386void ChannelProxy::Close() {
387  DCHECK(CalledOnValidThread());
388
389  // Clear the backpointer to the listener so that any pending calls to
390  // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
391  // possible that the channel could be closed while it is receiving messages!
392  context_->Clear();
393
394  if (context_->ipc_task_runner()) {
395    context_->ipc_task_runner()->PostTask(
396        FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
397  }
398}
399
400bool ChannelProxy::Send(Message* message) {
401  DCHECK(did_init_);
402
403  // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
404  // tests that call Send() from a wrong thread. See http://crbug.com/163523.
405
406#ifdef IPC_MESSAGE_LOG_ENABLED
407  Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
408#endif
409
410  context_->ipc_task_runner()->PostTask(
411      FROM_HERE,
412      base::Bind(&ChannelProxy::Context::OnSendMessage,
413                 context_, base::Passed(scoped_ptr<Message>(message))));
414  return true;
415}
416
417void ChannelProxy::AddFilter(MessageFilter* filter) {
418  DCHECK(CalledOnValidThread());
419
420  context_->AddFilter(filter);
421}
422
423void ChannelProxy::RemoveFilter(MessageFilter* filter) {
424  DCHECK(CalledOnValidThread());
425
426  context_->ipc_task_runner()->PostTask(
427      FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
428                            make_scoped_refptr(filter)));
429}
430
431void ChannelProxy::ClearIPCTaskRunner() {
432  DCHECK(CalledOnValidThread());
433
434  context()->ClearIPCTaskRunner();
435}
436
437#if defined(OS_POSIX) && !defined(OS_NACL)
438// See the TODO regarding lazy initialization of the channel in
439// ChannelProxy::Init().
440int ChannelProxy::GetClientFileDescriptor() {
441  DCHECK(CalledOnValidThread());
442
443  Channel* channel = context_.get()->channel_.get();
444  // Channel must have been created first.
445  DCHECK(channel) << context_.get()->channel_id_;
446  return channel->GetClientFileDescriptor();
447}
448
449int ChannelProxy::TakeClientFileDescriptor() {
450  DCHECK(CalledOnValidThread());
451
452  Channel* channel = context_.get()->channel_.get();
453  // Channel must have been created first.
454  DCHECK(channel) << context_.get()->channel_id_;
455  return channel->TakeClientFileDescriptor();
456}
457#endif
458
459//-----------------------------------------------------------------------------
460
461}  // namespace IPC
462