node_controller.cc revision 645501c2ab19a559ce82a1d5a29ced159a4c30fb
1// Copyright 2016 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/edk/system/node_controller.h"
6
7#include <algorithm>
8#include <limits>
9
10#include "base/bind.h"
11#include "base/location.h"
12#include "base/logging.h"
13#include "base/macros.h"
14#include "base/message_loop/message_loop.h"
15#include "base/metrics/histogram_macros.h"
16#include "base/process/process_handle.h"
17#include "base/rand_util.h"
18#include "base/time/time.h"
19#include "base/timer/elapsed_timer.h"
20#include "mojo/edk/embedder/embedder_internal.h"
21#include "mojo/edk/embedder/platform_channel_pair.h"
22#include "mojo/edk/system/broker.h"
23#include "mojo/edk/system/broker_host.h"
24#include "mojo/edk/system/core.h"
25#include "mojo/edk/system/ports_message.h"
26#include "mojo/edk/system/request_context.h"
27
28#if defined(OS_MACOSX) && !defined(OS_IOS)
29#include "mojo/edk/system/mach_port_relay.h"
30#endif
31
32#if !defined(OS_NACL)
33#include "crypto/random.h"
34#endif
35
36namespace mojo {
37namespace edk {
38
39namespace {
40
41#if defined(OS_NACL)
42template <typename T>
43void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); }
44#else
45template <typename T>
46void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
47#endif
48
49ports::NodeName GetRandomNodeName() {
50  ports::NodeName name;
51  GenerateRandomName(&name);
52  return name;
53}
54
55void RecordPeerCount(size_t count) {
56  DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
57
58  // 8k is the maximum number of file descriptors allowed in Chrome.
59  UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers",
60                              static_cast<int32_t>(count),
61                              0 /* min */,
62                              8000 /* max */,
63                              50 /* bucket count */);
64}
65
66void RecordPendingChildCount(size_t count) {
67  DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
68
69  // 8k is the maximum number of file descriptors allowed in Chrome.
70  UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren",
71                              static_cast<int32_t>(count),
72                              0 /* min */,
73                              8000 /* max */,
74                              50 /* bucket count */);
75}
76
77bool ParsePortsMessage(Channel::Message* message,
78                       void** data,
79                       size_t* num_data_bytes,
80                       size_t* num_header_bytes,
81                       size_t* num_payload_bytes,
82                       size_t* num_ports_bytes) {
83  DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
84         num_ports_bytes);
85
86  NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
87  if (!*num_data_bytes)
88    return false;
89
90  if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
91                             num_payload_bytes, num_ports_bytes)) {
92    return false;
93  }
94
95  return true;
96}
97
98// Used by NodeController to watch for shutdown. Since no IO can happen once
99// the IO thread is killed, the NodeController can cleanly drop all its peers
100// at that time.
101class ThreadDestructionObserver :
102    public base::MessageLoop::DestructionObserver {
103 public:
104  static void Create(scoped_refptr<base::TaskRunner> task_runner,
105                     const base::Closure& callback) {
106    if (task_runner->RunsTasksOnCurrentThread()) {
107      // Owns itself.
108      new ThreadDestructionObserver(callback);
109    } else {
110      task_runner->PostTask(FROM_HERE,
111                            base::Bind(&Create, task_runner, callback));
112    }
113  }
114
115 private:
116  explicit ThreadDestructionObserver(const base::Closure& callback)
117      : callback_(callback) {
118    base::MessageLoop::current()->AddDestructionObserver(this);
119  }
120
121  ~ThreadDestructionObserver() override {
122    base::MessageLoop::current()->RemoveDestructionObserver(this);
123  }
124
125  // base::MessageLoop::DestructionObserver:
126  void WillDestroyCurrentMessageLoop() override {
127    callback_.Run();
128    delete this;
129  }
130
131  const base::Closure callback_;
132
133  DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
134};
135
136}  // namespace
137
138NodeController::~NodeController() {}
139
// Creates a controller bound to |core|. The node is given a random name
// (see GetRandomNodeName()) and the underlying ports::Node is constructed with
// that name and with this controller passed as its second argument.
NodeController::NodeController(Core* core)
    : core_(core),
      name_(GetRandomNodeName()),
      node_(new ports::Node(name_, this)) {
  DVLOG(1) << "Initializing node " << name_;
}
146
#if defined(OS_MACOSX) && !defined(OS_IOS)
// Creates the MachPortRelay for this node from |port_provider|. Must be called
// at most once: a relay must not already exist (DCHECKed).
void NodeController::CreateMachPortRelay(
    base::PortProvider* port_provider) {
  base::AutoLock relay_lock(mach_port_relay_lock_);
  DCHECK(!mach_port_relay_);
  mach_port_relay_.reset(new MachPortRelay(port_provider));
}
#endif
155
156void NodeController::SetIOTaskRunner(
157    scoped_refptr<base::TaskRunner> task_runner) {
158  io_task_runner_ = task_runner;
159  ThreadDestructionObserver::Create(
160      io_task_runner_,
161      base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
162}
163
164void NodeController::ConnectToChild(
165    base::ProcessHandle process_handle,
166    ScopedPlatformHandle platform_handle,
167    const std::string& child_token,
168    const ProcessErrorCallback& process_error_callback) {
169  // Generate the temporary remote node name here so that it can be associated
170  // with the embedder's child_token. If an error occurs in the child process
171  // after it is launched, but before any reserved ports are connected, this can
172  // be used to clean up any dangling ports.
173  ports::NodeName node_name;
174  GenerateRandomName(&node_name);
175
176  {
177    base::AutoLock lock(reserved_ports_lock_);
178    bool inserted = pending_child_tokens_.insert(
179        std::make_pair(node_name, child_token)).second;
180    DCHECK(inserted);
181  }
182
183#if defined(OS_WIN)
184  // On Windows, we need to duplicate the process handle because we have no
185  // control over its lifetime and it may become invalid by the time the posted
186  // task runs.
187  HANDLE dup_handle = INVALID_HANDLE_VALUE;
188  BOOL ok = ::DuplicateHandle(
189      base::GetCurrentProcessHandle(), process_handle,
190      base::GetCurrentProcessHandle(), &dup_handle,
191      0, FALSE, DUPLICATE_SAME_ACCESS);
192  DPCHECK(ok);
193  process_handle = dup_handle;
194#endif
195
196  io_task_runner_->PostTask(
197      FROM_HERE,
198      base::Bind(&NodeController::ConnectToChildOnIOThread,
199                 base::Unretained(this),
200                 process_handle,
201                 base::Passed(&platform_handle),
202                 node_name,
203                 process_error_callback));
204}
205
206void NodeController::CloseChildPorts(const std::string& child_token) {
207  std::vector<ports::PortRef> ports_to_close;
208  {
209    std::vector<std::string> port_tokens;
210    base::AutoLock lock(reserved_ports_lock_);
211    for (const auto& port : reserved_ports_) {
212      if (port.second.child_token == child_token) {
213        DVLOG(1) << "Closing reserved port " << port.second.port.name();
214        ports_to_close.push_back(port.second.port);
215        port_tokens.push_back(port.first);
216      }
217    }
218
219    for (const auto& token : port_tokens)
220      reserved_ports_.erase(token);
221  }
222
223  for (const auto& port : ports_to_close)
224    node_->ClosePort(port);
225
226  // Ensure local port closure messages are processed.
227  AcceptIncomingMessages();
228}
229
230void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
231// TODO(amistry): Consider the need for a broker on Windows.
232#if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
233  // On posix, use the bootstrap channel for the broker and receive the node's
234  // channel synchronously as the first message from the broker.
235  base::ElapsedTimer timer;
236  broker_.reset(new Broker(std::move(platform_handle)));
237  platform_handle = broker_->GetParentPlatformHandle();
238  UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime",
239                      timer.Elapsed());
240
241  if (!platform_handle.is_valid()) {
242    // Most likely the browser side of the channel has already been closed and
243    // the broker was unable to negotiate a NodeChannel pipe. In this case we
244    // can cancel parent connection.
245    DVLOG(1) << "Cannot connect to invalid parent channel.";
246    return;
247  }
248#endif
249
250  io_task_runner_->PostTask(
251      FROM_HERE,
252      base::Bind(&NodeController::ConnectToParentOnIOThread,
253                 base::Unretained(this),
254                 base::Passed(&platform_handle)));
255}
256
// Attaches |observer| to |port| by storing it as the port's user data on the
// node. ClosePort() passes nullptr here to detach the observer.
void NodeController::SetPortObserver(
    const ports::PortRef& port,
    const scoped_refptr<PortObserver>& observer) {
  node_->SetUserData(port, observer);
}
262
// Closes |port| on the local node. The port's observer is cleared first, and
// any locally queued messages are processed afterwards.
void NodeController::ClosePort(const ports::PortRef& port) {
  // Detach the observer before the port is closed.
  SetPortObserver(port, nullptr);
  int rv = node_->ClosePort(port);
  DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();

  // Flush messages that closing the port may have queued locally.
  AcceptIncomingMessages();
}
270
271int NodeController::SendMessage(const ports::PortRef& port,
272                                std::unique_ptr<PortsMessage> message) {
273  ports::ScopedMessage ports_message(message.release());
274  int rv = node_->SendMessage(port, std::move(ports_message));
275
276  AcceptIncomingMessages();
277  return rv;
278}
279
// Records |port| as reserved under |token|, remembering the |child_token| it
// belongs to. |token| must not already be reserved (DCHECKed).
void NodeController::ReservePort(const std::string& token,
                                 const ports::PortRef& port,
                                 const std::string& child_token) {
  DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
           << token;

  base::AutoLock lock(reserved_ports_lock_);
  auto result = reserved_ports_.insert(
      std::make_pair(token, ReservedPort{port, child_token}));
  // A false |second| means |token| was already present.
  DCHECK(result.second);
}
291
// Merges |port| with the port reserved under |token|.
//
// Three outcomes are possible:
//  - |token| is reserved locally: the two ports are merged on this node.
//  - Pending merges are being rejected: |port| is closed.
//  - Otherwise the merge is requested from the parent channel, or queued in
//    |pending_port_merges_| if no parent channel is known yet.
void NodeController::MergePortIntoParent(const std::string& token,
                                         const ports::PortRef& port) {
  bool was_merged = false;
  {
    // This request may be coming from within the process that reserved the
    // "parent" side (e.g. for Chrome single-process mode), so if this token is
    // reserved locally, merge locally instead.
    base::AutoLock lock(reserved_ports_lock_);
    auto it = reserved_ports_.find(token);
    if (it != reserved_ports_.end()) {
      node_->MergePorts(port, name_, it->second.port.name());
      reserved_ports_.erase(it);
      was_merged = true;
    }
  }
  // Messages are flushed only after |reserved_ports_lock_| has been released.
  if (was_merged) {
    AcceptIncomingMessages();
    return;
  }

  scoped_refptr<NodeChannel> parent;
  bool reject_merge = false;
  {
    // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
    // there is a race where the parent can be set, and |pending_port_merges_|
    // be processed between retrieving |parent| and adding the merge to
    // |pending_port_merges_|.
    base::AutoLock lock(pending_port_merges_lock_);
    parent = GetParentChannel();
    if (reject_pending_merges_) {
      reject_merge = true;
    } else if (!parent) {
      // No parent yet: remember the request so it can be handled later.
      pending_port_merges_.push_back(std::make_pair(token, port));
      return;
    }
  }
  // The port is closed outside of |pending_port_merges_lock_|.
  if (reject_merge) {
    node_->ClosePort(port);
    DVLOG(2) << "Rejecting port merge for token " << token
             << " due to closed parent channel.";
    AcceptIncomingMessages();
    return;
  }

  parent->RequestPortMerge(port.name(), token);
}
338
339int NodeController::MergeLocalPorts(const ports::PortRef& port0,
340                                    const ports::PortRef& port1) {
341  int rv = node_->MergeLocalPorts(port0, port1);
342  AcceptIncomingMessages();
343  return rv;
344}
345
346scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
347    size_t num_bytes) {
348#if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
349  // Shared buffer creation failure is fatal, so always use the broker when we
350  // have one. This does mean that a non-root process that has children will use
351  // the broker for shared buffer creation even though that process is
352  // privileged.
353  if (broker_) {
354    return broker_->GetSharedBuffer(num_bytes);
355  }
356#endif
357  return PlatformSharedBuffer::Create(num_bytes);
358}
359
360void NodeController::RequestShutdown(const base::Closure& callback) {
361  {
362    base::AutoLock lock(shutdown_lock_);
363    shutdown_callback_ = callback;
364    shutdown_callback_flag_.Set(true);
365  }
366
367  AttemptShutdownIfRequested();
368}
369
370void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
371                                          const std::string& error) {
372  scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
373  if (peer)
374    peer->NotifyBadMessage(error);
375}
376
377void NodeController::ConnectToChildOnIOThread(
378    base::ProcessHandle process_handle,
379    ScopedPlatformHandle platform_handle,
380    ports::NodeName token,
381    const ProcessErrorCallback& process_error_callback) {
382  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
383
384#if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL)
385  PlatformChannelPair node_channel;
386  // BrokerHost owns itself.
387  BrokerHost* broker_host = new BrokerHost(std::move(platform_handle));
388  broker_host->SendChannel(node_channel.PassClientHandle());
389  scoped_refptr<NodeChannel> channel = NodeChannel::Create(
390      this, node_channel.PassServerHandle(), io_task_runner_,
391      process_error_callback);
392#else
393  scoped_refptr<NodeChannel> channel =
394      NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
395                          process_error_callback);
396#endif
397
398  // We set up the child channel with a temporary name so it can be identified
399  // as a pending child if it writes any messages to the channel. We may start
400  // receiving messages from it (though we shouldn't) as soon as Start() is
401  // called below.
402
403  pending_children_.insert(std::make_pair(token, channel));
404  RecordPendingChildCount(pending_children_.size());
405
406  channel->SetRemoteNodeName(token);
407  channel->SetRemoteProcessHandle(process_handle);
408  channel->Start();
409
410  channel->AcceptChild(name_, token);
411}
412
413void NodeController::ConnectToParentOnIOThread(
414    ScopedPlatformHandle platform_handle) {
415  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
416
417  {
418    base::AutoLock lock(parent_lock_);
419    DCHECK(parent_name_ == ports::kInvalidNodeName);
420
421    // At this point we don't know the parent's name, so we can't yet insert it
422    // into our |peers_| map. That will happen as soon as we receive an
423    // AcceptChild message from them.
424    bootstrap_parent_channel_ =
425        NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
426                            ProcessErrorCallback());
427    // Prevent the parent pipe handle from being closed on shutdown. Pipe
428    // closure is used by the parent to detect the child process has exited.
429    // Relying on message pipes to be closed is not enough because the parent
430    // may see the message pipe closure before the child is dead, causing the
431    // child process to be unexpectedly SIGKILL'd.
432    bootstrap_parent_channel_->LeakHandleOnShutdown();
433  }
434  bootstrap_parent_channel_->Start();
435}
436
437scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
438    const ports::NodeName& name) {
439  base::AutoLock lock(peers_lock_);
440  auto it = peers_.find(name);
441  if (it == peers_.end())
442    return nullptr;
443  return it->second;
444}
445
446scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
447  ports::NodeName parent_name;
448  {
449    base::AutoLock lock(parent_lock_);
450    parent_name = parent_name_;
451  }
452  return GetPeerChannel(parent_name);
453}
454
455scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
456  ports::NodeName broker_name;
457  {
458    base::AutoLock lock(broker_lock_);
459    broker_name = broker_name_;
460  }
461  return GetPeerChannel(broker_name);
462}
463
464void NodeController::AddPeer(const ports::NodeName& name,
465                             scoped_refptr<NodeChannel> channel,
466                             bool start_channel) {
467  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
468
469  DCHECK(name != ports::kInvalidNodeName);
470  DCHECK(channel);
471
472  channel->SetRemoteNodeName(name);
473
474  OutgoingMessageQueue pending_messages;
475  {
476    base::AutoLock lock(peers_lock_);
477    if (peers_.find(name) != peers_.end()) {
478      // This can happen normally if two nodes race to be introduced to each
479      // other. The losing pipe will be silently closed and introduction should
480      // not be affected.
481      DVLOG(1) << "Ignoring duplicate peer name " << name;
482      return;
483    }
484
485    auto result = peers_.insert(std::make_pair(name, channel));
486    DCHECK(result.second);
487
488    DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
489
490    RecordPeerCount(peers_.size());
491
492    auto it = pending_peer_messages_.find(name);
493    if (it != pending_peer_messages_.end()) {
494      std::swap(pending_messages, it->second);
495      pending_peer_messages_.erase(it);
496    }
497  }
498
499  if (start_channel)
500    channel->Start();
501
502  // Flush any queued message we need to deliver to this node.
503  while (!pending_messages.empty()) {
504    channel->PortsMessage(std::move(pending_messages.front()));
505    pending_messages.pop();
506  }
507}
508
// Forgets peer |name| and cleans up everything associated with it: its
// |peers_| / pending-child entries, any messages queued for it, any ports
// reserved for the child token registered under |name|, and (when the peer is
// the parent) all pending port merges. |channel| is only compared against
// |bootstrap_parent_channel_|; callers in this file pass nullptr when they
// have no channel to identify.
void NodeController::DropPeer(const ports::NodeName& name,
                              NodeChannel* channel) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  {
    base::AutoLock lock(peers_lock_);
    auto it = peers_.find(name);

    if (it != peers_.end()) {
      // Copy the name before erasing so it can still be logged afterwards.
      ports::NodeName peer = it->first;
      peers_.erase(it);
      DVLOG(1) << "Dropped peer " << peer;
    }

    pending_peer_messages_.erase(name);
    pending_children_.erase(name);

    // Both counts are reported on every call, whether or not anything was
    // actually removed.
    RecordPeerCount(peers_.size());
    RecordPendingChildCount(pending_children_.size());
  }

  std::vector<ports::PortRef> ports_to_close;
  {
    // Clean up any reserved ports.
    base::AutoLock lock(reserved_ports_lock_);
    auto it = pending_child_tokens_.find(name);
    if (it != pending_child_tokens_.end()) {
      // This reference stays valid until |it| is erased at the end of this
      // scope.
      const std::string& child_token = it->second;

      std::vector<std::string> port_tokens;
      for (const auto& port : reserved_ports_) {
        if (port.second.child_token == child_token) {
          DVLOG(1) << "Closing reserved port: " << port.second.port.name();
          ports_to_close.push_back(port.second.port);
          port_tokens.push_back(port.first);
        }
      }

      // We have to erase reserved ports in a two-step manner because the usual
      // manner of using the returned iterator from map::erase isn't technically
      // valid in C++11 (although it is in C++14).
      for (const auto& token : port_tokens)
        reserved_ports_.erase(token);

      pending_child_tokens_.erase(it);
    }
  }

  bool is_parent;
  {
    base::AutoLock lock(parent_lock_);
    // The peer counts as the parent if it matches by name or is the bootstrap
    // channel itself.
    is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
  }
  // If the error comes from the parent channel, we also need to cancel any
  // port merge requests, so that errors can be propagated to the message
  // pipes.
  if (is_parent) {
    base::AutoLock lock(pending_port_merges_lock_);
    // From now on MergePortIntoParent() closes ports instead of queueing them.
    reject_pending_merges_ = true;

    for (const auto& port : pending_port_merges_)
      ports_to_close.push_back(port.second);
    pending_port_merges_.clear();
  }

  // All ports are closed after the locks above have been released.
  for (const auto& port : ports_to_close)
    node_->ClosePort(port);

  node_->LostConnectionToNode(name);

  AcceptIncomingMessages();
}
581
582void NodeController::SendPeerMessage(const ports::NodeName& name,
583                                     ports::ScopedMessage message) {
584  Channel::MessagePtr channel_message =
585      static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
586
587  scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
588#if defined(OS_WIN)
589  if (channel_message->has_handles()) {
590    // If we're sending a message with handles we aren't the destination
591    // node's parent or broker (i.e. we don't know its process handle), ask
592    // the broker to relay for us.
593    scoped_refptr<NodeChannel> broker = GetBrokerChannel();
594    if (!peer || !peer->HasRemoteProcessHandle()) {
595      if (broker) {
596        broker->RelayPortsMessage(name, std::move(channel_message));
597      } else {
598        base::AutoLock lock(broker_lock_);
599        pending_relay_messages_[name].emplace(std::move(channel_message));
600      }
601      return;
602    }
603  }
604#elif defined(OS_MACOSX) && !defined(OS_IOS)
605  if (channel_message->has_mach_ports()) {
606    // Messages containing Mach ports are always routed through the broker, even
607    // if the broker process is the intended recipient.
608    bool use_broker = false;
609    {
610      base::AutoLock lock(parent_lock_);
611      use_broker = (bootstrap_parent_channel_ ||
612                    parent_name_ != ports::kInvalidNodeName);
613    }
614    if (use_broker) {
615      scoped_refptr<NodeChannel> broker = GetBrokerChannel();
616      if (broker) {
617        broker->RelayPortsMessage(name, std::move(channel_message));
618      } else {
619        base::AutoLock lock(broker_lock_);
620        pending_relay_messages_[name].emplace(std::move(channel_message));
621      }
622      return;
623    }
624  }
625#endif  // defined(OS_WIN)
626
627  if (peer) {
628    peer->PortsMessage(std::move(channel_message));
629    return;
630  }
631
632  // If we don't know who the peer is, queue the message for delivery. If this
633  // is the first message queued for the peer, we also ask the broker to
634  // introduce us to them.
635
636  bool needs_introduction = false;
637  {
638    base::AutoLock lock(peers_lock_);
639    auto& queue = pending_peer_messages_[name];
640    needs_introduction = queue.empty();
641    queue.emplace(std::move(channel_message));
642  }
643
644  if (needs_introduction) {
645    scoped_refptr<NodeChannel> broker = GetBrokerChannel();
646    if (!broker) {
647      DVLOG(1) << "Dropping message for unknown peer: " << name;
648      return;
649    }
650
651    // HACK: On ARC++ we never really need this codepath since it is always hit
652    // when RemoteMessagePipeBootstrap races against us in delivering the
653    // three-way broker handshake. If we do send the RequestIntroduction message,
654    // the broker (Chrome) won't yet know our peer, and it will cause us to drop
655    // that peer's connection, causing it to go in loops requesting for a
656    // re-introduction. Instead, let's assume that the node will eventually be
657    // introduced to us and just stick it in the queue. Note that this is only
658    // safe since ARC++ processes strictly follow a star topology, and we never
659    // pass handles between children.
660    //
661    // We need to revert this eventually when a long-term fix is ready. See
662    // b/33453258 for more details.
663    LOG(ERROR) << "Averted b/33453258 by dropping the introduction request for "
664               << name;
665    return;
666
667    broker->RequestIntroduction(name);
668  }
669}
670
// Hands every message currently in |incoming_messages_| to the node, then
// attempts shutdown if one has been requested.
void NodeController::AcceptIncomingMessages() {
  {
    base::AutoLock lock(messages_lock_);
    if (!incoming_messages_.empty()) {
      // libstdc++'s deque creates an internal buffer on construction, even when
      // the size is 0. So avoid creating it until it is necessary.
      std::queue<ports::ScopedMessage> messages;
      // Take the whole backlog in one step, leaving the member queue empty.
      std::swap(messages, incoming_messages_);
      // |messages_lock_| is released for the remainder of this scope, so the
      // node is not called into while the lock is held.
      base::AutoUnlock unlock(messages_lock_);

      while (!messages.empty()) {
        node_->AcceptMessage(std::move(messages.front()));
        messages.pop();
      }
    }
  }

  AttemptShutdownIfRequested();
}
690
691void NodeController::ProcessIncomingMessages() {
692  RequestContext request_context(RequestContext::Source::SYSTEM);
693
694  {
695    base::AutoLock lock(messages_lock_);
696    // Allow a new incoming messages processing task to be posted. This can't be
697    // done after AcceptIncomingMessages() otherwise a message might be missed.
698    // Doing it here may result in at most two tasks existing at the same time;
699    // this running one, and one pending in the task runner.
700    incoming_messages_task_posted_ = false;
701  }
702
703  AcceptIncomingMessages();
704}
705
706void NodeController::DropAllPeers() {
707  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
708
709  std::vector<scoped_refptr<NodeChannel>> all_peers;
710  {
711    base::AutoLock lock(parent_lock_);
712    if (bootstrap_parent_channel_) {
713      // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its
714      // existence to determine whether or not this is the root node. Once
715      // bootstrap_parent_channel_->ShutDown() has been called,
716      // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
717      // matter if it's deleted now or when |this| is deleted.
718      // Note: |bootstrap_parent_channel_| is only modified on the IO thread.
719      all_peers.push_back(bootstrap_parent_channel_);
720    }
721  }
722
723  {
724    base::AutoLock lock(peers_lock_);
725    for (const auto& peer : peers_)
726      all_peers.push_back(peer.second);
727    for (const auto& peer : pending_children_)
728      all_peers.push_back(peer.second);
729    peers_.clear();
730    pending_children_.clear();
731    pending_peer_messages_.clear();
732  }
733
734  for (const auto& peer : all_peers)
735    peer->ShutDown();
736
737  if (destroy_on_io_thread_shutdown_)
738    delete this;
739}
740
// Fills |*port_name| with random bytes via GenerateRandomName().
void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
  GenerateRandomName(port_name);
}
744
// Stores in |*message| a new PortsMessage constructed from |num_header_bytes|
// with the remaining constructor arguments set to 0, 0 and nullptr.
void NodeController::AllocMessage(size_t num_header_bytes,
                                  ports::ScopedMessage* message) {
  message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
}
749
750void NodeController::ForwardMessage(const ports::NodeName& node,
751                                    ports::ScopedMessage message) {
752  DCHECK(message);
753  bool schedule_pump_task = false;
754  if (node == name_) {
755    // NOTE: We need to avoid re-entering the Node instance within
756    // ForwardMessage. Because ForwardMessage is only ever called
757    // (synchronously) in response to Node's ClosePort, SendMessage, or
758    // AcceptMessage, we flush the queue after calling any of those methods.
759    base::AutoLock lock(messages_lock_);
760    // |io_task_runner_| may be null in tests or processes that don't require
761    // multi-process Mojo.
762    schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
763        !incoming_messages_task_posted_;
764    incoming_messages_task_posted_ |= schedule_pump_task;
765    incoming_messages_.emplace(std::move(message));
766  } else {
767    SendPeerMessage(node, std::move(message));
768  }
769
770  if (schedule_pump_task) {
771    // Normally, the queue is processed after the action that added the local
772    // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
773    // possible for a local message to be added as a result of a remote message,
774    // and OnChannelMessage() doesn't process this queue (although
775    // OnPortsMessage() does). There may also be other code paths, now or added
776    // in the future, which cause local messages to be added but don't process
777    // this message queue.
778    //
779    // Instead of adding a call to AcceptIncomingMessages() on every possible
780    // code path, post a task to the IO thread to process the queue. If the
781    // current call stack processes the queue, this may end up doing nothing.
782    io_task_runner_->PostTask(
783        FROM_HERE,
784        base::Bind(&NodeController::ProcessIncomingMessages,
785                   base::Unretained(this)));
786  }
787}
788
789void NodeController::BroadcastMessage(ports::ScopedMessage message) {
790  CHECK_EQ(message->num_ports(), 0u);
791  Channel::MessagePtr channel_message =
792      static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
793  CHECK(!channel_message->has_handles());
794
795  scoped_refptr<NodeChannel> broker = GetBrokerChannel();
796  if (broker)
797    broker->Broadcast(std::move(channel_message));
798  else
799    OnBroadcast(name_, std::move(channel_message));
800}
801
802void NodeController::PortStatusChanged(const ports::PortRef& port) {
803  scoped_refptr<ports::UserData> user_data;
804  node_->GetUserData(port, &user_data);
805
806  PortObserver* observer = static_cast<PortObserver*>(user_data.get());
807  if (observer) {
808    observer->OnPortStatusChanged();
809  } else {
810    DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
811             << "doesn't have an observer.";
812  }
813}
814
815void NodeController::OnAcceptChild(const ports::NodeName& from_node,
816                                   const ports::NodeName& parent_name,
817                                   const ports::NodeName& token) {
818  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
819
820  scoped_refptr<NodeChannel> parent;
821  {
822    base::AutoLock lock(parent_lock_);
823    if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
824      parent_name_ = parent_name;
825      parent = bootstrap_parent_channel_;
826    }
827  }
828
829  if (!parent) {
830    DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
831    DropPeer(from_node, nullptr);
832    return;
833  }
834
835  parent->SetRemoteNodeName(parent_name);
836  parent->AcceptParent(token, name_);
837
838  // NOTE: The child does not actually add its parent as a peer until
839  // receiving an AcceptBrokerClient message from the broker. The parent
840  // will request that said message be sent upon receiving AcceptParent.
841
842  DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
843}
844
845void NodeController::OnAcceptParent(const ports::NodeName& from_node,
846                                    const ports::NodeName& token,
847                                    const ports::NodeName& child_name) {
848  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
849
850  auto it = pending_children_.find(from_node);
851  if (it == pending_children_.end() || token != from_node) {
852    DLOG(ERROR) << "Received unexpected AcceptParent message from "
853                << from_node;
854    DropPeer(from_node, nullptr);
855    return;
856  }
857
858  scoped_refptr<NodeChannel> channel = it->second;
859  pending_children_.erase(it);
860
861  DCHECK(channel);
862
863  DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
864
865  AddPeer(child_name, channel, false /* start_channel */);
866
867  // TODO(rockot/amistry): We could simplify child initialization if we could
868  // synchronously get a new async broker channel from the broker. For now we do
869  // it asynchronously since it's only used to facilitate handle passing, not
870  // handle creation.
871  scoped_refptr<NodeChannel> broker = GetBrokerChannel();
872  if (broker) {
873    // Inform the broker of this new child.
874    broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle());
875  } else {
876    // If we have no broker, either we need to wait for one, or we *are* the
877    // broker.
878    scoped_refptr<NodeChannel> parent = GetParentChannel();
879    if (!parent) {
880      base::AutoLock lock(parent_lock_);
881      parent = bootstrap_parent_channel_;
882    }
883
884    if (!parent) {
885      // Yes, we're the broker. We can initialize the child directly.
886      channel->AcceptBrokerClient(name_, ScopedPlatformHandle());
887    } else {
888      // We aren't the broker, so wait for a broker connection.
889      base::AutoLock lock(broker_lock_);
890      pending_broker_clients_.push(child_name);
891    }
892  }
893}
894
895void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
896                                       const ports::NodeName& client_name,
897                                       base::ProcessHandle process_handle) {
898#if defined(OS_WIN)
899  // Scoped handle to avoid leaks on error.
900  ScopedPlatformHandle scoped_process_handle =
901      ScopedPlatformHandle(PlatformHandle(process_handle));
902#endif
903  scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
904  if (!sender) {
905    DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
906    return;
907  }
908
909  if (GetPeerChannel(client_name)) {
910    DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
911    DropPeer(from_node, nullptr);
912    return;
913  }
914
915  PlatformChannelPair broker_channel;
916  scoped_refptr<NodeChannel> client = NodeChannel::Create(
917      this, broker_channel.PassServerHandle(), io_task_runner_,
918      ProcessErrorCallback());
919
920#if defined(OS_WIN)
921  // The broker must have a working handle to the client process in order to
922  // properly copy other handles to and from the client.
923  if (!scoped_process_handle.is_valid()) {
924    DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
925    return;
926  }
927  client->SetRemoteProcessHandle(scoped_process_handle.release().handle);
928#else
929  client->SetRemoteProcessHandle(process_handle);
930#endif
931
932  AddPeer(client_name, client, true /* start_channel */);
933
934  DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
935           << " from peer " << from_node;
936
937  sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle());
938}
939
940void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
941                                         const ports::NodeName& client_name,
942                                         ScopedPlatformHandle broker_channel) {
943  scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
944  if (!client) {
945    DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name;
946    return;
947  }
948
949  // This should have come from our own broker.
950  if (GetBrokerChannel() != GetPeerChannel(from_node)) {
951    DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
952    return;
953  }
954
955  DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node;
956
957  client->AcceptBrokerClient(from_node, std::move(broker_channel));
958}
959
960void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
961                                          const ports::NodeName& broker_name,
962                                          ScopedPlatformHandle broker_channel) {
963  // This node should already have a parent in bootstrap mode.
964  ports::NodeName parent_name;
965  scoped_refptr<NodeChannel> parent;
966  {
967    base::AutoLock lock(parent_lock_);
968    parent_name = parent_name_;
969    parent = bootstrap_parent_channel_;
970    bootstrap_parent_channel_ = nullptr;
971  }
972  DCHECK(parent_name == from_node);
973  DCHECK(parent);
974
975  std::queue<ports::NodeName> pending_broker_clients;
976  std::unordered_map<ports::NodeName, OutgoingMessageQueue>
977      pending_relay_messages;
978  {
979    base::AutoLock lock(broker_lock_);
980    broker_name_ = broker_name;
981    std::swap(pending_broker_clients, pending_broker_clients_);
982    std::swap(pending_relay_messages, pending_relay_messages_);
983  }
984  DCHECK(broker_name != ports::kInvalidNodeName);
985
986  // It's now possible to add both the broker and the parent as peers.
987  // Note that the broker and parent may be the same node.
988  scoped_refptr<NodeChannel> broker;
989  if (broker_name == parent_name) {
990    DCHECK(!broker_channel.is_valid());
991    broker = parent;
992  } else {
993    DCHECK(broker_channel.is_valid());
994    broker = NodeChannel::Create(this, std::move(broker_channel),
995                                 io_task_runner_, ProcessErrorCallback());
996    AddPeer(broker_name, broker, true /* start_channel */);
997  }
998
999  AddPeer(parent_name, parent, false /* start_channel */);
1000
1001  {
1002    // Complete any port merge requests we have waiting for the parent.
1003    base::AutoLock lock(pending_port_merges_lock_);
1004    for (const auto& request : pending_port_merges_)
1005      parent->RequestPortMerge(request.second.name(), request.first);
1006    pending_port_merges_.clear();
1007  }
1008
1009  // Feed the broker any pending children of our own.
1010  while (!pending_broker_clients.empty()) {
1011    const ports::NodeName& child_name = pending_broker_clients.front();
1012    auto it = pending_children_.find(child_name);
1013    DCHECK(it != pending_children_.end());
1014    broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle());
1015    pending_broker_clients.pop();
1016  }
1017
1018#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
1019  // Have the broker relay any messages we have waiting.
1020  for (auto& entry : pending_relay_messages) {
1021    const ports::NodeName& destination = entry.first;
1022    auto& message_queue = entry.second;
1023    while (!message_queue.empty()) {
1024      broker->RelayPortsMessage(destination, std::move(message_queue.front()));
1025      message_queue.pop();
1026    }
1027  }
1028#endif
1029
1030  DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
1031}
1032
1033void NodeController::OnPortsMessage(const ports::NodeName& from_node,
1034                                    Channel::MessagePtr channel_message) {
1035  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1036
1037  void* data;
1038  size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1039  if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
1040                         &num_header_bytes, &num_payload_bytes,
1041                         &num_ports_bytes)) {
1042    DropPeer(from_node, nullptr);
1043    return;
1044  }
1045
1046  CHECK(channel_message);
1047  std::unique_ptr<PortsMessage> ports_message(
1048      new PortsMessage(num_header_bytes,
1049                       num_payload_bytes,
1050                       num_ports_bytes,
1051                       std::move(channel_message)));
1052  ports_message->set_source_node(from_node);
1053  node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
1054  AcceptIncomingMessages();
1055}
1056
1057void NodeController::OnRequestPortMerge(
1058    const ports::NodeName& from_node,
1059    const ports::PortName& connector_port_name,
1060    const std::string& token) {
1061  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1062
1063  DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
1064           << token << " and port " << connector_port_name << "@" << from_node;
1065
1066  ports::PortRef local_port;
1067  {
1068    base::AutoLock lock(reserved_ports_lock_);
1069    auto it = reserved_ports_.find(token);
1070    if (it == reserved_ports_.end()) {
1071      DVLOG(1) << "Ignoring request to connect to port for unknown token "
1072               << token;
1073      return;
1074    }
1075    local_port = it->second.port;
1076  }
1077
1078  int rv = node_->MergePorts(local_port, from_node, connector_port_name);
1079  if (rv != ports::OK)
1080    DLOG(ERROR) << "MergePorts failed: " << rv;
1081
1082  AcceptIncomingMessages();
1083}
1084
1085void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
1086                                           const ports::NodeName& name) {
1087  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1088
1089  scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
1090  if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
1091    DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
1092                << from_node;
1093    DropPeer(from_node, nullptr);
1094    return;
1095  }
1096
1097  scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
1098  if (!new_friend) {
1099    // We don't know who they're talking about!
1100    requestor->Introduce(name, ScopedPlatformHandle());
1101  } else {
1102    PlatformChannelPair new_channel;
1103    requestor->Introduce(name, new_channel.PassServerHandle());
1104    new_friend->Introduce(from_node, new_channel.PassClientHandle());
1105  }
1106}
1107
1108void NodeController::OnIntroduce(const ports::NodeName& from_node,
1109                                 const ports::NodeName& name,
1110                                 ScopedPlatformHandle channel_handle) {
1111  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1112
1113  if (!channel_handle.is_valid()) {
1114    node_->LostConnectionToNode(name);
1115
1116    DLOG(ERROR) << "Could not be introduced to peer " << name;
1117    base::AutoLock lock(peers_lock_);
1118    pending_peer_messages_.erase(name);
1119    return;
1120  }
1121
1122  scoped_refptr<NodeChannel> channel =
1123      NodeChannel::Create(this, std::move(channel_handle), io_task_runner_,
1124                          ProcessErrorCallback());
1125
1126  DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
1127  AddPeer(name, channel, true /* start_channel */);
1128}
1129
1130void NodeController::OnBroadcast(const ports::NodeName& from_node,
1131                                 Channel::MessagePtr message) {
1132  DCHECK(!message->has_handles());
1133
1134  void* data;
1135  size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1136  if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
1137                         &num_header_bytes, &num_payload_bytes,
1138                         &num_ports_bytes)) {
1139    DropPeer(from_node, nullptr);
1140    return;
1141  }
1142
1143  // Broadcast messages must not contain ports.
1144  if (num_ports_bytes > 0) {
1145    DropPeer(from_node, nullptr);
1146    return;
1147  }
1148
1149  base::AutoLock lock(peers_lock_);
1150  for (auto& iter : peers_) {
1151    // Copy and send the message to each known peer.
1152    Channel::MessagePtr peer_message(
1153        new Channel::Message(message->payload_size(), 0));
1154    memcpy(peer_message->mutable_payload(), message->payload(),
1155           message->payload_size());
1156    iter.second->PortsMessage(std::move(peer_message));
1157  }
1158}
1159
1160#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
1161void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
1162                                         base::ProcessHandle from_process,
1163                                         const ports::NodeName& destination,
1164                                         Channel::MessagePtr message) {
1165  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1166
1167  if (GetBrokerChannel()) {
1168    // Only the broker should be asked to relay a message.
1169    LOG(ERROR) << "Non-broker refusing to relay message.";
1170    DropPeer(from_node, nullptr);
1171    return;
1172  }
1173
1174  // The parent should always know which process this came from.
1175  DCHECK(from_process != base::kNullProcessHandle);
1176
1177#if defined(OS_WIN)
1178  // Rewrite the handles to this (the parent) process. If the message is
1179  // destined for another child process, the handles will be rewritten to that
1180  // process before going out (see NodeChannel::WriteChannelMessage).
1181  //
1182  // TODO: We could avoid double-duplication.
1183  //
1184  // Note that we explicitly mark the handles as being owned by the sending
1185  // process before rewriting them, in order to accommodate RewriteHandles'
1186  // internal sanity checks.
1187  ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
1188  for (size_t i = 0; i < handles->size(); ++i)
1189    (*handles)[i].owning_process = from_process;
1190  if (!Channel::Message::RewriteHandles(from_process,
1191                                        base::GetCurrentProcessHandle(),
1192                                        handles.get())) {
1193    DLOG(ERROR) << "Failed to relay one or more handles.";
1194  }
1195  message->SetHandles(std::move(handles));
1196#else
1197  MachPortRelay* relay = GetMachPortRelay();
1198  if (!relay) {
1199    LOG(ERROR) << "Receiving Mach ports without a port relay from "
1200               << from_node << ". Dropping message.";
1201    return;
1202  }
1203  if (!relay->ExtractPortRights(message.get(), from_process)) {
1204    // NodeChannel should ensure that MachPortRelay is ready for the remote
1205    // process. At this point, if the port extraction failed, either something
1206    // went wrong in the mach stuff, or the remote process died.
1207    LOG(ERROR) << "Error on receiving Mach ports " << from_node
1208               << ". Dropping message.";
1209    return;
1210  }
1211#endif  // defined(OS_WIN)
1212
1213  if (destination == name_) {
1214    // Great, we can deliver this message locally.
1215    OnPortsMessage(from_node, std::move(message));
1216    return;
1217  }
1218
1219  scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
1220  if (peer)
1221    peer->PortsMessageFromRelay(from_node, std::move(message));
1222  else
1223    DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
1224}
1225
1226void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node,
1227                                             const ports::NodeName& source_node,
1228                                             Channel::MessagePtr message) {
1229  if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1230    LOG(ERROR) << "Refusing relayed message from non-broker node.";
1231    DropPeer(from_node, nullptr);
1232    return;
1233  }
1234
1235  OnPortsMessage(source_node, std::move(message));
1236}
1237#endif
1238
1239void NodeController::OnChannelError(const ports::NodeName& from_node,
1240                                    NodeChannel* channel) {
1241  if (io_task_runner_->RunsTasksOnCurrentThread()) {
1242    DropPeer(from_node, channel);
1243    // DropPeer may have caused local port closures, so be sure to process any
1244    // pending local messages.
1245    AcceptIncomingMessages();
1246  } else {
1247    io_task_runner_->PostTask(
1248        FROM_HERE,
1249        base::Bind(&NodeController::OnChannelError, base::Unretained(this),
1250                   from_node, channel));
1251  }
1252}
1253
1254#if defined(OS_MACOSX) && !defined(OS_IOS)
1255MachPortRelay* NodeController::GetMachPortRelay() {
1256  {
1257    base::AutoLock lock(parent_lock_);
1258    // Return null if we're not the root.
1259    if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
1260      return nullptr;
1261  }
1262
1263  base::AutoLock lock(mach_port_relay_lock_);
1264  return mach_port_relay_.get();
1265}
1266#endif
1267
1268void NodeController::DestroyOnIOThreadShutdown() {
1269  destroy_on_io_thread_shutdown_ = true;
1270}
1271
1272void NodeController::AttemptShutdownIfRequested() {
1273  if (!shutdown_callback_flag_)
1274    return;
1275
1276  base::Closure callback;
1277  {
1278    base::AutoLock lock(shutdown_lock_);
1279    if (shutdown_callback_.is_null())
1280      return;
1281    if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) {
1282      DVLOG(2) << "Unable to cleanly shut down node " << name_;
1283      return;
1284    }
1285
1286    callback = shutdown_callback_;
1287    shutdown_callback_.Reset();
1288    shutdown_callback_flag_.Set(false);
1289  }
1290
1291  DCHECK(!callback.is_null());
1292
1293  callback.Run();
1294}
1295
1296}  // namespace edk
1297}  // namespace mojo
1298