ipc_sync_channel.h revision 868fa2fe829687343ffae624259930155e16dbd8
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#ifndef IPC_IPC_SYNC_CHANNEL_H_
6#define IPC_IPC_SYNC_CHANNEL_H_
7
8#include <string>
9#include <deque>
10
11#include "base/basictypes.h"
12#include "base/memory/ref_counted.h"
13#include "base/synchronization/lock.h"
14#include "base/synchronization/waitable_event_watcher.h"
15#include "ipc/ipc_channel_handle.h"
16#include "ipc/ipc_channel_proxy.h"
17#include "ipc/ipc_sync_message.h"
18
// Forward declaration only: this header refers to base::WaitableEvent solely
// through pointers, so the full definition is not required here.
namespace base {
class WaitableEvent;
}  // namespace base
22
23namespace IPC {
24
25class SyncMessage;
26
27// This is similar to ChannelProxy, with the added feature of supporting sending
28// synchronous messages.
29//
30// Overview of how the sync channel works
31// --------------------------------------
32// When the sending thread sends a synchronous message, we create a bunch
33// of tracking info (created in SendWithTimeout, stored in the PendingSyncMsg
34// structure) associated with the message that we identify by the unique
35// "MessageId" on the SyncMessage. Among the things we save is the
36// "Deserializer" which is provided by the sync message. This object is in
37// charge of reading the parameters from the reply message and putting them in
38// the output variables provided by its caller.
39//
40// The info gets stashed in a queue since we could have a nested stack of sync
41// messages (each side could send sync messages in response to sync messages,
42// so it works like calling a function). The message is sent to the I/O thread
43// for dispatch and the original thread blocks waiting for the reply.
44//
45// SyncContext maintains the queue in a threadsafe way and listens for replies
46// on the I/O thread. When a reply comes in that matches one of the messages
47// it's looking for (using the unique message ID), it will execute the
48// deserializer stashed from before, and unblock the original thread.
49//
50//
51// Significant complexity results from the fact that messages are still coming
52// in while the original thread is blocked. Normal async messages are queued
53// and dispatched after the blocking call is complete. Sync messages must
54// be dispatched in a reentrant manner to avoid deadlock.
55//
56//
57// Note that care must be taken that the lifetime of the ipc_thread argument
58// is more than this object.  If the message loop goes away while this object
59// is running and it's used to send a message, then it will use the invalid
60// message loop pointer to proxy it to the ipc thread.
61class IPC_EXPORT SyncChannel : public ChannelProxy {
62 public:
63  enum RestrictDispatchGroup {
64    kRestrictDispatchGroup_None = 0,
65  };
66
67  // Creates and initializes a sync channel. If create_pipe_now is specified,
68  // the channel will be initialized synchronously.
69  SyncChannel(const IPC::ChannelHandle& channel_handle,
70              Channel::Mode mode,
71              Listener* listener,
72              base::SingleThreadTaskRunner* ipc_task_runner,
73              bool create_pipe_now,
74              base::WaitableEvent* shutdown_event);
75
76  // Creates an uninitialized sync channel. Call ChannelProxy::Init to
77  // initialize the channel. This two-step setup allows message filters to be
78  // added before any messages are sent or received.
79  SyncChannel(Listener* listener,
80              base::SingleThreadTaskRunner* ipc_task_runner,
81              base::WaitableEvent* shutdown_event);
82
83  virtual ~SyncChannel();
84
85  virtual bool Send(Message* message) OVERRIDE;
86  virtual bool SendWithTimeout(Message* message, int timeout_ms);
87
88  // Whether we allow sending messages with no time-out.
89  void set_sync_messages_with_no_timeout_allowed(bool value) {
90    sync_messages_with_no_timeout_allowed_ = value;
91  }
92
93  // Sets the dispatch group for this channel, to only allow re-entrant dispatch
94  // of messages to other channels in the same group.
95  //
96  // Normally, any unblocking message coming from any channel can be dispatched
97  // when any (possibly other) channel is blocked on sending a message. This is
98  // needed in some cases to unblock certain loops (e.g. necessary when some
99  // processes share a window hierarchy), but may cause re-entrancy issues in
100  // some cases where such loops are not possible. This flags allows the tagging
101  // of some particular channels to only re-enter in known correct cases.
102  //
103  // Incoming messages on channels belonging to a group that is not
104  // kRestrictDispatchGroup_None will only be dispatched while a sync message is
105  // being sent on a channel of the *same* group.
106  // Incoming messages belonging to the kRestrictDispatchGroup_None group (the
107  // default) will be dispatched in any case.
108  void SetRestrictDispatchChannelGroup(int group);
109
110 protected:
111  class ReceivedSyncMsgQueue;
112  friend class ReceivedSyncMsgQueue;
113
114  // SyncContext holds the per object data for SyncChannel, so that SyncChannel
115  // can be deleted while it's being used in a different thread.  See
116  // ChannelProxy::Context for more information.
117  class SyncContext : public Context {
118   public:
119    SyncContext(Listener* listener,
120                base::SingleThreadTaskRunner* ipc_task_runner,
121                base::WaitableEvent* shutdown_event);
122
123    // Adds information about an outgoing sync message to the context so that
124    // we know how to deserialize the reply.
125    void Push(SyncMessage* sync_msg);
126
127    // Cleanly remove the top deserializer (and throw it away).  Returns the
128    // result of the Send call for that message.
129    bool Pop();
130
131    // Returns an event that's set when the send is complete, timed out or the
132    // process shut down.
133    base::WaitableEvent* GetSendDoneEvent();
134
135    // Returns an event that's set when an incoming message that's not the reply
136    // needs to get dispatched (by calling SyncContext::DispatchMessages).
137    base::WaitableEvent* GetDispatchEvent();
138
139    void DispatchMessages();
140
141    // Checks if the given message is blocking the listener thread because of a
142    // synchronous send.  If it is, the thread is unblocked and true is
143    // returned. Otherwise the function returns false.
144    bool TryToUnblockListener(const Message* msg);
145
146    // Called on the IPC thread when a sync send that runs a nested message loop
147    // times out.
148    void OnSendTimeout(int message_id);
149
150    base::WaitableEvent* shutdown_event() { return shutdown_event_; }
151
152    ReceivedSyncMsgQueue* received_sync_msgs() {
153      return received_sync_msgs_.get();
154    }
155
156    void set_restrict_dispatch_group(int group) {
157      restrict_dispatch_group_ = group;
158    }
159
160    int restrict_dispatch_group() const {
161      return restrict_dispatch_group_;
162    }
163
164    base::WaitableEventWatcher::EventCallback MakeWaitableEventCallback();
165
166   private:
167    virtual ~SyncContext();
168    // ChannelProxy methods that we override.
169
170    // Called on the listener thread.
171    virtual void Clear() OVERRIDE;
172
173    // Called on the IPC thread.
174    virtual bool OnMessageReceived(const Message& msg) OVERRIDE;
175    virtual void OnChannelError() OVERRIDE;
176    virtual void OnChannelOpened() OVERRIDE;
177    virtual void OnChannelClosed() OVERRIDE;
178
179    // Cancels all pending Send calls.
180    void CancelPendingSends();
181
182    void OnWaitableEventSignaled(base::WaitableEvent* event);
183
184    typedef std::deque<PendingSyncMsg> PendingSyncMessageQueue;
185    PendingSyncMessageQueue deserializers_;
186    base::Lock deserializers_lock_;
187
188    scoped_refptr<ReceivedSyncMsgQueue> received_sync_msgs_;
189
190    base::WaitableEvent* shutdown_event_;
191    base::WaitableEventWatcher shutdown_watcher_;
192    base::WaitableEventWatcher::EventCallback shutdown_watcher_callback_;
193    int restrict_dispatch_group_;
194  };
195
196 private:
197  void OnWaitableEventSignaled(base::WaitableEvent* arg);
198
199  SyncContext* sync_context() {
200    return reinterpret_cast<SyncContext*>(context());
201  }
202
203  // Both these functions wait for a reply, timeout or process shutdown.  The
204  // latter one also runs a nested message loop in the meantime.
205  static void WaitForReply(
206      SyncContext* context, base::WaitableEvent* pump_messages_event);
207
208  // Runs a nested message loop until a reply arrives, times out, or the process
209  // shuts down.
210  static void WaitForReplyWithNestedMessageLoop(SyncContext* context);
211
212  // Starts the dispatch watcher.
213  void StartWatching();
214
215  bool sync_messages_with_no_timeout_allowed_;
216
217  // Used to signal events between the IPC and listener threads.
218  base::WaitableEventWatcher dispatch_watcher_;
219  base::WaitableEventWatcher::EventCallback dispatch_watcher_callback_;
220
221  DISALLOW_COPY_AND_ASSIGN(SyncChannel);
222};
223
224}  // namespace IPC
225
226#endif  // IPC_IPC_SYNC_CHANNEL_H_
227