// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5#ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
6#define MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
7
8#include <stdint.h>
9
10#include <deque>
11#include <map>
12#include <memory>
13#include <string>
14
15#include "base/logging.h"
16#include "base/macros.h"
17#include "base/memory/ref_counted.h"
18#include "base/memory/weak_ptr.h"
19#include "base/single_thread_task_runner.h"
20#include "base/synchronization/lock.h"
21#include "base/threading/thread_checker.h"
22#include "mojo/public/cpp/bindings/associated_group_controller.h"
23#include "mojo/public/cpp/bindings/connector.h"
24#include "mojo/public/cpp/bindings/interface_id.h"
25#include "mojo/public/cpp/bindings/message_header_validator.h"
26#include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
27#include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
28#include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
29#include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
30
// Forward declaration of the task runner type used in the signatures below.
// NOTE(review): "base/single_thread_task_runner.h" is also included above, so
// this declaration looks redundant; kept to avoid changing what the header
// exposes.
namespace base {
class SingleThreadTaskRunner;
}
34
35namespace mojo {
36
// Forward declaration only; this header does not need the full definition.
class AssociatedGroup;
38
39namespace internal {
40
41// MultiplexRouter supports routing messages for multiple interfaces over a
42// single message pipe.
43//
44// It is created on the thread where the master interface of the message pipe
45// lives. Although it is ref-counted, it is guarateed to be destructed on the
46// same thread.
47// Some public methods are only allowed to be called on the creating thread;
48// while the others are safe to call from any threads. Please see the method
49// comments for more details.
50class MultiplexRouter
51    : public MessageReceiver,
52      public AssociatedGroupController,
53      public PipeControlMessageHandlerDelegate {
54 public:
55  // If |set_interface_id_namespace_bit| is true, the interface IDs generated by
56  // this router will have the highest bit set.
57  MultiplexRouter(bool set_interface_id_namespace_bit,
58                  ScopedMessagePipeHandle message_pipe,
59                  scoped_refptr<base::SingleThreadTaskRunner> runner);
60
61  // Sets the master interface name for this router. Only used when reporting
62  // message header or control message validation errors.
63  void SetMasterInterfaceName(const std::string& name);
64
65  // ---------------------------------------------------------------------------
66  // The following public methods are safe to call from any threads.
67
68  // AssociatedGroupController implementation:
69  void CreateEndpointHandlePair(
70      ScopedInterfaceEndpointHandle* local_endpoint,
71      ScopedInterfaceEndpointHandle* remote_endpoint) override;
72  ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
73      InterfaceId id) override;
74  void CloseEndpointHandle(InterfaceId id, bool is_local) override;
75  InterfaceEndpointController* AttachEndpointClient(
76      const ScopedInterfaceEndpointHandle& handle,
77      InterfaceEndpointClient* endpoint_client,
78      scoped_refptr<base::SingleThreadTaskRunner> runner) override;
79  void DetachEndpointClient(
80      const ScopedInterfaceEndpointHandle& handle) override;
81  void RaiseError() override;
82
83  // ---------------------------------------------------------------------------
84  // The following public methods are called on the creating thread.
85
86  // Please note that this method shouldn't be called unless it results from an
87  // explicit request of the user of bindings (e.g., the user sets an
88  // InterfacePtr to null or closes a Binding).
89  void CloseMessagePipe();
90
91  // Extracts the underlying message pipe.
92  ScopedMessagePipeHandle PassMessagePipe() {
93    DCHECK(thread_checker_.CalledOnValidThread());
94    DCHECK(!HasAssociatedEndpoints());
95    return connector_.PassMessagePipe();
96  }
97
98  // Blocks the current thread until the first incoming message, or |deadline|.
99  bool WaitForIncomingMessage(MojoDeadline deadline) {
100    DCHECK(thread_checker_.CalledOnValidThread());
101    return connector_.WaitForIncomingMessage(deadline);
102  }
103
104  // See Binding for details of pause/resume.
105  void PauseIncomingMethodCallProcessing() {
106    DCHECK(thread_checker_.CalledOnValidThread());
107    connector_.PauseIncomingMethodCallProcessing();
108  }
109  void ResumeIncomingMethodCallProcessing() {
110    DCHECK(thread_checker_.CalledOnValidThread());
111    connector_.ResumeIncomingMethodCallProcessing();
112  }
113
114  // Whether there are any associated interfaces running currently.
115  bool HasAssociatedEndpoints() const;
116
117  // Sets this object to testing mode.
118  // In testing mode, the object doesn't disconnect the underlying message pipe
119  // when it receives unexpected or invalid messages.
120  void EnableTestingMode();
121
122  // Is the router bound to a message pipe handle?
123  bool is_valid() const {
124    DCHECK(thread_checker_.CalledOnValidThread());
125    return connector_.is_valid();
126  }
127
128  // TODO(yzshen): consider removing this getter.
129  MessagePipeHandle handle() const {
130    DCHECK(thread_checker_.CalledOnValidThread());
131    return connector_.handle();
132  }
133
134 private:
135  class InterfaceEndpoint;
136  struct Task;
137
138  ~MultiplexRouter() override;
139
140  // MessageReceiver implementation:
141  bool Accept(Message* message) override;
142
143  // PipeControlMessageHandlerDelegate implementation:
144  bool OnPeerAssociatedEndpointClosed(InterfaceId id) override;
145  bool OnAssociatedEndpointClosedBeforeSent(InterfaceId id) override;
146
147  void OnPipeConnectionError();
148
149  // Specifies whether we are allowed to directly call into
150  // InterfaceEndpointClient (given that we are already on the same thread as
151  // the client).
152  enum ClientCallBehavior {
153    // Don't call any InterfaceEndpointClient methods directly.
154    NO_DIRECT_CLIENT_CALLS,
155    // Only call InterfaceEndpointClient::HandleIncomingMessage directly to
156    // handle sync messages.
157    ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES,
158    // Allow to call any InterfaceEndpointClient methods directly.
159    ALLOW_DIRECT_CLIENT_CALLS
160  };
161
162  // Processes enqueued tasks (incoming messages and error notifications).
163  // |current_task_runner| is only used when |client_call_behavior| is
164  // ALLOW_DIRECT_CLIENT_CALLS to determine whether we are on the right task
165  // runner to make client calls for async messages or connection error
166  // notifications.
167  //
168  // Note: Because calling into InterfaceEndpointClient may lead to destruction
169  // of this object, if direct calls are allowed, the caller needs to hold on to
170  // a ref outside of |lock_| before calling this method.
171  void ProcessTasks(ClientCallBehavior client_call_behavior,
172                    base::SingleThreadTaskRunner* current_task_runner);
173
174  // Processes the first queued sync message for the endpoint corresponding to
175  // |id|; returns whether there are more sync messages for that endpoint in the
176  // queue.
177  //
178  // This method is only used by enpoints during sync watching. Therefore, not
179  // all sync messages are handled by it.
180  bool ProcessFirstSyncMessageForEndpoint(InterfaceId id);
181
182  // Returns true to indicate that |task|/|message| has been processed.
183  bool ProcessNotifyErrorTask(
184      Task* task,
185      ClientCallBehavior client_call_behavior,
186      base::SingleThreadTaskRunner* current_task_runner);
187  bool ProcessIncomingMessage(
188      Message* message,
189      ClientCallBehavior client_call_behavior,
190      base::SingleThreadTaskRunner* current_task_runner);
191
192  void MaybePostToProcessTasks(base::SingleThreadTaskRunner* task_runner);
193  void LockAndCallProcessTasks();
194
195  // Updates the state of |endpoint|. If both the endpoint and its peer have
196  // been closed, removes it from |endpoints_|.
197  // NOTE: The method may invalidate |endpoint|.
198  enum EndpointStateUpdateType { ENDPOINT_CLOSED, PEER_ENDPOINT_CLOSED };
199  void UpdateEndpointStateMayRemove(InterfaceEndpoint* endpoint,
200                                    EndpointStateUpdateType type);
201
202  void RaiseErrorInNonTestingMode();
203
204  InterfaceEndpoint* FindOrInsertEndpoint(InterfaceId id, bool* inserted);
205
206  // Whether to set the namespace bit when generating interface IDs. Please see
207  // comments of kInterfaceIdNamespaceMask.
208  const bool set_interface_id_namespace_bit_;
209
210  MessageHeaderValidator header_validator_;
211  Connector connector_;
212
213  base::ThreadChecker thread_checker_;
214
215  // Protects the following members.
216  mutable base::Lock lock_;
217  PipeControlMessageHandler control_message_handler_;
218  PipeControlMessageProxy control_message_proxy_;
219
220  std::map<InterfaceId, scoped_refptr<InterfaceEndpoint>> endpoints_;
221  uint32_t next_interface_id_value_;
222
223  std::deque<std::unique_ptr<Task>> tasks_;
224  // It refers to tasks in |tasks_| and doesn't own any of them.
225  std::map<InterfaceId, std::deque<Task*>> sync_message_tasks_;
226
227  bool posted_to_process_tasks_;
228  scoped_refptr<base::SingleThreadTaskRunner> posted_to_task_runner_;
229
230  bool encountered_error_;
231
232  bool testing_mode_;
233
234  DISALLOW_COPY_AND_ASSIGN(MultiplexRouter);
235};
236
237}  // namespace internal
238}  // namespace mojo
239
240#endif  // MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
241