node_controller.cc revision 645501c2ab19a559ce82a1d5a29ced159a4c30fb
1// Copyright 2016 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/edk/system/node_controller.h" 6 7#include <algorithm> 8#include <limits> 9 10#include "base/bind.h" 11#include "base/location.h" 12#include "base/logging.h" 13#include "base/macros.h" 14#include "base/message_loop/message_loop.h" 15#include "base/metrics/histogram_macros.h" 16#include "base/process/process_handle.h" 17#include "base/rand_util.h" 18#include "base/time/time.h" 19#include "base/timer/elapsed_timer.h" 20#include "mojo/edk/embedder/embedder_internal.h" 21#include "mojo/edk/embedder/platform_channel_pair.h" 22#include "mojo/edk/system/broker.h" 23#include "mojo/edk/system/broker_host.h" 24#include "mojo/edk/system/core.h" 25#include "mojo/edk/system/ports_message.h" 26#include "mojo/edk/system/request_context.h" 27 28#if defined(OS_MACOSX) && !defined(OS_IOS) 29#include "mojo/edk/system/mach_port_relay.h" 30#endif 31 32#if !defined(OS_NACL) 33#include "crypto/random.h" 34#endif 35 36namespace mojo { 37namespace edk { 38 39namespace { 40 41#if defined(OS_NACL) 42template <typename T> 43void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); } 44#else 45template <typename T> 46void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); } 47#endif 48 49ports::NodeName GetRandomNodeName() { 50 ports::NodeName name; 51 GenerateRandomName(&name); 52 return name; 53} 54 55void RecordPeerCount(size_t count) { 56 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max())); 57 58 // 8k is the maximum number of file descriptors allowed in Chrome. 59 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers", 60 static_cast<int32_t>(count), 61 0 /* min */, 62 8000 /* max */, 63 50 /* bucket count */); 64} 65 66void RecordPendingChildCount(size_t count) { 67 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max())); 68 69 // 8k is the maximum number of file descriptors allowed in Chrome. 70 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren", 71 static_cast<int32_t>(count), 72 0 /* min */, 73 8000 /* max */, 74 50 /* bucket count */); 75} 76 77bool ParsePortsMessage(Channel::Message* message, 78 void** data, 79 size_t* num_data_bytes, 80 size_t* num_header_bytes, 81 size_t* num_payload_bytes, 82 size_t* num_ports_bytes) { 83 DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes && 84 num_ports_bytes); 85 86 NodeChannel::GetPortsMessageData(message, data, num_data_bytes); 87 if (!*num_data_bytes) 88 return false; 89 90 if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes, 91 num_payload_bytes, num_ports_bytes)) { 92 return false; 93 } 94 95 return true; 96} 97 98// Used by NodeController to watch for shutdown. Since no IO can happen once 99// the IO thread is killed, the NodeController can cleanly drop all its peers 100// at that time. 101class ThreadDestructionObserver : 102 public base::MessageLoop::DestructionObserver { 103 public: 104 static void Create(scoped_refptr<base::TaskRunner> task_runner, 105 const base::Closure& callback) { 106 if (task_runner->RunsTasksOnCurrentThread()) { 107 // Owns itself. 108 new ThreadDestructionObserver(callback); 109 } else { 110 task_runner->PostTask(FROM_HERE, 111 base::Bind(&Create, task_runner, callback)); 112 } 113 } 114 115 private: 116 explicit ThreadDestructionObserver(const base::Closure& callback) 117 : callback_(callback) { 118 base::MessageLoop::current()->AddDestructionObserver(this); 119 } 120 121 ~ThreadDestructionObserver() override { 122 base::MessageLoop::current()->RemoveDestructionObserver(this); 123 } 124 125 // base::MessageLoop::DestructionObserver: 126 void WillDestroyCurrentMessageLoop() override { 127 callback_.Run(); 128 delete this; 129 } 130 131 const base::Closure callback_; 132 133 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver); 134}; 135 136} // namespace 137 138NodeController::~NodeController() {} 139 140NodeController::NodeController(Core* core) 141 : core_(core), 142 name_(GetRandomNodeName()), 143 node_(new ports::Node(name_, this)) { 144 DVLOG(1) << "Initializing node " << name_; 145} 146 147#if defined(OS_MACOSX) && !defined(OS_IOS) 148void NodeController::CreateMachPortRelay( 149 base::PortProvider* port_provider) { 150 base::AutoLock lock(mach_port_relay_lock_); 151 DCHECK(!mach_port_relay_); 152 mach_port_relay_.reset(new MachPortRelay(port_provider)); 153} 154#endif 155 156void NodeController::SetIOTaskRunner( 157 scoped_refptr<base::TaskRunner> task_runner) { 158 io_task_runner_ = task_runner; 159 ThreadDestructionObserver::Create( 160 io_task_runner_, 161 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); 162} 163 164void NodeController::ConnectToChild( 165 base::ProcessHandle process_handle, 166 ScopedPlatformHandle platform_handle, 167 const std::string& child_token, 168 const ProcessErrorCallback& process_error_callback) { 169 // Generate the temporary remote node name here so that it can be associated 170 // with the embedder's child_token. If an error occurs in the child process 171 // after it is launched, but before any reserved ports are connected, this can 172 // be used to clean up any dangling ports. 173 ports::NodeName node_name; 174 GenerateRandomName(&node_name); 175 176 { 177 base::AutoLock lock(reserved_ports_lock_); 178 bool inserted = pending_child_tokens_.insert( 179 std::make_pair(node_name, child_token)).second; 180 DCHECK(inserted); 181 } 182 183#if defined(OS_WIN) 184 // On Windows, we need to duplicate the process handle because we have no 185 // control over its lifetime and it may become invalid by the time the posted 186 // task runs. 187 HANDLE dup_handle = INVALID_HANDLE_VALUE; 188 BOOL ok = ::DuplicateHandle( 189 base::GetCurrentProcessHandle(), process_handle, 190 base::GetCurrentProcessHandle(), &dup_handle, 191 0, FALSE, DUPLICATE_SAME_ACCESS); 192 DPCHECK(ok); 193 process_handle = dup_handle; 194#endif 195 196 io_task_runner_->PostTask( 197 FROM_HERE, 198 base::Bind(&NodeController::ConnectToChildOnIOThread, 199 base::Unretained(this), 200 process_handle, 201 base::Passed(&platform_handle), 202 node_name, 203 process_error_callback)); 204} 205 206void NodeController::CloseChildPorts(const std::string& child_token) { 207 std::vector<ports::PortRef> ports_to_close; 208 { 209 std::vector<std::string> port_tokens; 210 base::AutoLock lock(reserved_ports_lock_); 211 for (const auto& port : reserved_ports_) { 212 if (port.second.child_token == child_token) { 213 DVLOG(1) << "Closing reserved port " << port.second.port.name(); 214 ports_to_close.push_back(port.second.port); 215 port_tokens.push_back(port.first); 216 } 217 } 218 219 for (const auto& token : port_tokens) 220 reserved_ports_.erase(token); 221 } 222 223 for (const auto& port : ports_to_close) 224 node_->ClosePort(port); 225 226 // Ensure local port closure messages are processed. 227 AcceptIncomingMessages(); 228} 229 230void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { 231// TODO(amistry): Consider the need for a broker on Windows. 232#if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI) 233 // On posix, use the bootstrap channel for the broker and receive the node's 234 // channel synchronously as the first message from the broker. 235 base::ElapsedTimer timer; 236 broker_.reset(new Broker(std::move(platform_handle))); 237 platform_handle = broker_->GetParentPlatformHandle(); 238 UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime", 239 timer.Elapsed()); 240 241 if (!platform_handle.is_valid()) { 242 // Most likely the browser side of the channel has already been closed and 243 // the broker was unable to negotiate a NodeChannel pipe. In this case we 244 // can cancel parent connection. 245 DVLOG(1) << "Cannot connect to invalid parent channel."; 246 return; 247 } 248#endif 249 250 io_task_runner_->PostTask( 251 FROM_HERE, 252 base::Bind(&NodeController::ConnectToParentOnIOThread, 253 base::Unretained(this), 254 base::Passed(&platform_handle))); 255} 256 257void NodeController::SetPortObserver( 258 const ports::PortRef& port, 259 const scoped_refptr<PortObserver>& observer) { 260 node_->SetUserData(port, observer); 261} 262 263void NodeController::ClosePort(const ports::PortRef& port) { 264 SetPortObserver(port, nullptr); 265 int rv = node_->ClosePort(port); 266 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); 267 268 AcceptIncomingMessages(); 269} 270 271int NodeController::SendMessage(const ports::PortRef& port, 272 std::unique_ptr<PortsMessage> message) { 273 ports::ScopedMessage ports_message(message.release()); 274 int rv = node_->SendMessage(port, std::move(ports_message)); 275 276 AcceptIncomingMessages(); 277 return rv; 278} 279 280void NodeController::ReservePort(const std::string& token, 281 const ports::PortRef& port, 282 const std::string& child_token) { 283 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " 284 << token; 285 286 base::AutoLock lock(reserved_ports_lock_); 287 auto result = reserved_ports_.insert( 288 std::make_pair(token, ReservedPort{port, child_token})); 289 DCHECK(result.second); 290} 291 292void NodeController::MergePortIntoParent(const std::string& token, 293 const ports::PortRef& port) { 294 bool was_merged = false; 295 { 296 // This request may be coming from within the process that reserved the 297 // "parent" side (e.g. for Chrome single-process mode), so if this token is 298 // reserved locally, merge locally instead. 299 base::AutoLock lock(reserved_ports_lock_); 300 auto it = reserved_ports_.find(token); 301 if (it != reserved_ports_.end()) { 302 node_->MergePorts(port, name_, it->second.port.name()); 303 reserved_ports_.erase(it); 304 was_merged = true; 305 } 306 } 307 if (was_merged) { 308 AcceptIncomingMessages(); 309 return; 310 } 311 312 scoped_refptr<NodeChannel> parent; 313 bool reject_merge = false; 314 { 315 // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise, 316 // there is a race where the parent can be set, and |pending_port_merges_| 317 // be processed between retrieving |parent| and adding the merge to 318 // |pending_port_merges_|. 319 base::AutoLock lock(pending_port_merges_lock_); 320 parent = GetParentChannel(); 321 if (reject_pending_merges_) { 322 reject_merge = true; 323 } else if (!parent) { 324 pending_port_merges_.push_back(std::make_pair(token, port)); 325 return; 326 } 327 } 328 if (reject_merge) { 329 node_->ClosePort(port); 330 DVLOG(2) << "Rejecting port merge for token " << token 331 << " due to closed parent channel."; 332 AcceptIncomingMessages(); 333 return; 334 } 335 336 parent->RequestPortMerge(port.name(), token); 337} 338 339int NodeController::MergeLocalPorts(const ports::PortRef& port0, 340 const ports::PortRef& port1) { 341 int rv = node_->MergeLocalPorts(port0, port1); 342 AcceptIncomingMessages(); 343 return rv; 344} 345 346scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( 347 size_t num_bytes) { 348#if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI) 349 // Shared buffer creation failure is fatal, so always use the broker when we 350 // have one. This does mean that a non-root process that has children will use 351 // the broker for shared buffer creation even though that process is 352 // privileged. 353 if (broker_) { 354 return broker_->GetSharedBuffer(num_bytes); 355 } 356#endif 357 return PlatformSharedBuffer::Create(num_bytes); 358} 359 360void NodeController::RequestShutdown(const base::Closure& callback) { 361 { 362 base::AutoLock lock(shutdown_lock_); 363 shutdown_callback_ = callback; 364 shutdown_callback_flag_.Set(true); 365 } 366 367 AttemptShutdownIfRequested(); 368} 369 370void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node, 371 const std::string& error) { 372 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); 373 if (peer) 374 peer->NotifyBadMessage(error); 375} 376 377void NodeController::ConnectToChildOnIOThread( 378 base::ProcessHandle process_handle, 379 ScopedPlatformHandle platform_handle, 380 ports::NodeName token, 381 const ProcessErrorCallback& process_error_callback) { 382 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 383 384#if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL) 385 PlatformChannelPair node_channel; 386 // BrokerHost owns itself. 387 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); 388 broker_host->SendChannel(node_channel.PassClientHandle()); 389 scoped_refptr<NodeChannel> channel = NodeChannel::Create( 390 this, node_channel.PassServerHandle(), io_task_runner_, 391 process_error_callback); 392#else 393 scoped_refptr<NodeChannel> channel = 394 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_, 395 process_error_callback); 396#endif 397 398 // We set up the child channel with a temporary name so it can be identified 399 // as a pending child if it writes any messages to the channel. We may start 400 // receiving messages from it (though we shouldn't) as soon as Start() is 401 // called below. 402 403 pending_children_.insert(std::make_pair(token, channel)); 404 RecordPendingChildCount(pending_children_.size()); 405 406 channel->SetRemoteNodeName(token); 407 channel->SetRemoteProcessHandle(process_handle); 408 channel->Start(); 409 410 channel->AcceptChild(name_, token); 411} 412 413void NodeController::ConnectToParentOnIOThread( 414 ScopedPlatformHandle platform_handle) { 415 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 416 417 { 418 base::AutoLock lock(parent_lock_); 419 DCHECK(parent_name_ == ports::kInvalidNodeName); 420 421 // At this point we don't know the parent's name, so we can't yet insert it 422 // into our |peers_| map. That will happen as soon as we receive an 423 // AcceptChild message from them. 424 bootstrap_parent_channel_ = 425 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_, 426 ProcessErrorCallback()); 427 // Prevent the parent pipe handle from being closed on shutdown. Pipe 428 // closure is used by the parent to detect the child process has exited. 429 // Relying on message pipes to be closed is not enough because the parent 430 // may see the message pipe closure before the child is dead, causing the 431 // child process to be unexpectedly SIGKILL'd. 432 bootstrap_parent_channel_->LeakHandleOnShutdown(); 433 } 434 bootstrap_parent_channel_->Start(); 435} 436 437scoped_refptr<NodeChannel> NodeController::GetPeerChannel( 438 const ports::NodeName& name) { 439 base::AutoLock lock(peers_lock_); 440 auto it = peers_.find(name); 441 if (it == peers_.end()) 442 return nullptr; 443 return it->second; 444} 445 446scoped_refptr<NodeChannel> NodeController::GetParentChannel() { 447 ports::NodeName parent_name; 448 { 449 base::AutoLock lock(parent_lock_); 450 parent_name = parent_name_; 451 } 452 return GetPeerChannel(parent_name); 453} 454 455scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() { 456 ports::NodeName broker_name; 457 { 458 base::AutoLock lock(broker_lock_); 459 broker_name = broker_name_; 460 } 461 return GetPeerChannel(broker_name); 462} 463 464void NodeController::AddPeer(const ports::NodeName& name, 465 scoped_refptr<NodeChannel> channel, 466 bool start_channel) { 467 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 468 469 DCHECK(name != ports::kInvalidNodeName); 470 DCHECK(channel); 471 472 channel->SetRemoteNodeName(name); 473 474 OutgoingMessageQueue pending_messages; 475 { 476 base::AutoLock lock(peers_lock_); 477 if (peers_.find(name) != peers_.end()) { 478 // This can happen normally if two nodes race to be introduced to each 479 // other. The losing pipe will be silently closed and introduction should 480 // not be affected. 481 DVLOG(1) << "Ignoring duplicate peer name " << name; 482 return; 483 } 484 485 auto result = peers_.insert(std::make_pair(name, channel)); 486 DCHECK(result.second); 487 488 DVLOG(2) << "Accepting new peer " << name << " on node " << name_; 489 490 RecordPeerCount(peers_.size()); 491 492 auto it = pending_peer_messages_.find(name); 493 if (it != pending_peer_messages_.end()) { 494 std::swap(pending_messages, it->second); 495 pending_peer_messages_.erase(it); 496 } 497 } 498 499 if (start_channel) 500 channel->Start(); 501 502 // Flush any queued message we need to deliver to this node. 503 while (!pending_messages.empty()) { 504 channel->PortsMessage(std::move(pending_messages.front())); 505 pending_messages.pop(); 506 } 507} 508 509void NodeController::DropPeer(const ports::NodeName& name, 510 NodeChannel* channel) { 511 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 512 513 { 514 base::AutoLock lock(peers_lock_); 515 auto it = peers_.find(name); 516 517 if (it != peers_.end()) { 518 ports::NodeName peer = it->first; 519 peers_.erase(it); 520 DVLOG(1) << "Dropped peer " << peer; 521 } 522 523 pending_peer_messages_.erase(name); 524 pending_children_.erase(name); 525 526 RecordPeerCount(peers_.size()); 527 RecordPendingChildCount(pending_children_.size()); 528 } 529 530 std::vector<ports::PortRef> ports_to_close; 531 { 532 // Clean up any reserved ports. 533 base::AutoLock lock(reserved_ports_lock_); 534 auto it = pending_child_tokens_.find(name); 535 if (it != pending_child_tokens_.end()) { 536 const std::string& child_token = it->second; 537 538 std::vector<std::string> port_tokens; 539 for (const auto& port : reserved_ports_) { 540 if (port.second.child_token == child_token) { 541 DVLOG(1) << "Closing reserved port: " << port.second.port.name(); 542 ports_to_close.push_back(port.second.port); 543 port_tokens.push_back(port.first); 544 } 545 } 546 547 // We have to erase reserved ports in a two-step manner because the usual 548 // manner of using the returned iterator from map::erase isn't technically 549 // valid in C++11 (although it is in C++14). 550 for (const auto& token : port_tokens) 551 reserved_ports_.erase(token); 552 553 pending_child_tokens_.erase(it); 554 } 555 } 556 557 bool is_parent; 558 { 559 base::AutoLock lock(parent_lock_); 560 is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_); 561 } 562 // If the error comes from the parent channel, we also need to cancel any 563 // port merge requests, so that errors can be propagated to the message 564 // pipes. 565 if (is_parent) { 566 base::AutoLock lock(pending_port_merges_lock_); 567 reject_pending_merges_ = true; 568 569 for (const auto& port : pending_port_merges_) 570 ports_to_close.push_back(port.second); 571 pending_port_merges_.clear(); 572 } 573 574 for (const auto& port : ports_to_close) 575 node_->ClosePort(port); 576 577 node_->LostConnectionToNode(name); 578 579 AcceptIncomingMessages(); 580} 581 582void NodeController::SendPeerMessage(const ports::NodeName& name, 583 ports::ScopedMessage message) { 584 Channel::MessagePtr channel_message = 585 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); 586 587 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); 588#if defined(OS_WIN) 589 if (channel_message->has_handles()) { 590 // If we're sending a message with handles we aren't the destination 591 // node's parent or broker (i.e. we don't know its process handle), ask 592 // the broker to relay for us. 593 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 594 if (!peer || !peer->HasRemoteProcessHandle()) { 595 if (broker) { 596 broker->RelayPortsMessage(name, std::move(channel_message)); 597 } else { 598 base::AutoLock lock(broker_lock_); 599 pending_relay_messages_[name].emplace(std::move(channel_message)); 600 } 601 return; 602 } 603 } 604#elif defined(OS_MACOSX) && !defined(OS_IOS) 605 if (channel_message->has_mach_ports()) { 606 // Messages containing Mach ports are always routed through the broker, even 607 // if the broker process is the intended recipient. 608 bool use_broker = false; 609 { 610 base::AutoLock lock(parent_lock_); 611 use_broker = (bootstrap_parent_channel_ || 612 parent_name_ != ports::kInvalidNodeName); 613 } 614 if (use_broker) { 615 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 616 if (broker) { 617 broker->RelayPortsMessage(name, std::move(channel_message)); 618 } else { 619 base::AutoLock lock(broker_lock_); 620 pending_relay_messages_[name].emplace(std::move(channel_message)); 621 } 622 return; 623 } 624 } 625#endif // defined(OS_WIN) 626 627 if (peer) { 628 peer->PortsMessage(std::move(channel_message)); 629 return; 630 } 631 632 // If we don't know who the peer is, queue the message for delivery. If this 633 // is the first message queued for the peer, we also ask the broker to 634 // introduce us to them. 635 636 bool needs_introduction = false; 637 { 638 base::AutoLock lock(peers_lock_); 639 auto& queue = pending_peer_messages_[name]; 640 needs_introduction = queue.empty(); 641 queue.emplace(std::move(channel_message)); 642 } 643 644 if (needs_introduction) { 645 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 646 if (!broker) { 647 DVLOG(1) << "Dropping message for unknown peer: " << name; 648 return; 649 } 650 651 // HACK: On ARC++ we never really need this codepath since it is always hit 652 // when RemoteMessagePipeBootstrap races against us in delivering the 653 // three-way broker handshake. If we do send the RequestIntroduction message, 654 // the broker (Chrome) won't yet know our peer, and it will cause us to drop 655 // that peer's connection, causing it to go in loops requesting for a 656 // re-introduction. Instead, let's assume that the node will eventually be 657 // introduced to us and just stick it in the queue. Note that this is only 658 // safe since ARC++ processes strictly follow a star topology, and we never 659 // pass handles between children. 660 // 661 // We need to revert this eventually when a long-term fix is ready. See 662 // b/33453258 for more details. 663 LOG(ERROR) << "Averted b/33453258 by dropping the introduction request for " 664 << name; 665 return; 666 667 broker->RequestIntroduction(name); 668 } 669} 670 671void NodeController::AcceptIncomingMessages() { 672 { 673 base::AutoLock lock(messages_lock_); 674 if (!incoming_messages_.empty()) { 675 // libstdc++'s deque creates an internal buffer on construction, even when 676 // the size is 0. So avoid creating it until it is necessary. 677 std::queue<ports::ScopedMessage> messages; 678 std::swap(messages, incoming_messages_); 679 base::AutoUnlock unlock(messages_lock_); 680 681 while (!messages.empty()) { 682 node_->AcceptMessage(std::move(messages.front())); 683 messages.pop(); 684 } 685 } 686 } 687 688 AttemptShutdownIfRequested(); 689} 690 691void NodeController::ProcessIncomingMessages() { 692 RequestContext request_context(RequestContext::Source::SYSTEM); 693 694 { 695 base::AutoLock lock(messages_lock_); 696 // Allow a new incoming messages processing task to be posted. This can't be 697 // done after AcceptIncomingMessages() otherwise a message might be missed. 698 // Doing it here may result in at most two tasks existing at the same time; 699 // this running one, and one pending in the task runner. 700 incoming_messages_task_posted_ = false; 701 } 702 703 AcceptIncomingMessages(); 704} 705 706void NodeController::DropAllPeers() { 707 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 708 709 std::vector<scoped_refptr<NodeChannel>> all_peers; 710 { 711 base::AutoLock lock(parent_lock_); 712 if (bootstrap_parent_channel_) { 713 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its 714 // existence to determine whether or not this is the root node. Once 715 // bootstrap_parent_channel_->ShutDown() has been called, 716 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't 717 // matter if it's deleted now or when |this| is deleted. 718 // Note: |bootstrap_parent_channel_| is only modified on the IO thread. 719 all_peers.push_back(bootstrap_parent_channel_); 720 } 721 } 722 723 { 724 base::AutoLock lock(peers_lock_); 725 for (const auto& peer : peers_) 726 all_peers.push_back(peer.second); 727 for (const auto& peer : pending_children_) 728 all_peers.push_back(peer.second); 729 peers_.clear(); 730 pending_children_.clear(); 731 pending_peer_messages_.clear(); 732 } 733 734 for (const auto& peer : all_peers) 735 peer->ShutDown(); 736 737 if (destroy_on_io_thread_shutdown_) 738 delete this; 739} 740 741void NodeController::GenerateRandomPortName(ports::PortName* port_name) { 742 GenerateRandomName(port_name); 743} 744 745void NodeController::AllocMessage(size_t num_header_bytes, 746 ports::ScopedMessage* message) { 747 message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr)); 748} 749 750void NodeController::ForwardMessage(const ports::NodeName& node, 751 ports::ScopedMessage message) { 752 DCHECK(message); 753 bool schedule_pump_task = false; 754 if (node == name_) { 755 // NOTE: We need to avoid re-entering the Node instance within 756 // ForwardMessage. Because ForwardMessage is only ever called 757 // (synchronously) in response to Node's ClosePort, SendMessage, or 758 // AcceptMessage, we flush the queue after calling any of those methods. 759 base::AutoLock lock(messages_lock_); 760 // |io_task_runner_| may be null in tests or processes that don't require 761 // multi-process Mojo. 762 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ && 763 !incoming_messages_task_posted_; 764 incoming_messages_task_posted_ |= schedule_pump_task; 765 incoming_messages_.emplace(std::move(message)); 766 } else { 767 SendPeerMessage(node, std::move(message)); 768 } 769 770 if (schedule_pump_task) { 771 // Normally, the queue is processed after the action that added the local 772 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also 773 // possible for a local message to be added as a result of a remote message, 774 // and OnChannelMessage() doesn't process this queue (although 775 // OnPortsMessage() does). There may also be other code paths, now or added 776 // in the future, which cause local messages to be added but don't process 777 // this message queue. 778 // 779 // Instead of adding a call to AcceptIncomingMessages() on every possible 780 // code path, post a task to the IO thread to process the queue. If the 781 // current call stack processes the queue, this may end up doing nothing. 782 io_task_runner_->PostTask( 783 FROM_HERE, 784 base::Bind(&NodeController::ProcessIncomingMessages, 785 base::Unretained(this))); 786 } 787} 788 789void NodeController::BroadcastMessage(ports::ScopedMessage message) { 790 CHECK_EQ(message->num_ports(), 0u); 791 Channel::MessagePtr channel_message = 792 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); 793 CHECK(!channel_message->has_handles()); 794 795 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 796 if (broker) 797 broker->Broadcast(std::move(channel_message)); 798 else 799 OnBroadcast(name_, std::move(channel_message)); 800} 801 802void NodeController::PortStatusChanged(const ports::PortRef& port) { 803 scoped_refptr<ports::UserData> user_data; 804 node_->GetUserData(port, &user_data); 805 806 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); 807 if (observer) { 808 observer->OnPortStatusChanged(); 809 } else { 810 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " 811 << "doesn't have an observer."; 812 } 813} 814 815void NodeController::OnAcceptChild(const ports::NodeName& from_node, 816 const ports::NodeName& parent_name, 817 const ports::NodeName& token) { 818 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 819 820 scoped_refptr<NodeChannel> parent; 821 { 822 base::AutoLock lock(parent_lock_); 823 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) { 824 parent_name_ = parent_name; 825 parent = bootstrap_parent_channel_; 826 } 827 } 828 829 if (!parent) { 830 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; 831 DropPeer(from_node, nullptr); 832 return; 833 } 834 835 parent->SetRemoteNodeName(parent_name); 836 parent->AcceptParent(token, name_); 837 838 // NOTE: The child does not actually add its parent as a peer until 839 // receiving an AcceptBrokerClient message from the broker. The parent 840 // will request that said message be sent upon receiving AcceptParent. 841 842 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; 843} 844 845void NodeController::OnAcceptParent(const ports::NodeName& from_node, 846 const ports::NodeName& token, 847 const ports::NodeName& child_name) { 848 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 849 850 auto it = pending_children_.find(from_node); 851 if (it == pending_children_.end() || token != from_node) { 852 DLOG(ERROR) << "Received unexpected AcceptParent message from " 853 << from_node; 854 DropPeer(from_node, nullptr); 855 return; 856 } 857 858 scoped_refptr<NodeChannel> channel = it->second; 859 pending_children_.erase(it); 860 861 DCHECK(channel); 862 863 DVLOG(1) << "Parent " << name_ << " accepted child " << child_name; 864 865 AddPeer(child_name, channel, false /* start_channel */); 866 867 // TODO(rockot/amistry): We could simplify child initialization if we could 868 // synchronously get a new async broker channel from the broker. For now we do 869 // it asynchronously since it's only used to facilitate handle passing, not 870 // handle creation. 871 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 872 if (broker) { 873 // Inform the broker of this new child. 874 broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle()); 875 } else { 876 // If we have no broker, either we need to wait for one, or we *are* the 877 // broker. 878 scoped_refptr<NodeChannel> parent = GetParentChannel(); 879 if (!parent) { 880 base::AutoLock lock(parent_lock_); 881 parent = bootstrap_parent_channel_; 882 } 883 884 if (!parent) { 885 // Yes, we're the broker. We can initialize the child directly. 886 channel->AcceptBrokerClient(name_, ScopedPlatformHandle()); 887 } else { 888 // We aren't the broker, so wait for a broker connection. 889 base::AutoLock lock(broker_lock_); 890 pending_broker_clients_.push(child_name); 891 } 892 } 893} 894 895void NodeController::OnAddBrokerClient(const ports::NodeName& from_node, 896 const ports::NodeName& client_name, 897 base::ProcessHandle process_handle) { 898#if defined(OS_WIN) 899 // Scoped handle to avoid leaks on error. 900 ScopedPlatformHandle scoped_process_handle = 901 ScopedPlatformHandle(PlatformHandle(process_handle)); 902#endif 903 scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node); 904 if (!sender) { 905 DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender."; 906 return; 907 } 908 909 if (GetPeerChannel(client_name)) { 910 DLOG(ERROR) << "Ignoring AddBrokerClient for known client."; 911 DropPeer(from_node, nullptr); 912 return; 913 } 914 915 PlatformChannelPair broker_channel; 916 scoped_refptr<NodeChannel> client = NodeChannel::Create( 917 this, broker_channel.PassServerHandle(), io_task_runner_, 918 ProcessErrorCallback()); 919 920#if defined(OS_WIN) 921 // The broker must have a working handle to the client process in order to 922 // properly copy other handles to and from the client. 923 if (!scoped_process_handle.is_valid()) { 924 DLOG(ERROR) << "Broker rejecting client with invalid process handle."; 925 return; 926 } 927 client->SetRemoteProcessHandle(scoped_process_handle.release().handle); 928#else 929 client->SetRemoteProcessHandle(process_handle); 930#endif 931 932 AddPeer(client_name, client, true /* start_channel */); 933 934 DVLOG(1) << "Broker " << name_ << " accepting client " << client_name 935 << " from peer " << from_node; 936 937 sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle()); 938} 939 940void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node, 941 const ports::NodeName& client_name, 942 ScopedPlatformHandle broker_channel) { 943 scoped_refptr<NodeChannel> client = GetPeerChannel(client_name); 944 if (!client) { 945 DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name; 946 return; 947 } 948 949 // This should have come from our own broker. 950 if (GetBrokerChannel() != GetPeerChannel(from_node)) { 951 DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node; 952 return; 953 } 954 955 DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node; 956 957 client->AcceptBrokerClient(from_node, std::move(broker_channel)); 958} 959 960void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node, 961 const ports::NodeName& broker_name, 962 ScopedPlatformHandle broker_channel) { 963 // This node should already have a parent in bootstrap mode. 964 ports::NodeName parent_name; 965 scoped_refptr<NodeChannel> parent; 966 { 967 base::AutoLock lock(parent_lock_); 968 parent_name = parent_name_; 969 parent = bootstrap_parent_channel_; 970 bootstrap_parent_channel_ = nullptr; 971 } 972 DCHECK(parent_name == from_node); 973 DCHECK(parent); 974 975 std::queue<ports::NodeName> pending_broker_clients; 976 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 977 pending_relay_messages; 978 { 979 base::AutoLock lock(broker_lock_); 980 broker_name_ = broker_name; 981 std::swap(pending_broker_clients, pending_broker_clients_); 982 std::swap(pending_relay_messages, pending_relay_messages_); 983 } 984 DCHECK(broker_name != ports::kInvalidNodeName); 985 986 // It's now possible to add both the broker and the parent as peers. 987 // Note that the broker and parent may be the same node. 988 scoped_refptr<NodeChannel> broker; 989 if (broker_name == parent_name) { 990 DCHECK(!broker_channel.is_valid()); 991 broker = parent; 992 } else { 993 DCHECK(broker_channel.is_valid()); 994 broker = NodeChannel::Create(this, std::move(broker_channel), 995 io_task_runner_, ProcessErrorCallback()); 996 AddPeer(broker_name, broker, true /* start_channel */); 997 } 998 999 AddPeer(parent_name, parent, false /* start_channel */); 1000 1001 { 1002 // Complete any port merge requests we have waiting for the parent. 1003 base::AutoLock lock(pending_port_merges_lock_); 1004 for (const auto& request : pending_port_merges_) 1005 parent->RequestPortMerge(request.second.name(), request.first); 1006 pending_port_merges_.clear(); 1007 } 1008 1009 // Feed the broker any pending children of our own. 1010 while (!pending_broker_clients.empty()) { 1011 const ports::NodeName& child_name = pending_broker_clients.front(); 1012 auto it = pending_children_.find(child_name); 1013 DCHECK(it != pending_children_.end()); 1014 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle()); 1015 pending_broker_clients.pop(); 1016 } 1017 1018#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 1019 // Have the broker relay any messages we have waiting. 1020 for (auto& entry : pending_relay_messages) { 1021 const ports::NodeName& destination = entry.first; 1022 auto& message_queue = entry.second; 1023 while (!message_queue.empty()) { 1024 broker->RelayPortsMessage(destination, std::move(message_queue.front())); 1025 message_queue.pop(); 1026 } 1027 } 1028#endif 1029 1030 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; 1031} 1032 1033void NodeController::OnPortsMessage(const ports::NodeName& from_node, 1034 Channel::MessagePtr channel_message) { 1035 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1036 1037 void* data; 1038 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; 1039 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, 1040 &num_header_bytes, &num_payload_bytes, 1041 &num_ports_bytes)) { 1042 DropPeer(from_node, nullptr); 1043 return; 1044 } 1045 1046 CHECK(channel_message); 1047 std::unique_ptr<PortsMessage> ports_message( 1048 new PortsMessage(num_header_bytes, 1049 num_payload_bytes, 1050 num_ports_bytes, 1051 std::move(channel_message))); 1052 ports_message->set_source_node(from_node); 1053 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); 1054 AcceptIncomingMessages(); 1055} 1056 1057void NodeController::OnRequestPortMerge( 1058 const ports::NodeName& from_node, 1059 const ports::PortName& connector_port_name, 1060 const std::string& token) { 1061 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1062 1063 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " 1064 << token << " and port " << connector_port_name << "@" << from_node; 1065 1066 ports::PortRef local_port; 1067 { 1068 base::AutoLock lock(reserved_ports_lock_); 1069 auto it = reserved_ports_.find(token); 1070 if (it == reserved_ports_.end()) { 1071 DVLOG(1) << "Ignoring request to connect to port for unknown token " 1072 << token; 1073 return; 1074 } 1075 local_port = it->second.port; 1076 } 1077 1078 int rv = node_->MergePorts(local_port, from_node, connector_port_name); 1079 if (rv != ports::OK) 1080 DLOG(ERROR) << "MergePorts failed: " << rv; 1081 1082 AcceptIncomingMessages(); 1083} 1084 1085void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, 1086 const ports::NodeName& name) { 1087 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1088 1089 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); 1090 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { 1091 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " 1092 << from_node; 1093 DropPeer(from_node, nullptr); 1094 return; 1095 } 1096 1097 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); 1098 if (!new_friend) { 1099 // We don't know who they're talking about! 1100 requestor->Introduce(name, ScopedPlatformHandle()); 1101 } else { 1102 PlatformChannelPair new_channel; 1103 requestor->Introduce(name, new_channel.PassServerHandle()); 1104 new_friend->Introduce(from_node, new_channel.PassClientHandle()); 1105 } 1106} 1107 1108void NodeController::OnIntroduce(const ports::NodeName& from_node, 1109 const ports::NodeName& name, 1110 ScopedPlatformHandle channel_handle) { 1111 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1112 1113 if (!channel_handle.is_valid()) { 1114 node_->LostConnectionToNode(name); 1115 1116 DLOG(ERROR) << "Could not be introduced to peer " << name; 1117 base::AutoLock lock(peers_lock_); 1118 pending_peer_messages_.erase(name); 1119 return; 1120 } 1121 1122 scoped_refptr<NodeChannel> channel = 1123 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_, 1124 ProcessErrorCallback()); 1125 1126 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; 1127 AddPeer(name, channel, true /* start_channel */); 1128} 1129 1130void NodeController::OnBroadcast(const ports::NodeName& from_node, 1131 Channel::MessagePtr message) { 1132 DCHECK(!message->has_handles()); 1133 1134 void* data; 1135 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; 1136 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes, 1137 &num_header_bytes, &num_payload_bytes, 1138 &num_ports_bytes)) { 1139 DropPeer(from_node, nullptr); 1140 return; 1141 } 1142 1143 // Broadcast messages must not contain ports. 1144 if (num_ports_bytes > 0) { 1145 DropPeer(from_node, nullptr); 1146 return; 1147 } 1148 1149 base::AutoLock lock(peers_lock_); 1150 for (auto& iter : peers_) { 1151 // Copy and send the message to each known peer. 1152 Channel::MessagePtr peer_message( 1153 new Channel::Message(message->payload_size(), 0)); 1154 memcpy(peer_message->mutable_payload(), message->payload(), 1155 message->payload_size()); 1156 iter.second->PortsMessage(std::move(peer_message)); 1157 } 1158} 1159 1160#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 1161void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, 1162 base::ProcessHandle from_process, 1163 const ports::NodeName& destination, 1164 Channel::MessagePtr message) { 1165 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1166 1167 if (GetBrokerChannel()) { 1168 // Only the broker should be asked to relay a message. 1169 LOG(ERROR) << "Non-broker refusing to relay message."; 1170 DropPeer(from_node, nullptr); 1171 return; 1172 } 1173 1174 // The parent should always know which process this came from. 1175 DCHECK(from_process != base::kNullProcessHandle); 1176 1177#if defined(OS_WIN) 1178 // Rewrite the handles to this (the parent) process. If the message is 1179 // destined for another child process, the handles will be rewritten to that 1180 // process before going out (see NodeChannel::WriteChannelMessage). 1181 // 1182 // TODO: We could avoid double-duplication. 1183 // 1184 // Note that we explicitly mark the handles as being owned by the sending 1185 // process before rewriting them, in order to accommodate RewriteHandles' 1186 // internal sanity checks. 1187 ScopedPlatformHandleVectorPtr handles = message->TakeHandles(); 1188 for (size_t i = 0; i < handles->size(); ++i) 1189 (*handles)[i].owning_process = from_process; 1190 if (!Channel::Message::RewriteHandles(from_process, 1191 base::GetCurrentProcessHandle(), 1192 handles.get())) { 1193 DLOG(ERROR) << "Failed to relay one or more handles."; 1194 } 1195 message->SetHandles(std::move(handles)); 1196#else 1197 MachPortRelay* relay = GetMachPortRelay(); 1198 if (!relay) { 1199 LOG(ERROR) << "Receiving Mach ports without a port relay from " 1200 << from_node << ". Dropping message."; 1201 return; 1202 } 1203 if (!relay->ExtractPortRights(message.get(), from_process)) { 1204 // NodeChannel should ensure that MachPortRelay is ready for the remote 1205 // process. At this point, if the port extraction failed, either something 1206 // went wrong in the mach stuff, or the remote process died. 1207 LOG(ERROR) << "Error on receiving Mach ports " << from_node 1208 << ". Dropping message."; 1209 return; 1210 } 1211#endif // defined(OS_WIN) 1212 1213 if (destination == name_) { 1214 // Great, we can deliver this message locally. 1215 OnPortsMessage(from_node, std::move(message)); 1216 return; 1217 } 1218 1219 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination); 1220 if (peer) 1221 peer->PortsMessageFromRelay(from_node, std::move(message)); 1222 else 1223 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; 1224} 1225 1226void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node, 1227 const ports::NodeName& source_node, 1228 Channel::MessagePtr message) { 1229 if (GetPeerChannel(from_node) != GetBrokerChannel()) { 1230 LOG(ERROR) << "Refusing relayed message from non-broker node."; 1231 DropPeer(from_node, nullptr); 1232 return; 1233 } 1234 1235 OnPortsMessage(source_node, std::move(message)); 1236} 1237#endif 1238 1239void NodeController::OnChannelError(const ports::NodeName& from_node, 1240 NodeChannel* channel) { 1241 if (io_task_runner_->RunsTasksOnCurrentThread()) { 1242 DropPeer(from_node, channel); 1243 // DropPeer may have caused local port closures, so be sure to process any 1244 // pending local messages. 1245 AcceptIncomingMessages(); 1246 } else { 1247 io_task_runner_->PostTask( 1248 FROM_HERE, 1249 base::Bind(&NodeController::OnChannelError, base::Unretained(this), 1250 from_node, channel)); 1251 } 1252} 1253 1254#if defined(OS_MACOSX) && !defined(OS_IOS) 1255MachPortRelay* NodeController::GetMachPortRelay() { 1256 { 1257 base::AutoLock lock(parent_lock_); 1258 // Return null if we're not the root. 1259 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) 1260 return nullptr; 1261 } 1262 1263 base::AutoLock lock(mach_port_relay_lock_); 1264 return mach_port_relay_.get(); 1265} 1266#endif 1267 1268void NodeController::DestroyOnIOThreadShutdown() { 1269 destroy_on_io_thread_shutdown_ = true; 1270} 1271 1272void NodeController::AttemptShutdownIfRequested() { 1273 if (!shutdown_callback_flag_) 1274 return; 1275 1276 base::Closure callback; 1277 { 1278 base::AutoLock lock(shutdown_lock_); 1279 if (shutdown_callback_.is_null()) 1280 return; 1281 if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) { 1282 DVLOG(2) << "Unable to cleanly shut down node " << name_; 1283 return; 1284 } 1285 1286 callback = shutdown_callback_; 1287 shutdown_callback_.Reset(); 1288 shutdown_callback_flag_.Set(false); 1289 } 1290 1291 DCHECK(!callback.is_null()); 1292 1293 callback.Run(); 1294} 1295 1296} // namespace edk 1297} // namespace mojo 1298