1// Copyright 2015 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6
7#include <stdint.h>
8
9#include <utility>
10
11#include "base/bind.h"
12#include "base/location.h"
13#include "base/macros.h"
14#include "base/memory/ptr_util.h"
15#include "base/single_thread_task_runner.h"
16#include "base/stl_util.h"
17#include "base/threading/thread_task_runner_handle.h"
18#include "mojo/public/cpp/bindings/associated_group.h"
19#include "mojo/public/cpp/bindings/interface_endpoint_client.h"
20#include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
21#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
22
23namespace mojo {
24namespace internal {
25
26// InterfaceEndpoint stores the information of an interface endpoint registered
27// with the router.
28// No one other than the router's |endpoints_| and |tasks_| should hold refs to
29// this object.
30class MultiplexRouter::InterfaceEndpoint
31    : public base::RefCounted<InterfaceEndpoint>,
32      public InterfaceEndpointController {
33 public:
34  InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
35      : router_(router),
36        id_(id),
37        closed_(false),
38        peer_closed_(false),
39        client_(nullptr),
40        event_signalled_(false) {}
41
42  // ---------------------------------------------------------------------------
43  // The following public methods are safe to call from any threads without
44  // locking.
45
46  InterfaceId id() const { return id_; }
47
48  // ---------------------------------------------------------------------------
49  // The following public methods are called under the router's lock.
50
51  bool closed() const { return closed_; }
52  void set_closed() {
53    router_->lock_.AssertAcquired();
54    closed_ = true;
55  }
56
57  bool peer_closed() const { return peer_closed_; }
58  void set_peer_closed() {
59    router_->lock_.AssertAcquired();
60    peer_closed_ = true;
61  }
62
63  base::SingleThreadTaskRunner* task_runner() const {
64    return task_runner_.get();
65  }
66
67  InterfaceEndpointClient* client() const { return client_; }
68
69  void AttachClient(InterfaceEndpointClient* client,
70                    scoped_refptr<base::SingleThreadTaskRunner> runner) {
71    router_->lock_.AssertAcquired();
72    DCHECK(!client_);
73    DCHECK(!closed_);
74    DCHECK(runner->BelongsToCurrentThread());
75
76    task_runner_ = std::move(runner);
77    client_ = client;
78  }
79
80  // This method must be called on the same thread as the corresponding
81  // AttachClient() call.
82  void DetachClient() {
83    router_->lock_.AssertAcquired();
84    DCHECK(client_);
85    DCHECK(task_runner_->BelongsToCurrentThread());
86    DCHECK(!closed_);
87
88    task_runner_ = nullptr;
89    client_ = nullptr;
90    sync_watcher_.reset();
91  }
92
93  void SignalSyncMessageEvent() {
94    router_->lock_.AssertAcquired();
95    if (event_signalled_)
96      return;
97
98    EnsureEventMessagePipeExists();
99    event_signalled_ = true;
100    MojoResult result =
101        WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
102                        0, MOJO_WRITE_MESSAGE_FLAG_NONE);
103    DCHECK_EQ(MOJO_RESULT_OK, result);
104  }
105
106  // ---------------------------------------------------------------------------
107  // The following public methods (i.e., InterfaceEndpointController
108  // implementation) are called by the client on the same thread as the
109  // AttachClient() call. They are called outside of the router's lock.
110
111  bool SendMessage(Message* message) override {
112    DCHECK(task_runner_->BelongsToCurrentThread());
113    message->set_interface_id(id_);
114    return router_->connector_.Accept(message);
115  }
116
117  void AllowWokenUpBySyncWatchOnSameThread() override {
118    DCHECK(task_runner_->BelongsToCurrentThread());
119
120    EnsureSyncWatcherExists();
121    sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
122  }
123
124  bool SyncWatch(const bool* should_stop) override {
125    DCHECK(task_runner_->BelongsToCurrentThread());
126
127    EnsureSyncWatcherExists();
128    return sync_watcher_->SyncWatch(should_stop);
129  }
130
131 private:
132  friend class base::RefCounted<InterfaceEndpoint>;
133
134  ~InterfaceEndpoint() override {
135    router_->lock_.AssertAcquired();
136
137    DCHECK(!client_);
138    DCHECK(closed_);
139    DCHECK(peer_closed_);
140    DCHECK(!sync_watcher_);
141  }
142
143  void OnHandleReady(MojoResult result) {
144    DCHECK(task_runner_->BelongsToCurrentThread());
145    scoped_refptr<InterfaceEndpoint> self_protector(this);
146    scoped_refptr<MultiplexRouter> router_protector(router_);
147
148    // Because we never close |sync_message_event_{sender,receiver}_| before
149    // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
150    DCHECK_EQ(MOJO_RESULT_OK, result);
151    bool reset_sync_watcher = false;
152    {
153      base::AutoLock locker(router_->lock_);
154
155      bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
156
157      if (!more_to_process)
158        ResetSyncMessageSignal();
159
160      // Currently there are no queued sync messages and the peer has closed so
161      // there won't be incoming sync messages in the future.
162      reset_sync_watcher = !more_to_process && peer_closed_;
163    }
164    if (reset_sync_watcher) {
165      // If a SyncWatch() call (or multiple ones) of this interface endpoint is
166      // on the call stack, resetting the sync watcher will allow it to exit
167      // when the call stack unwinds to that frame.
168      sync_watcher_.reset();
169    }
170  }
171
172  void EnsureSyncWatcherExists() {
173    DCHECK(task_runner_->BelongsToCurrentThread());
174    if (sync_watcher_)
175      return;
176
177    {
178      base::AutoLock locker(router_->lock_);
179      EnsureEventMessagePipeExists();
180
181      auto iter = router_->sync_message_tasks_.find(id_);
182      if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
183        SignalSyncMessageEvent();
184    }
185
186    sync_watcher_.reset(new SyncHandleWatcher(
187        sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
188        base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
189  }
190
191  void EnsureEventMessagePipeExists() {
192    router_->lock_.AssertAcquired();
193
194    if (sync_message_event_receiver_.is_valid())
195      return;
196
197    MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
198                                          &sync_message_event_receiver_);
199    DCHECK_EQ(MOJO_RESULT_OK, result);
200  }
201
202  void ResetSyncMessageSignal() {
203    router_->lock_.AssertAcquired();
204
205    if (!event_signalled_)
206      return;
207
208    DCHECK(sync_message_event_receiver_.is_valid());
209    MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
210                                       nullptr, nullptr, nullptr, nullptr,
211                                       MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
212    DCHECK_EQ(MOJO_RESULT_OK, result);
213    event_signalled_ = false;
214  }
215
216  // ---------------------------------------------------------------------------
217  // The following members are safe to access from any threads.
218
219  MultiplexRouter* const router_;
220  const InterfaceId id_;
221
222  // ---------------------------------------------------------------------------
223  // The following members are accessed under the router's lock.
224
225  // Whether the endpoint has been closed.
226  bool closed_;
227  // Whether the peer endpoint has been closed.
228  bool peer_closed_;
229
230  // The task runner on which |client_|'s methods can be called.
231  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
232  // Not owned. It is null if no client is attached to this endpoint.
233  InterfaceEndpointClient* client_;
234
235  // A message pipe used as an event to signal that sync messages are available.
236  // The message pipe handles are initialized under the router's lock and remain
237  // unchanged afterwards. They may be accessed outside of the router's lock
238  // later.
239  ScopedMessagePipeHandle sync_message_event_sender_;
240  ScopedMessagePipeHandle sync_message_event_receiver_;
241  bool event_signalled_;
242
243  // ---------------------------------------------------------------------------
244  // The following members are only valid while a client is attached. They are
245  // used exclusively on the client's thread. They may be accessed outside of
246  // the router's lock.
247
248  std::unique_ptr<SyncHandleWatcher> sync_watcher_;
249
250  DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
251};
252
253struct MultiplexRouter::Task {
254 public:
255  // Doesn't take ownership of |message| but takes its contents.
256  static std::unique_ptr<Task> CreateMessageTask(Message* message) {
257    Task* task = new Task(MESSAGE);
258    task->message.reset(new Message);
259    message->MoveTo(task->message.get());
260    return base::WrapUnique(task);
261  }
262  static std::unique_ptr<Task> CreateNotifyErrorTask(
263      InterfaceEndpoint* endpoint) {
264    Task* task = new Task(NOTIFY_ERROR);
265    task->endpoint_to_notify = endpoint;
266    return base::WrapUnique(task);
267  }
268
269  ~Task() {}
270
271  bool IsMessageTask() const { return type == MESSAGE; }
272  bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
273
274  std::unique_ptr<Message> message;
275  scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
276
277  enum Type { MESSAGE, NOTIFY_ERROR };
278  Type type;
279
280 private:
281  explicit Task(Type in_type) : type(in_type) {}
282};
283
284MultiplexRouter::MultiplexRouter(
285    bool set_interface_id_namesapce_bit,
286    ScopedMessagePipeHandle message_pipe,
287    scoped_refptr<base::SingleThreadTaskRunner> runner)
288    : AssociatedGroupController(base::ThreadTaskRunnerHandle::Get()),
289      set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
290      header_validator_(this),
291      connector_(std::move(message_pipe),
292                 Connector::MULTI_THREADED_SEND,
293                 std::move(runner)),
294      control_message_handler_(this),
295      control_message_proxy_(&connector_),
296      next_interface_id_value_(1),
297      posted_to_process_tasks_(false),
298      encountered_error_(false),
299      testing_mode_(false) {
300  // Always participate in sync handle watching, because even if it doesn't
301  // expect sync requests during sync handle watching, it may still need to
302  // dispatch messages to associated endpoints on a different thread.
303  connector_.AllowWokenUpBySyncWatchOnSameThread();
304  connector_.set_incoming_receiver(&header_validator_);
305  connector_.set_connection_error_handler(
306      base::Bind(&MultiplexRouter::OnPipeConnectionError,
307                 base::Unretained(this)));
308}
309
310MultiplexRouter::~MultiplexRouter() {
311  base::AutoLock locker(lock_);
312
313  sync_message_tasks_.clear();
314  tasks_.clear();
315
316  for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
317    InterfaceEndpoint* endpoint = iter->second.get();
318    // Increment the iterator before calling UpdateEndpointStateMayRemove()
319    // because it may remove the corresponding value from the map.
320    ++iter;
321
322    DCHECK(endpoint->closed());
323    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
324  }
325
326  DCHECK(endpoints_.empty());
327}
328
329void MultiplexRouter::SetMasterInterfaceName(const std::string& name) {
330  DCHECK(thread_checker_.CalledOnValidThread());
331  header_validator_.SetDescription(name + " [master] MessageHeaderValidator");
332  control_message_handler_.SetDescription(
333      name + " [master] PipeControlMessageHandler");
334}
335
336void MultiplexRouter::CreateEndpointHandlePair(
337    ScopedInterfaceEndpointHandle* local_endpoint,
338    ScopedInterfaceEndpointHandle* remote_endpoint) {
339  base::AutoLock locker(lock_);
340  uint32_t id = 0;
341  do {
342    if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
343      next_interface_id_value_ = 1;
344    id = next_interface_id_value_++;
345    if (set_interface_id_namespace_bit_)
346      id |= kInterfaceIdNamespaceMask;
347  } while (ContainsKey(endpoints_, id));
348
349  InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
350  endpoints_[id] = endpoint;
351  if (encountered_error_)
352    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
353
354  *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
355  *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
356}
357
358ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
359    InterfaceId id) {
360  if (!IsValidInterfaceId(id))
361    return ScopedInterfaceEndpointHandle();
362
363  base::AutoLock locker(lock_);
364  bool inserted = false;
365  InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
366  if (inserted) {
367    if (encountered_error_)
368      UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
369  } else {
370    // If the endpoint already exist, it is because we have received a
371    // notification that the peer endpoint has closed.
372    CHECK(!endpoint->closed());
373    CHECK(endpoint->peer_closed());
374  }
375  return CreateScopedInterfaceEndpointHandle(id, true);
376}
377
378void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
379  if (!IsValidInterfaceId(id))
380    return;
381
382  base::AutoLock locker(lock_);
383
384  if (!is_local) {
385    DCHECK(ContainsKey(endpoints_, id));
386    DCHECK(!IsMasterInterfaceId(id));
387
388    // We will receive a NotifyPeerEndpointClosed message from the other side.
389    control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
390
391    return;
392  }
393
394  DCHECK(ContainsKey(endpoints_, id));
395  InterfaceEndpoint* endpoint = endpoints_[id].get();
396  DCHECK(!endpoint->client());
397  DCHECK(!endpoint->closed());
398  UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
399
400  if (!IsMasterInterfaceId(id))
401    control_message_proxy_.NotifyPeerEndpointClosed(id);
402
403  ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
404}
405
406InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
407    const ScopedInterfaceEndpointHandle& handle,
408    InterfaceEndpointClient* client,
409    scoped_refptr<base::SingleThreadTaskRunner> runner) {
410  const InterfaceId id = handle.id();
411
412  DCHECK(IsValidInterfaceId(id));
413  DCHECK(client);
414
415  base::AutoLock locker(lock_);
416  DCHECK(ContainsKey(endpoints_, id));
417
418  InterfaceEndpoint* endpoint = endpoints_[id].get();
419  endpoint->AttachClient(client, std::move(runner));
420
421  if (endpoint->peer_closed())
422    tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
423  ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
424
425  return endpoint;
426}
427
428void MultiplexRouter::DetachEndpointClient(
429    const ScopedInterfaceEndpointHandle& handle) {
430  const InterfaceId id = handle.id();
431
432  DCHECK(IsValidInterfaceId(id));
433
434  base::AutoLock locker(lock_);
435  DCHECK(ContainsKey(endpoints_, id));
436
437  InterfaceEndpoint* endpoint = endpoints_[id].get();
438  endpoint->DetachClient();
439}
440
441void MultiplexRouter::RaiseError() {
442  if (task_runner_->BelongsToCurrentThread()) {
443    connector_.RaiseError();
444  } else {
445    task_runner_->PostTask(FROM_HERE,
446                           base::Bind(&MultiplexRouter::RaiseError, this));
447  }
448}
449
450void MultiplexRouter::CloseMessagePipe() {
451  DCHECK(thread_checker_.CalledOnValidThread());
452  connector_.CloseMessagePipe();
453  // CloseMessagePipe() above won't trigger connection error handler.
454  // Explicitly call OnPipeConnectionError() so that associated endpoints will
455  // get notified.
456  OnPipeConnectionError();
457}
458
459bool MultiplexRouter::HasAssociatedEndpoints() const {
460  DCHECK(thread_checker_.CalledOnValidThread());
461  base::AutoLock locker(lock_);
462
463  if (endpoints_.size() > 1)
464    return true;
465  if (endpoints_.size() == 0)
466    return false;
467
468  return !ContainsKey(endpoints_, kMasterInterfaceId);
469}
470
471void MultiplexRouter::EnableTestingMode() {
472  DCHECK(thread_checker_.CalledOnValidThread());
473  base::AutoLock locker(lock_);
474
475  testing_mode_ = true;
476  connector_.set_enforce_errors_from_incoming_receiver(false);
477}
478
479bool MultiplexRouter::Accept(Message* message) {
480  DCHECK(thread_checker_.CalledOnValidThread());
481
482  scoped_refptr<MultiplexRouter> protector(this);
483  base::AutoLock locker(lock_);
484
485  ClientCallBehavior client_call_behavior =
486      connector_.during_sync_handle_watcher_callback()
487          ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
488          : ALLOW_DIRECT_CLIENT_CALLS;
489
490  bool processed =
491      tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
492                                               connector_.task_runner());
493
494  if (!processed) {
495    // Either the task queue is not empty or we cannot process the message
496    // directly. In both cases, there is no need to call ProcessTasks().
497    tasks_.push_back(Task::CreateMessageTask(message));
498    Task* task = tasks_.back().get();
499
500    if (task->message->has_flag(Message::kFlagIsSync)) {
501      InterfaceId id = task->message->interface_id();
502      sync_message_tasks_[id].push_back(task);
503      auto iter = endpoints_.find(id);
504      if (iter != endpoints_.end())
505        iter->second->SignalSyncMessageEvent();
506    }
507  } else if (!tasks_.empty()) {
508    // Processing the message may result in new tasks (for error notification)
509    // being added to the queue. In this case, we have to attempt to process the
510    // tasks.
511    ProcessTasks(client_call_behavior, connector_.task_runner());
512  }
513
514  // Always return true. If we see errors during message processing, we will
515  // explicitly call Connector::RaiseError() to disconnect the message pipe.
516  return true;
517}
518
519bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
520  lock_.AssertAcquired();
521
522  if (IsMasterInterfaceId(id))
523    return false;
524
525  InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
526
527  // It is possible that this endpoint has been set as peer closed. That is
528  // because when the message pipe is closed, all the endpoints are updated with
529  // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
530  // as long as there are refs keeping the router alive. If there is a
531  // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
532  // here and see that the endpoint has been marked as peer closed.
533  if (!endpoint->peer_closed()) {
534    if (endpoint->client())
535      tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
536    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
537  }
538
539  // No need to trigger a ProcessTasks() because it is already on the stack.
540
541  return true;
542}
543
544bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
545  lock_.AssertAcquired();
546
547  if (IsMasterInterfaceId(id))
548    return false;
549
550  InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
551  DCHECK(!endpoint->closed());
552  UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
553
554  control_message_proxy_.NotifyPeerEndpointClosed(id);
555
556  return true;
557}
558
559void MultiplexRouter::OnPipeConnectionError() {
560  DCHECK(thread_checker_.CalledOnValidThread());
561
562  scoped_refptr<MultiplexRouter> protector(this);
563  base::AutoLock locker(lock_);
564
565  encountered_error_ = true;
566
567  for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
568    InterfaceEndpoint* endpoint = iter->second.get();
569    // Increment the iterator before calling UpdateEndpointStateMayRemove()
570    // because it may remove the corresponding value from the map.
571    ++iter;
572
573    if (endpoint->client())
574      tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
575
576    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
577  }
578
579  ProcessTasks(connector_.during_sync_handle_watcher_callback()
580                   ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
581                   : ALLOW_DIRECT_CLIENT_CALLS,
582               connector_.task_runner());
583}
584
585void MultiplexRouter::ProcessTasks(
586    ClientCallBehavior client_call_behavior,
587    base::SingleThreadTaskRunner* current_task_runner) {
588  lock_.AssertAcquired();
589
590  if (posted_to_process_tasks_)
591    return;
592
593  while (!tasks_.empty()) {
594    std::unique_ptr<Task> task(std::move(tasks_.front()));
595    tasks_.pop_front();
596
597    InterfaceId id = kInvalidInterfaceId;
598    bool sync_message = task->IsMessageTask() && task->message &&
599                        task->message->has_flag(Message::kFlagIsSync);
600    if (sync_message) {
601      id = task->message->interface_id();
602      auto& sync_message_queue = sync_message_tasks_[id];
603      DCHECK_EQ(task.get(), sync_message_queue.front());
604      sync_message_queue.pop_front();
605    }
606
607    bool processed =
608        task->IsNotifyErrorTask()
609            ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
610                                     current_task_runner)
611            : ProcessIncomingMessage(task->message.get(), client_call_behavior,
612                                     current_task_runner);
613
614    if (!processed) {
615      if (sync_message) {
616        auto& sync_message_queue = sync_message_tasks_[id];
617        sync_message_queue.push_front(task.get());
618      }
619      tasks_.push_front(std::move(task));
620      break;
621    } else {
622      if (sync_message) {
623        auto iter = sync_message_tasks_.find(id);
624        if (iter != sync_message_tasks_.end() && iter->second.empty())
625          sync_message_tasks_.erase(iter);
626      }
627    }
628  }
629}
630
631bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
632  lock_.AssertAcquired();
633
634  auto iter = sync_message_tasks_.find(id);
635  if (iter == sync_message_tasks_.end())
636    return false;
637
638  MultiplexRouter::Task* task = iter->second.front();
639  iter->second.pop_front();
640
641  DCHECK(task->IsMessageTask());
642  std::unique_ptr<Message> message(std::move(task->message));
643
644  // Note: after this call, |task| and  |iter| may be invalidated.
645  bool processed = ProcessIncomingMessage(
646      message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
647  DCHECK(processed);
648
649  iter = sync_message_tasks_.find(id);
650  if (iter == sync_message_tasks_.end())
651    return false;
652
653  if (iter->second.empty()) {
654    sync_message_tasks_.erase(iter);
655    return false;
656  }
657
658  return true;
659}
660
661bool MultiplexRouter::ProcessNotifyErrorTask(
662    Task* task,
663    ClientCallBehavior client_call_behavior,
664    base::SingleThreadTaskRunner* current_task_runner) {
665  DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
666  lock_.AssertAcquired();
667  InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
668  if (!endpoint->client())
669    return true;
670
671  if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
672      endpoint->task_runner() != current_task_runner) {
673    MaybePostToProcessTasks(endpoint->task_runner());
674    return false;
675  }
676
677  DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
678
679  InterfaceEndpointClient* client = endpoint->client();
680  {
681    // We must unlock before calling into |client| because it may call this
682    // object within NotifyError(). Holding the lock will lead to deadlock.
683    //
684    // It is safe to call into |client| without the lock. Because |client| is
685    // always accessed on the same thread, including DetachEndpointClient().
686    base::AutoUnlock unlocker(lock_);
687    client->NotifyError();
688  }
689  return true;
690}
691
692bool MultiplexRouter::ProcessIncomingMessage(
693    Message* message,
694    ClientCallBehavior client_call_behavior,
695    base::SingleThreadTaskRunner* current_task_runner) {
696  DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
697  lock_.AssertAcquired();
698
699  if (!message) {
700    // This is a sync message and has been processed during sync handle
701    // watching.
702    return true;
703  }
704
705  if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
706    if (!control_message_handler_.Accept(message))
707      RaiseErrorInNonTestingMode();
708    return true;
709  }
710
711  InterfaceId id = message->interface_id();
712  DCHECK(IsValidInterfaceId(id));
713
714  bool inserted = false;
715  InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
716  if (inserted) {
717    // Currently, it is legitimate to receive messages for an endpoint
718    // that is not registered. For example, the endpoint is transferred in
719    // a message that is discarded. Once we add support to specify all
720    // enclosing endpoints in message header, we should be able to remove
721    // this.
722    UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
723
724    // It is also possible that this newly-inserted endpoint is the master
725    // endpoint. When the master InterfacePtr/Binding goes away, the message
726    // pipe is closed and we explicitly trigger a pipe connection error. The
727    // error updates all the endpoints, including the master endpoint, with
728    // PEER_ENDPOINT_CLOSED and removes the master endpoint from the
729    // registration. We continue to process remaining tasks in the queue, as
730    // long as there are refs keeping the router alive. If there are remaining
731    // messages for the master endpoint, we will get here.
732    if (!IsMasterInterfaceId(id))
733      control_message_proxy_.NotifyPeerEndpointClosed(id);
734    return true;
735  }
736
737  if (endpoint->closed())
738    return true;
739
740  if (!endpoint->client()) {
741    // We need to wait until a client is attached in order to dispatch further
742    // messages.
743    return false;
744  }
745
746  bool can_direct_call;
747  if (message->has_flag(Message::kFlagIsSync)) {
748    can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
749                      endpoint->task_runner()->BelongsToCurrentThread();
750  } else {
751    can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
752                      endpoint->task_runner() == current_task_runner;
753  }
754
755  if (!can_direct_call) {
756    MaybePostToProcessTasks(endpoint->task_runner());
757    return false;
758  }
759
760  DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
761
762  InterfaceEndpointClient* client = endpoint->client();
763  bool result = false;
764  {
765    // We must unlock before calling into |client| because it may call this
766    // object within HandleIncomingMessage(). Holding the lock will lead to
767    // deadlock.
768    //
769    // It is safe to call into |client| without the lock. Because |client| is
770    // always accessed on the same thread, including DetachEndpointClient().
771    base::AutoUnlock unlocker(lock_);
772    result = client->HandleIncomingMessage(message);
773  }
774  if (!result)
775    RaiseErrorInNonTestingMode();
776
777  return true;
778}
779
780void MultiplexRouter::MaybePostToProcessTasks(
781    base::SingleThreadTaskRunner* task_runner) {
782  lock_.AssertAcquired();
783  if (posted_to_process_tasks_)
784    return;
785
786  posted_to_process_tasks_ = true;
787  posted_to_task_runner_ = task_runner;
788  task_runner->PostTask(
789      FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
790}
791
792void MultiplexRouter::LockAndCallProcessTasks() {
793  // There is no need to hold a ref to this class in this case because this is
794  // always called using base::Bind(), which holds a ref.
795  base::AutoLock locker(lock_);
796  posted_to_process_tasks_ = false;
797  scoped_refptr<base::SingleThreadTaskRunner> runner(
798      std::move(posted_to_task_runner_));
799  ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
800}
801
802void MultiplexRouter::UpdateEndpointStateMayRemove(
803    InterfaceEndpoint* endpoint,
804    EndpointStateUpdateType type) {
805  switch (type) {
806    case ENDPOINT_CLOSED:
807      endpoint->set_closed();
808      break;
809    case PEER_ENDPOINT_CLOSED:
810      endpoint->set_peer_closed();
811      // If the interface endpoint is performing a sync watch, this makes sure
812      // it is notified and eventually exits the sync watch.
813      endpoint->SignalSyncMessageEvent();
814      break;
815  }
816  if (endpoint->closed() && endpoint->peer_closed())
817    endpoints_.erase(endpoint->id());
818}
819
820void MultiplexRouter::RaiseErrorInNonTestingMode() {
821  lock_.AssertAcquired();
822  if (!testing_mode_)
823    RaiseError();
824}
825
826MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
827    InterfaceId id,
828    bool* inserted) {
829  lock_.AssertAcquired();
830  // Either |inserted| is nullptr or it points to a boolean initialized as
831  // false.
832  DCHECK(!inserted || !*inserted);
833
834  auto iter = endpoints_.find(id);
835  InterfaceEndpoint* endpoint;
836  if (iter == endpoints_.end()) {
837    endpoint = new InterfaceEndpoint(this, id);
838    endpoints_[id] = endpoint;
839    if (inserted)
840      *inserted = true;
841  } else {
842    endpoint = iter->second.get();
843  }
844
845  return endpoint;
846}
847
848}  // namespace internal
849}  // namespace mojo
850