1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "ipc/ipc_channel_proxy.h"
6
7#include "base/bind.h"
8#include "base/compiler_specific.h"
9#include "base/location.h"
10#include "base/memory/ref_counted.h"
11#include "base/memory/scoped_ptr.h"
12#include "base/single_thread_task_runner.h"
13#include "base/thread_task_runner_handle.h"
14#include "ipc/ipc_listener.h"
15#include "ipc/ipc_logging.h"
16#include "ipc/ipc_message_macros.h"
17#include "ipc/message_filter.h"
18#include "ipc/message_filter_router.h"
19
20namespace IPC {
21
22//------------------------------------------------------------------------------
23
24ChannelProxy::Context::Context(Listener* listener,
25                               base::SingleThreadTaskRunner* ipc_task_runner)
26    : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
27      listener_(listener),
28      ipc_task_runner_(ipc_task_runner),
29      channel_connected_called_(false),
30      message_filter_router_(new MessageFilterRouter()),
31      peer_pid_(base::kNullProcessId) {
32  DCHECK(ipc_task_runner_.get());
33  // The Listener thread where Messages are handled must be a separate thread
34  // to avoid oversubscribing the IO thread. If you trigger this error, you
35  // need to either:
36  // 1) Create the ChannelProxy on a different thread, or
37  // 2) Just use Channel
38  // Note, we currently make an exception for a NULL listener. That usage
39  // basically works, but is outside the intent of ChannelProxy. This support
40  // will disappear, so please don't rely on it. See crbug.com/364241
41  DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
42}
43
44ChannelProxy::Context::~Context() {
45}
46
47void ChannelProxy::Context::ClearIPCTaskRunner() {
48  ipc_task_runner_ = NULL;
49}
50
51void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
52                                          const Channel::Mode& mode) {
53  DCHECK(!channel_);
54  channel_id_ = handle.name;
55  channel_ = Channel::Create(handle, mode, this);
56}
57
58bool ChannelProxy::Context::TryFilters(const Message& message) {
59  DCHECK(message_filter_router_);
60#ifdef IPC_MESSAGE_LOG_ENABLED
61  Logging* logger = Logging::GetInstance();
62  if (logger->Enabled())
63    logger->OnPreDispatchMessage(message);
64#endif
65
66  if (message_filter_router_->TryFilters(message)) {
67    if (message.dispatch_error()) {
68      listener_task_runner_->PostTask(
69          FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
70    }
71#ifdef IPC_MESSAGE_LOG_ENABLED
72    if (logger->Enabled())
73      logger->OnPostDispatchMessage(message, channel_id_);
74#endif
75    return true;
76  }
77  return false;
78}
79
80// Called on the IPC::Channel thread
81bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
82  // First give a chance to the filters to process this message.
83  if (!TryFilters(message))
84    OnMessageReceivedNoFilter(message);
85  return true;
86}
87
88// Called on the IPC::Channel thread
89bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
90  listener_task_runner_->PostTask(
91      FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
92  return true;
93}
94
95// Called on the IPC::Channel thread
96void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
97  // We cache off the peer_pid so it can be safely accessed from both threads.
98  peer_pid_ = channel_->GetPeerPID();
99
100  // Add any pending filters.  This avoids a race condition where someone
101  // creates a ChannelProxy, calls AddFilter, and then right after starts the
102  // peer process.  The IO thread could receive a message before the task to add
103  // the filter is run on the IO thread.
104  OnAddFilter();
105
106  // See above comment about using listener_task_runner_ here.
107  listener_task_runner_->PostTask(
108      FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
109}
110
111// Called on the IPC::Channel thread
112void ChannelProxy::Context::OnChannelError() {
113  for (size_t i = 0; i < filters_.size(); ++i)
114    filters_[i]->OnChannelError();
115
116  // See above comment about using listener_task_runner_ here.
117  listener_task_runner_->PostTask(
118      FROM_HERE, base::Bind(&Context::OnDispatchError, this));
119}
120
121// Called on the IPC::Channel thread
122void ChannelProxy::Context::OnChannelOpened() {
123  DCHECK(channel_ != NULL);
124
125  // Assume a reference to ourselves on behalf of this thread.  This reference
126  // will be released when we are closed.
127  AddRef();
128
129  if (!channel_->Connect()) {
130    OnChannelError();
131    return;
132  }
133
134  for (size_t i = 0; i < filters_.size(); ++i)
135    filters_[i]->OnFilterAdded(channel_.get());
136}
137
138// Called on the IPC::Channel thread
139void ChannelProxy::Context::OnChannelClosed() {
140  // It's okay for IPC::ChannelProxy::Close to be called more than once, which
141  // would result in this branch being taken.
142  if (!channel_)
143    return;
144
145  for (size_t i = 0; i < filters_.size(); ++i) {
146    filters_[i]->OnChannelClosing();
147    filters_[i]->OnFilterRemoved();
148  }
149
150  // We don't need the filters anymore.
151  message_filter_router_->Clear();
152  filters_.clear();
153  // We don't need the lock, because at this point, the listener thread can't
154  // access it any more.
155  pending_filters_.clear();
156
157  channel_.reset();
158
159  // Balance with the reference taken during startup.  This may result in
160  // self-destruction.
161  Release();
162}
163
164void ChannelProxy::Context::Clear() {
165  listener_ = NULL;
166}
167
168// Called on the IPC::Channel thread
169void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
170  if (!channel_) {
171    OnChannelClosed();
172    return;
173  }
174
175  if (!channel_->Send(message.release()))
176    OnChannelError();
177}
178
179// Called on the IPC::Channel thread
180void ChannelProxy::Context::OnAddFilter() {
181  // Our OnChannelConnected method has not yet been called, so we can't be
182  // sure that channel_ is valid yet. When OnChannelConnected *is* called,
183  // it invokes OnAddFilter, so any pending filter(s) will be added at that
184  // time.
185  if (peer_pid_ == base::kNullProcessId)
186    return;
187
188  std::vector<scoped_refptr<MessageFilter> > new_filters;
189  {
190    base::AutoLock auto_lock(pending_filters_lock_);
191    new_filters.swap(pending_filters_);
192  }
193
194  for (size_t i = 0; i < new_filters.size(); ++i) {
195    filters_.push_back(new_filters[i]);
196
197    message_filter_router_->AddFilter(new_filters[i].get());
198
199    // The channel has already been created and connected, so we need to
200    // inform the filters right now.
201    new_filters[i]->OnFilterAdded(channel_.get());
202    new_filters[i]->OnChannelConnected(peer_pid_);
203  }
204}
205
206// Called on the IPC::Channel thread
207void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
208  if (peer_pid_ == base::kNullProcessId) {
209    // The channel is not yet connected, so any filters are still pending.
210    base::AutoLock auto_lock(pending_filters_lock_);
211    for (size_t i = 0; i < pending_filters_.size(); ++i) {
212      if (pending_filters_[i].get() == filter) {
213        filter->OnFilterRemoved();
214        pending_filters_.erase(pending_filters_.begin() + i);
215        return;
216      }
217    }
218    return;
219  }
220  if (!channel_)
221    return;  // The filters have already been deleted.
222
223  message_filter_router_->RemoveFilter(filter);
224
225  for (size_t i = 0; i < filters_.size(); ++i) {
226    if (filters_[i].get() == filter) {
227      filter->OnFilterRemoved();
228      filters_.erase(filters_.begin() + i);
229      return;
230    }
231  }
232
233  NOTREACHED() << "filter to be removed not found";
234}
235
236// Called on the listener's thread
237void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
238  base::AutoLock auto_lock(pending_filters_lock_);
239  pending_filters_.push_back(make_scoped_refptr(filter));
240  ipc_task_runner_->PostTask(
241      FROM_HERE, base::Bind(&Context::OnAddFilter, this));
242}
243
244// Called on the listener's thread
245void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
246#ifdef IPC_MESSAGE_LOG_ENABLED
247  Logging* logger = Logging::GetInstance();
248  std::string name;
249  logger->GetMessageText(message.type(), &name, &message, NULL);
250  TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
251               "name", name);
252#else
253  TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
254               "class", IPC_MESSAGE_ID_CLASS(message.type()),
255               "line", IPC_MESSAGE_ID_LINE(message.type()));
256#endif
257
258  if (!listener_)
259    return;
260
261  OnDispatchConnected();
262
263#ifdef IPC_MESSAGE_LOG_ENABLED
264  if (message.type() == IPC_LOGGING_ID) {
265    logger->OnReceivedLoggingMessage(message);
266    return;
267  }
268
269  if (logger->Enabled())
270    logger->OnPreDispatchMessage(message);
271#endif
272
273  listener_->OnMessageReceived(message);
274  if (message.dispatch_error())
275    listener_->OnBadMessageReceived(message);
276
277#ifdef IPC_MESSAGE_LOG_ENABLED
278  if (logger->Enabled())
279    logger->OnPostDispatchMessage(message, channel_id_);
280#endif
281}
282
283// Called on the listener's thread
284void ChannelProxy::Context::OnDispatchConnected() {
285  if (channel_connected_called_)
286    return;
287
288  channel_connected_called_ = true;
289  if (listener_)
290    listener_->OnChannelConnected(peer_pid_);
291}
292
293// Called on the listener's thread
294void ChannelProxy::Context::OnDispatchError() {
295  if (listener_)
296    listener_->OnChannelError();
297}
298
299// Called on the listener's thread
300void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
301  if (listener_)
302    listener_->OnBadMessageReceived(message);
303}
304
305//-----------------------------------------------------------------------------
306
307// static
308scoped_ptr<ChannelProxy> ChannelProxy::Create(
309    const IPC::ChannelHandle& channel_handle,
310    Channel::Mode mode,
311    Listener* listener,
312    base::SingleThreadTaskRunner* ipc_task_runner) {
313  scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
314  channel->Init(channel_handle, mode, true);
315  return channel.Pass();
316}
317
318ChannelProxy::ChannelProxy(Context* context)
319    : context_(context),
320      did_init_(false) {
321}
322
323ChannelProxy::ChannelProxy(Listener* listener,
324                           base::SingleThreadTaskRunner* ipc_task_runner)
325    : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
326}
327
328ChannelProxy::~ChannelProxy() {
329  DCHECK(CalledOnValidThread());
330
331  Close();
332}
333
334void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
335                        Channel::Mode mode,
336                        bool create_pipe_now) {
337  DCHECK(CalledOnValidThread());
338  DCHECK(!did_init_);
339#if defined(OS_POSIX)
340  // When we are creating a server on POSIX, we need its file descriptor
341  // to be created immediately so that it can be accessed and passed
342  // to other processes. Forcing it to be created immediately avoids
343  // race conditions that may otherwise arise.
344  if (mode & Channel::MODE_SERVER_FLAG) {
345    create_pipe_now = true;
346  }
347#endif  // defined(OS_POSIX)
348
349  if (create_pipe_now) {
350    // Create the channel immediately.  This effectively sets up the
351    // low-level pipe so that the client can connect.  Without creating
352    // the pipe immediately, it is possible for a listener to attempt
353    // to connect and get an error since the pipe doesn't exist yet.
354    context_->CreateChannel(channel_handle, mode);
355  } else {
356    context_->ipc_task_runner()->PostTask(
357        FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
358                              channel_handle, mode));
359  }
360
361  // complete initialization on the background thread
362  context_->ipc_task_runner()->PostTask(
363      FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
364
365  did_init_ = true;
366}
367
368void ChannelProxy::Close() {
369  DCHECK(CalledOnValidThread());
370
371  // Clear the backpointer to the listener so that any pending calls to
372  // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
373  // possible that the channel could be closed while it is receiving messages!
374  context_->Clear();
375
376  if (context_->ipc_task_runner()) {
377    context_->ipc_task_runner()->PostTask(
378        FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
379  }
380}
381
382bool ChannelProxy::Send(Message* message) {
383  DCHECK(did_init_);
384
385  // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
386  // tests that call Send() from a wrong thread. See http://crbug.com/163523.
387
388#ifdef IPC_MESSAGE_LOG_ENABLED
389  Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
390#endif
391
392  context_->ipc_task_runner()->PostTask(
393      FROM_HERE,
394      base::Bind(&ChannelProxy::Context::OnSendMessage,
395                 context_, base::Passed(scoped_ptr<Message>(message))));
396  return true;
397}
398
399void ChannelProxy::AddFilter(MessageFilter* filter) {
400  DCHECK(CalledOnValidThread());
401
402  context_->AddFilter(filter);
403}
404
405void ChannelProxy::RemoveFilter(MessageFilter* filter) {
406  DCHECK(CalledOnValidThread());
407
408  context_->ipc_task_runner()->PostTask(
409      FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
410                            make_scoped_refptr(filter)));
411}
412
413void ChannelProxy::ClearIPCTaskRunner() {
414  DCHECK(CalledOnValidThread());
415
416  context()->ClearIPCTaskRunner();
417}
418
419#if defined(OS_POSIX) && !defined(OS_NACL)
420// See the TODO regarding lazy initialization of the channel in
421// ChannelProxy::Init().
422int ChannelProxy::GetClientFileDescriptor() {
423  DCHECK(CalledOnValidThread());
424
425  Channel* channel = context_.get()->channel_.get();
426  // Channel must have been created first.
427  DCHECK(channel) << context_.get()->channel_id_;
428  return channel->GetClientFileDescriptor();
429}
430
431int ChannelProxy::TakeClientFileDescriptor() {
432  DCHECK(CalledOnValidThread());
433
434  Channel* channel = context_.get()->channel_.get();
435  // Channel must have been created first.
436  DCHECK(channel) << context_.get()->channel_id_;
437  return channel->TakeClientFileDescriptor();
438}
439#endif
440
441//-----------------------------------------------------------------------------
442
443}  // namespace IPC
444