1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "ipc/ipc_sync_channel.h"
6
7#include "base/bind.h"
8#include "base/debug/trace_event.h"
9#include "base/lazy_instance.h"
10#include "base/location.h"
11#include "base/logging.h"
12#include "base/synchronization/waitable_event.h"
13#include "base/synchronization/waitable_event_watcher.h"
14#include "base/thread_task_runner_handle.h"
15#include "base/threading/thread_local.h"
16#include "ipc/ipc_channel_factory.h"
17#include "ipc/ipc_logging.h"
18#include "ipc/ipc_message_macros.h"
19#include "ipc/ipc_sync_message.h"
20
21using base::TimeDelta;
22using base::TimeTicks;
23using base::WaitableEvent;
24
25namespace IPC {
26// When we're blocked in a Send(), we need to process incoming synchronous
27// messages right away because it could be blocking our reply (either
28// directly from the same object we're calling, or indirectly through one or
29// more other channels).  That means that in SyncContext's OnMessageReceived,
30// we need to process sync message right away if we're blocked.  However a
31// simple check isn't sufficient, because the listener thread can be in the
32// process of calling Send.
33// To work around this, when SyncChannel filters a sync message, it sets
34// an event that the listener thread waits on during its Send() call.  This
35// allows us to dispatch incoming sync messages when blocked.  The race
36// condition is handled because if Send is in the process of being called, it
37// will check the event.  In case the listener thread isn't sending a message,
38// we queue a task on the listener thread to dispatch the received messages.
39// The messages are stored in this queue object that's shared among all
40// SyncChannel objects on the same thread (since one object can receive a
41// sync message while another one is blocked).
42
43class SyncChannel::ReceivedSyncMsgQueue :
44    public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
45 public:
46  // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
47  // if necessary.  Call RemoveContext on the same thread when done.
48  static ReceivedSyncMsgQueue* AddContext() {
49    // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
50    // SyncChannel objects can block the same thread).
51    ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
52    if (!rv) {
53      rv = new ReceivedSyncMsgQueue();
54      ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
55    }
56    rv->listener_count_++;
57    return rv;
58  }
59
60  // Called on IPC thread when a synchronous message or reply arrives.
61  void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
62    bool was_task_pending;
63    {
64      base::AutoLock auto_lock(message_lock_);
65
66      was_task_pending = task_pending_;
67      task_pending_ = true;
68
69      // We set the event in case the listener thread is blocked (or is about
70      // to). In case it's not, the PostTask dispatches the messages.
71      message_queue_.push_back(QueuedMessage(new Message(msg), context));
72      message_queue_version_++;
73    }
74
75    dispatch_event_.Signal();
76    if (!was_task_pending) {
77      listener_task_runner_->PostTask(
78          FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
79                                this, scoped_refptr<SyncContext>(context)));
80    }
81  }
82
83  void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
84    received_replies_.push_back(QueuedMessage(new Message(msg), context));
85  }
86
87  // Called on the listener's thread to process any queues synchronous
88  // messages.
89  void DispatchMessagesTask(SyncContext* context) {
90    {
91      base::AutoLock auto_lock(message_lock_);
92      task_pending_ = false;
93    }
94    context->DispatchMessages();
95  }
96
97  void DispatchMessages(SyncContext* dispatching_context) {
98    bool first_time = true;
99    uint32 expected_version = 0;
100    SyncMessageQueue::iterator it;
101    while (true) {
102      Message* message = NULL;
103      scoped_refptr<SyncChannel::SyncContext> context;
104      {
105        base::AutoLock auto_lock(message_lock_);
106        if (first_time || message_queue_version_ != expected_version) {
107          it = message_queue_.begin();
108          first_time = false;
109        }
110        for (; it != message_queue_.end(); it++) {
111          int message_group = it->context->restrict_dispatch_group();
112          if (message_group == kRestrictDispatchGroup_None ||
113              message_group == dispatching_context->restrict_dispatch_group()) {
114            message = it->message;
115            context = it->context;
116            it = message_queue_.erase(it);
117            message_queue_version_++;
118            expected_version = message_queue_version_;
119            break;
120          }
121        }
122      }
123
124      if (message == NULL)
125        break;
126      context->OnDispatchMessage(*message);
127      delete message;
128    }
129  }
130
131  // SyncChannel calls this in its destructor.
132  void RemoveContext(SyncContext* context) {
133    base::AutoLock auto_lock(message_lock_);
134
135    SyncMessageQueue::iterator iter = message_queue_.begin();
136    while (iter != message_queue_.end()) {
137      if (iter->context.get() == context) {
138        delete iter->message;
139        iter = message_queue_.erase(iter);
140        message_queue_version_++;
141      } else {
142        iter++;
143      }
144    }
145
146    if (--listener_count_ == 0) {
147      DCHECK(lazy_tls_ptr_.Pointer()->Get());
148      lazy_tls_ptr_.Pointer()->Set(NULL);
149    }
150  }
151
152  WaitableEvent* dispatch_event() { return &dispatch_event_; }
153  base::SingleThreadTaskRunner* listener_task_runner() {
154    return listener_task_runner_.get();
155  }
156
157  // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
158  static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
159      lazy_tls_ptr_;
160
161  // Called on the ipc thread to check if we can unblock any current Send()
162  // calls based on a queued reply.
163  void DispatchReplies() {
164    for (size_t i = 0; i < received_replies_.size(); ++i) {
165      Message* message = received_replies_[i].message;
166      if (received_replies_[i].context->TryToUnblockListener(message)) {
167        delete message;
168        received_replies_.erase(received_replies_.begin() + i);
169        return;
170      }
171    }
172  }
173
174  base::WaitableEventWatcher* top_send_done_watcher() {
175    return top_send_done_watcher_;
176  }
177
178  void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
179    top_send_done_watcher_ = watcher;
180  }
181
182 private:
183  friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
184
185  // See the comment in SyncChannel::SyncChannel for why this event is created
186  // as manual reset.
187  ReceivedSyncMsgQueue() :
188      message_queue_version_(0),
189      dispatch_event_(true, false),
190      listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
191      task_pending_(false),
192      listener_count_(0),
193      top_send_done_watcher_(NULL) {
194  }
195
196  ~ReceivedSyncMsgQueue() {}
197
198  // Holds information about a queued synchronous message or reply.
199  struct QueuedMessage {
200    QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
201    Message* message;
202    scoped_refptr<SyncChannel::SyncContext> context;
203  };
204
205  typedef std::list<QueuedMessage> SyncMessageQueue;
206  SyncMessageQueue message_queue_;
207  uint32 message_queue_version_;  // Used to signal DispatchMessages to rescan
208
209  std::vector<QueuedMessage> received_replies_;
210
211  // Set when we got a synchronous message that we must respond to as the
212  // sender needs its reply before it can reply to our original synchronous
213  // message.
214  WaitableEvent dispatch_event_;
215  scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
216  base::Lock message_lock_;
217  bool task_pending_;
218  int listener_count_;
219
220  // The current send done event watcher for this thread. Used to maintain
221  // a local global stack of send done watchers to ensure that nested sync
222  // message loops complete correctly.
223  base::WaitableEventWatcher* top_send_done_watcher_;
224};
225
226base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
227    SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
228        LAZY_INSTANCE_INITIALIZER;
229
230SyncChannel::SyncContext::SyncContext(
231    Listener* listener,
232    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
233    WaitableEvent* shutdown_event)
234    : ChannelProxy::Context(listener, ipc_task_runner),
235      received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
236      shutdown_event_(shutdown_event),
237      restrict_dispatch_group_(kRestrictDispatchGroup_None) {
238}
239
240SyncChannel::SyncContext::~SyncContext() {
241  while (!deserializers_.empty())
242    Pop();
243}
244
245// Adds information about an outgoing sync message to the context so that
246// we know how to deserialize the reply.  Returns a handle that's set when
247// the reply has arrived.
248void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
249  // Create the tracking information for this message. This object is stored
250  // by value since all members are pointers that are cheap to copy. These
251  // pointers are cleaned up in the Pop() function.
252  //
253  // The event is created as manual reset because in between Signal and
254  // OnObjectSignalled, another Send can happen which would stop the watcher
255  // from being called.  The event would get watched later, when the nested
256  // Send completes, so the event will need to remain set.
257  PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
258                         sync_msg->GetReplyDeserializer(),
259                         new WaitableEvent(true, false));
260  base::AutoLock auto_lock(deserializers_lock_);
261  deserializers_.push_back(pending);
262}
263
264bool SyncChannel::SyncContext::Pop() {
265  bool result;
266  {
267    base::AutoLock auto_lock(deserializers_lock_);
268    PendingSyncMsg msg = deserializers_.back();
269    delete msg.deserializer;
270    delete msg.done_event;
271    msg.done_event = NULL;
272    deserializers_.pop_back();
273    result = msg.send_result;
274  }
275
276  // We got a reply to a synchronous Send() call that's blocking the listener
277  // thread.  However, further down the call stack there could be another
278  // blocking Send() call, whose reply we received after we made this last
279  // Send() call.  So check if we have any queued replies available that
280  // can now unblock the listener thread.
281  ipc_task_runner()->PostTask(
282      FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
283                            received_sync_msgs_.get()));
284
285  return result;
286}
287
288WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
289  base::AutoLock auto_lock(deserializers_lock_);
290  return deserializers_.back().done_event;
291}
292
293WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
294  return received_sync_msgs_->dispatch_event();
295}
296
297void SyncChannel::SyncContext::DispatchMessages() {
298  received_sync_msgs_->DispatchMessages(this);
299}
300
301bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
302  base::AutoLock auto_lock(deserializers_lock_);
303  if (deserializers_.empty() ||
304      !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
305    return false;
306  }
307
308  // TODO(bauerb): Remove logging once investigation of http://crbug.com/141055
309  // has finished.
310  if (!msg->is_reply_error()) {
311    bool send_result = deserializers_.back().deserializer->
312        SerializeOutputParameters(*msg);
313    deserializers_.back().send_result = send_result;
314    VLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
315  } else {
316    VLOG(1) << "Received error reply";
317  }
318  deserializers_.back().done_event->Signal();
319
320  return true;
321}
322
323void SyncChannel::SyncContext::Clear() {
324  CancelPendingSends();
325  received_sync_msgs_->RemoveContext(this);
326  Context::Clear();
327}
328
329bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
330  // Give the filters a chance at processing this message.
331  if (TryFilters(msg))
332    return true;
333
334  if (TryToUnblockListener(&msg))
335    return true;
336
337  if (msg.is_reply()) {
338    received_sync_msgs_->QueueReply(msg, this);
339    return true;
340  }
341
342  if (msg.should_unblock()) {
343    received_sync_msgs_->QueueMessage(msg, this);
344    return true;
345  }
346
347  return Context::OnMessageReceivedNoFilter(msg);
348}
349
350void SyncChannel::SyncContext::OnChannelError() {
351  CancelPendingSends();
352  shutdown_watcher_.StopWatching();
353  Context::OnChannelError();
354}
355
356void SyncChannel::SyncContext::OnChannelOpened() {
357  shutdown_watcher_.StartWatching(
358      shutdown_event_,
359      base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled,
360                 base::Unretained(this)));
361  Context::OnChannelOpened();
362}
363
364void SyncChannel::SyncContext::OnChannelClosed() {
365  CancelPendingSends();
366  shutdown_watcher_.StopWatching();
367  Context::OnChannelClosed();
368}
369
370void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
371  base::AutoLock auto_lock(deserializers_lock_);
372  PendingSyncMessageQueue::iterator iter;
373  VLOG(1) << "Send timeout";
374  for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
375    if (iter->id == message_id) {
376      iter->done_event->Signal();
377      break;
378    }
379  }
380}
381
382void SyncChannel::SyncContext::CancelPendingSends() {
383  base::AutoLock auto_lock(deserializers_lock_);
384  PendingSyncMessageQueue::iterator iter;
385  // TODO(bauerb): Remove once http://crbug/141055 is fixed.
386  VLOG(1) << "Canceling pending sends";
387  for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
388    iter->done_event->Signal();
389}
390
391void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
392  if (event == shutdown_event_) {
393    // Process shut down before we can get a reply to a synchronous message.
394    // Cancel pending Send calls, which will end up setting the send done event.
395    CancelPendingSends();
396  } else {
397    // We got the reply, timed out or the process shutdown.
398    DCHECK_EQ(GetSendDoneEvent(), event);
399    base::MessageLoop::current()->QuitNow();
400  }
401}
402
403base::WaitableEventWatcher::EventCallback
404    SyncChannel::SyncContext::MakeWaitableEventCallback() {
405  return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this);
406}
407
408// static
409scoped_ptr<SyncChannel> SyncChannel::Create(
410    const IPC::ChannelHandle& channel_handle,
411    Channel::Mode mode,
412    Listener* listener,
413    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
414    bool create_pipe_now,
415    base::WaitableEvent* shutdown_event) {
416  scoped_ptr<SyncChannel> channel =
417      Create(listener, ipc_task_runner, shutdown_event);
418  channel->Init(channel_handle, mode, create_pipe_now);
419  return channel.Pass();
420}
421
422// static
423scoped_ptr<SyncChannel> SyncChannel::Create(
424    scoped_ptr<ChannelFactory> factory,
425    Listener* listener,
426    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
427    bool create_pipe_now,
428    base::WaitableEvent* shutdown_event) {
429  scoped_ptr<SyncChannel> channel =
430      Create(listener, ipc_task_runner, shutdown_event);
431  channel->Init(factory.Pass(), create_pipe_now);
432  return channel.Pass();
433}
434
435// static
436scoped_ptr<SyncChannel> SyncChannel::Create(
437    Listener* listener,
438    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
439    WaitableEvent* shutdown_event) {
440  return make_scoped_ptr(
441      new SyncChannel(listener, ipc_task_runner, shutdown_event));
442}
443
444SyncChannel::SyncChannel(
445    Listener* listener,
446    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
447    WaitableEvent* shutdown_event)
448    : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) {
449  // The current (listener) thread must be distinct from the IPC thread, or else
450  // sending synchronous messages will deadlock.
451  DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
452  StartWatching();
453}
454
455SyncChannel::~SyncChannel() {
456}
457
458void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
459  sync_context()->set_restrict_dispatch_group(group);
460}
461
462bool SyncChannel::Send(Message* message) {
463#ifdef IPC_MESSAGE_LOG_ENABLED
464  Logging* logger = Logging::GetInstance();
465  std::string name;
466  logger->GetMessageText(message->type(), &name, message, NULL);
467  TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name);
468#else
469  TRACE_EVENT2("ipc", "SyncChannel::Send",
470               "class", IPC_MESSAGE_ID_CLASS(message->type()),
471               "line", IPC_MESSAGE_ID_LINE(message->type()));
472#endif
473  if (!message->is_sync()) {
474    ChannelProxy::Send(message);
475    return true;
476  }
477
478  // *this* might get deleted in WaitForReply.
479  scoped_refptr<SyncContext> context(sync_context());
480  if (context->shutdown_event()->IsSignaled()) {
481    VLOG(1) << "shutdown event is signaled";
482    delete message;
483    return false;
484  }
485
486  SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
487  context->Push(sync_msg);
488  WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
489
490  ChannelProxy::Send(message);
491
492  // Wait for reply, or for any other incoming synchronous messages.
493  // *this* might get deleted, so only call static functions at this point.
494  WaitForReply(context.get(), pump_messages_event);
495
496  return context->Pop();
497}
498
499void SyncChannel::WaitForReply(
500    SyncContext* context, WaitableEvent* pump_messages_event) {
501  context->DispatchMessages();
502  while (true) {
503    WaitableEvent* objects[] = {
504      context->GetDispatchEvent(),
505      context->GetSendDoneEvent(),
506      pump_messages_event
507    };
508
509    unsigned count = pump_messages_event ? 3: 2;
510    size_t result = WaitableEvent::WaitMany(objects, count);
511    if (result == 0 /* dispatch event */) {
512      // We're waiting for a reply, but we received a blocking synchronous
513      // call.  We must process it or otherwise a deadlock might occur.
514      context->GetDispatchEvent()->Reset();
515      context->DispatchMessages();
516      continue;
517    }
518
519    if (result == 2 /* pump_messages_event */)
520      WaitForReplyWithNestedMessageLoop(context);  // Run a nested message loop.
521
522    break;
523  }
524}
525
526void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
527  base::WaitableEventWatcher send_done_watcher;
528
529  ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
530  DCHECK(sync_msg_queue != NULL);
531
532  base::WaitableEventWatcher* old_send_done_event_watcher =
533      sync_msg_queue->top_send_done_watcher();
534
535  base::WaitableEventWatcher::EventCallback old_callback;
536  base::WaitableEvent* old_event = NULL;
537
538  // Maintain a local global stack of send done delegates to ensure that
539  // nested sync calls complete in the correct sequence, i.e. the
540  // outermost call completes first, etc.
541  if (old_send_done_event_watcher) {
542    old_callback = old_send_done_event_watcher->callback();
543    old_event = old_send_done_event_watcher->GetWatchedEvent();
544    old_send_done_event_watcher->StopWatching();
545  }
546
547  sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
548
549  send_done_watcher.StartWatching(context->GetSendDoneEvent(),
550                                  context->MakeWaitableEventCallback());
551
552  {
553    base::MessageLoop::ScopedNestableTaskAllower allow(
554        base::MessageLoop::current());
555    base::MessageLoop::current()->Run();
556  }
557
558  sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
559  if (old_send_done_event_watcher && old_event) {
560    old_send_done_event_watcher->StartWatching(old_event, old_callback);
561  }
562}
563
564void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
565  DCHECK(event == sync_context()->GetDispatchEvent());
566  // The call to DispatchMessages might delete this object, so reregister
567  // the object watcher first.
568  event->Reset();
569  dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_);
570  sync_context()->DispatchMessages();
571}
572
573void SyncChannel::StartWatching() {
574  // Ideally we only want to watch this object when running a nested message
575  // loop.  However, we don't know when it exits if there's another nested
576  // message loop running under it or not, so we wouldn't know whether to
577  // stop or keep watching.  So we always watch it, and create the event as
578  // manual reset since the object watcher might otherwise reset the event
579  // when we're doing a WaitMany.
580  dispatch_watcher_callback_ =
581      base::Bind(&SyncChannel::OnWaitableEventSignaled,
582                  base::Unretained(this));
583  dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
584                                  dispatch_watcher_callback_);
585}
586
587}  // namespace IPC
588