1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_ROUTER_H_
6#define MOJO_PUBLIC_CPP_BINDINGS_LIB_ROUTER_H_
7
8#include <stdint.h>
9
10#include <map>
11#include <memory>
12#include <queue>
13
14#include "base/callback.h"
15#include "base/macros.h"
16#include "base/memory/ref_counted.h"
17#include "base/memory/weak_ptr.h"
18#include "base/single_thread_task_runner.h"
19#include "base/threading/thread_checker.h"
20#include "mojo/public/cpp/bindings/connector.h"
21#include "mojo/public/cpp/bindings/lib/filter_chain.h"
22
23namespace mojo {
24namespace internal {
25
// TODO(yzshen): Consider removing this class and using MultiplexRouter in all
// cases. crbug.com/594244
28class Router : public MessageReceiverWithResponder {
29 public:
30  Router(ScopedMessagePipeHandle message_pipe,
31         FilterChain filters,
32         bool expects_sync_requests,
33         scoped_refptr<base::SingleThreadTaskRunner> runner);
34  ~Router() override;
35
36  // Sets the receiver to handle messages read from the message pipe that do
37  // not have the Message::kFlagIsResponse flag set.
38  void set_incoming_receiver(MessageReceiverWithResponderStatus* receiver) {
39    incoming_receiver_ = receiver;
40  }
41
42  // Sets the error handler to receive notifications when an error is
43  // encountered while reading from the pipe or waiting to read from the pipe.
44  void set_connection_error_handler(const base::Closure& error_handler) {
45    error_handler_ = error_handler;
46  }
47
48  // Returns true if an error was encountered while reading from the pipe or
49  // waiting to read from the pipe.
50  bool encountered_error() const {
51    DCHECK(thread_checker_.CalledOnValidThread());
52    return encountered_error_;
53  }
54
55  // Is the router bound to a MessagePipe handle?
56  bool is_valid() const {
57    DCHECK(thread_checker_.CalledOnValidThread());
58    return connector_.is_valid();
59  }
60
61  // Please note that this method shouldn't be called unless it results from an
62  // explicit request of the user of bindings (e.g., the user sets an
63  // InterfacePtr to null or closes a Binding).
64  void CloseMessagePipe() {
65    DCHECK(thread_checker_.CalledOnValidThread());
66    connector_.CloseMessagePipe();
67  }
68
69  ScopedMessagePipeHandle PassMessagePipe() {
70    DCHECK(thread_checker_.CalledOnValidThread());
71    return connector_.PassMessagePipe();
72  }
73
74  void RaiseError() {
75    DCHECK(thread_checker_.CalledOnValidThread());
76    connector_.RaiseError();
77  }
78
79  // MessageReceiver implementation:
80  bool Accept(Message* message) override;
81  bool AcceptWithResponder(Message* message,
82                           MessageReceiver* responder) override;
83
84  // Blocks the current thread until the first incoming method call, i.e.,
85  // either a call to a client method or a callback method, or |deadline|.
86  bool WaitForIncomingMessage(MojoDeadline deadline) {
87    DCHECK(thread_checker_.CalledOnValidThread());
88    return connector_.WaitForIncomingMessage(deadline);
89  }
90
91  // See Binding for details of pause/resume.
92  void PauseIncomingMethodCallProcessing() {
93    DCHECK(thread_checker_.CalledOnValidThread());
94    connector_.PauseIncomingMethodCallProcessing();
95  }
96  void ResumeIncomingMethodCallProcessing() {
97    DCHECK(thread_checker_.CalledOnValidThread());
98    connector_.ResumeIncomingMethodCallProcessing();
99  }
100
101  // Sets this object to testing mode.
102  // In testing mode:
103  // - the object is more tolerant of unrecognized response messages;
104  // - the connector continues working after seeing errors from its incoming
105  //   receiver.
106  void EnableTestingMode();
107
108  MessagePipeHandle handle() const { return connector_.handle(); }
109
110  // Returns true if this Router has any pending callbacks.
111  bool has_pending_responders() const {
112    DCHECK(thread_checker_.CalledOnValidThread());
113    return !async_responders_.empty() || !sync_responses_.empty();
114  }
115
116 private:
117  // Maps from the id of a response to the MessageReceiver that handles the
118  // response.
119  using AsyncResponderMap =
120      std::map<uint64_t, std::unique_ptr<MessageReceiver>>;
121
122  struct SyncResponseInfo {
123   public:
124    explicit SyncResponseInfo(bool* in_response_received);
125    ~SyncResponseInfo();
126
127    std::unique_ptr<Message> response;
128
129    // Points to a stack-allocated variable.
130    bool* response_received;
131
132   private:
133    DISALLOW_COPY_AND_ASSIGN(SyncResponseInfo);
134  };
135
136  using SyncResponseMap = std::map<uint64_t, std::unique_ptr<SyncResponseInfo>>;
137
138  class HandleIncomingMessageThunk : public MessageReceiver {
139   public:
140    HandleIncomingMessageThunk(Router* router);
141    ~HandleIncomingMessageThunk() override;
142
143    // MessageReceiver implementation:
144    bool Accept(Message* message) override;
145
146   private:
147    Router* router_;
148  };
149
150  bool HandleIncomingMessage(Message* message);
151  void HandleQueuedMessages();
152  bool HandleMessageInternal(Message* message);
153
154  void OnConnectionError();
155
156  HandleIncomingMessageThunk thunk_;
157  FilterChain filters_;
158  Connector connector_;
159  MessageReceiverWithResponderStatus* incoming_receiver_;
160  AsyncResponderMap async_responders_;
161  SyncResponseMap sync_responses_;
162  uint64_t next_request_id_;
163  bool testing_mode_;
164  std::queue<std::unique_ptr<Message>> pending_messages_;
165  // Whether a task has been posted to trigger processing of
166  // |pending_messages_|.
167  bool pending_task_for_messages_;
168  bool encountered_error_;
169  base::Closure error_handler_;
170  base::ThreadChecker thread_checker_;
171  base::WeakPtrFactory<Router> weak_factory_;
172};
173
174}  // namespace internal
175}  // namespace mojo
176
177#endif  // MOJO_PUBLIC_CPP_BINDINGS_LIB_ROUTER_H_
178