1// Copyright 2016 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/edk/system/ports/node.h" 6 7#include <string.h> 8 9#include <utility> 10 11#include "base/atomicops.h" 12#include "base/logging.h" 13#include "base/memory/ref_counted.h" 14#include "base/synchronization/lock.h" 15#include "mojo/edk/system/ports/node_delegate.h" 16 17namespace mojo { 18namespace edk { 19namespace ports { 20 21namespace { 22 23int DebugError(const char* message, int error_code) { 24 CHECK(false) << "Oops: " << message; 25 return error_code; 26} 27 28#define OOPS(x) DebugError(#x, x) 29 30bool CanAcceptMoreMessages(const Port* port) { 31 // Have we already doled out the last message (i.e., do we expect to NOT 32 // receive further messages)? 33 uint64_t next_sequence_num = port->message_queue.next_sequence_num(); 34 if (port->state == Port::kClosed) 35 return false; 36 if (port->peer_closed || port->remove_proxy_on_last_message) { 37 if (port->last_sequence_num_to_receive == next_sequence_num - 1) 38 return false; 39 } 40 return true; 41} 42 43} // namespace 44 45class Node::LockedPort { 46 public: 47 explicit LockedPort(Port* port) : port_(port) { 48 port_->lock.AssertAcquired(); 49 } 50 51 Port* get() const { return port_; } 52 Port* operator->() const { return port_; } 53 54 private: 55 Port* const port_; 56}; 57 58Node::Node(const NodeName& name, NodeDelegate* delegate) 59 : name_(name), 60 delegate_(delegate) { 61} 62 63Node::~Node() { 64 if (!ports_.empty()) 65 DLOG(WARNING) << "Unclean shutdown for node " << name_; 66} 67 68bool Node::CanShutdownCleanly(ShutdownPolicy policy) { 69 base::AutoLock ports_lock(ports_lock_); 70 71 if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) { 72#if DCHECK_IS_ON() 73 for (auto entry : ports_) { 74 DVLOG(2) << "Port " << entry.first << " referencing node " 75 << entry.second->peer_node_name << " is blocking shutdown of " 76 << "node " << name_ << " (state=" << entry.second->state << ")"; 77 } 78#endif 79 return ports_.empty(); 80 } 81 82 DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS); 83 84 // NOTE: This is not efficient, though it probably doesn't need to be since 85 // relatively few ports should be open during shutdown and shutdown doesn't 86 // need to be blazingly fast. 87 bool can_shutdown = true; 88 for (auto entry : ports_) { 89 base::AutoLock lock(entry.second->lock); 90 if (entry.second->peer_node_name != name_ && 91 entry.second->state != Port::kReceiving) { 92 can_shutdown = false; 93#if DCHECK_IS_ON() 94 DVLOG(2) << "Port " << entry.first << " referencing node " 95 << entry.second->peer_node_name << " is blocking shutdown of " 96 << "node " << name_ << " (state=" << entry.second->state << ")"; 97#else 98 // Exit early when not debugging. 99 break; 100#endif 101 } 102 } 103 104 return can_shutdown; 105} 106 107int Node::GetPort(const PortName& port_name, PortRef* port_ref) { 108 scoped_refptr<Port> port = GetPort(port_name); 109 if (!port) 110 return ERROR_PORT_UNKNOWN; 111 112 *port_ref = PortRef(port_name, std::move(port)); 113 return OK; 114} 115 116int Node::CreateUninitializedPort(PortRef* port_ref) { 117 PortName port_name; 118 delegate_->GenerateRandomPortName(&port_name); 119 120 scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum)); 121 int rv = AddPortWithName(port_name, port); 122 if (rv != OK) 123 return rv; 124 125 *port_ref = PortRef(port_name, std::move(port)); 126 return OK; 127} 128 129int Node::InitializePort(const PortRef& port_ref, 130 const NodeName& peer_node_name, 131 const PortName& peer_port_name) { 132 Port* port = port_ref.port(); 133 134 { 135 base::AutoLock lock(port->lock); 136 if (port->state != Port::kUninitialized) 137 return ERROR_PORT_STATE_UNEXPECTED; 138 139 port->state = Port::kReceiving; 140 port->peer_node_name = peer_node_name; 141 port->peer_port_name = peer_port_name; 142 } 143 144 delegate_->PortStatusChanged(port_ref); 145 146 return OK; 147} 148 149int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) { 150 int rv; 151 152 rv = CreateUninitializedPort(port0_ref); 153 if (rv != OK) 154 return rv; 155 156 rv = CreateUninitializedPort(port1_ref); 157 if (rv != OK) 158 return rv; 159 160 rv = InitializePort(*port0_ref, name_, port1_ref->name()); 161 if (rv != OK) 162 return rv; 163 164 rv = InitializePort(*port1_ref, name_, port0_ref->name()); 165 if (rv != OK) 166 return rv; 167 168 return OK; 169} 170 171int Node::SetUserData(const PortRef& port_ref, 172 scoped_refptr<UserData> user_data) { 173 Port* port = port_ref.port(); 174 175 base::AutoLock lock(port->lock); 176 if (port->state == Port::kClosed) 177 return ERROR_PORT_STATE_UNEXPECTED; 178 179 port->user_data = std::move(user_data); 180 181 return OK; 182} 183 184int Node::GetUserData(const PortRef& port_ref, 185 scoped_refptr<UserData>* user_data) { 186 Port* port = port_ref.port(); 187 188 base::AutoLock lock(port->lock); 189 if (port->state == Port::kClosed) 190 return ERROR_PORT_STATE_UNEXPECTED; 191 192 *user_data = port->user_data; 193 194 return OK; 195} 196 197int Node::ClosePort(const PortRef& port_ref) { 198 std::deque<PortName> referenced_port_names; 199 200 ObserveClosureEventData data; 201 202 NodeName peer_node_name; 203 PortName peer_port_name; 204 Port* port = port_ref.port(); 205 { 206 // We may need to erase the port, which requires ports_lock_ to be held, 207 // but ports_lock_ must be acquired before any individual port locks. 208 base::AutoLock ports_lock(ports_lock_); 209 210 base::AutoLock lock(port->lock); 211 if (port->state == Port::kUninitialized) { 212 // If the port was not yet initialized, there's nothing interesting to do. 213 ErasePort_Locked(port_ref.name()); 214 return OK; 215 } 216 217 if (port->state != Port::kReceiving) 218 return ERROR_PORT_STATE_UNEXPECTED; 219 220 port->state = Port::kClosed; 221 222 // We pass along the sequence number of the last message sent from this 223 // port to allow the peer to have the opportunity to consume all inbound 224 // messages before notifying the embedder that this port is closed. 225 data.last_sequence_num = port->next_sequence_num_to_send - 1; 226 227 peer_node_name = port->peer_node_name; 228 peer_port_name = port->peer_port_name; 229 230 // If the port being closed still has unread messages, then we need to take 231 // care to close those ports so as to avoid leaking memory. 232 port->message_queue.GetReferencedPorts(&referenced_port_names); 233 234 ErasePort_Locked(port_ref.name()); 235 } 236 237 DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_ 238 << " to " << peer_port_name << "@" << peer_node_name; 239 240 delegate_->ForwardMessage( 241 peer_node_name, 242 NewInternalMessage(peer_port_name, EventType::kObserveClosure, data)); 243 244 for (const auto& name : referenced_port_names) { 245 PortRef ref; 246 if (GetPort(name, &ref) == OK) 247 ClosePort(ref); 248 } 249 return OK; 250} 251 252int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) { 253 Port* port = port_ref.port(); 254 255 base::AutoLock lock(port->lock); 256 257 if (port->state != Port::kReceiving) 258 return ERROR_PORT_STATE_UNEXPECTED; 259 260 port_status->has_messages = port->message_queue.HasNextMessage(); 261 port_status->receiving_messages = CanAcceptMoreMessages(port); 262 port_status->peer_closed = port->peer_closed; 263 return OK; 264} 265 266int Node::GetMessage(const PortRef& port_ref, 267 ScopedMessage* message, 268 MessageFilter* filter) { 269 *message = nullptr; 270 271 DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_; 272 273 Port* port = port_ref.port(); 274 { 275 base::AutoLock lock(port->lock); 276 277 // This could also be treated like the port being unknown since the 278 // embedder should no longer be referring to a port that has been sent. 279 if (port->state != Port::kReceiving) 280 return ERROR_PORT_STATE_UNEXPECTED; 281 282 // Let the embedder get messages until there are no more before reporting 283 // that the peer closed its end. 284 if (!CanAcceptMoreMessages(port)) 285 return ERROR_PORT_PEER_CLOSED; 286 287 port->message_queue.GetNextMessage(message, filter); 288 } 289 290 // Allow referenced ports to trigger PortStatusChanged calls. 291 if (*message) { 292 for (size_t i = 0; i < (*message)->num_ports(); ++i) { 293 const PortName& new_port_name = (*message)->ports()[i]; 294 scoped_refptr<Port> new_port = GetPort(new_port_name); 295 296 DCHECK(new_port) << "Port " << new_port_name << "@" << name_ 297 << " does not exist!"; 298 299 base::AutoLock lock(new_port->lock); 300 301 DCHECK(new_port->state == Port::kReceiving); 302 new_port->message_queue.set_signalable(true); 303 } 304 } 305 306 return OK; 307} 308 309int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) { 310 int rv = SendMessageInternal(port_ref, &message); 311 if (rv != OK) { 312 // If send failed, close all carried ports. Note that we're careful not to 313 // close the sending port itself if it happened to be one of the encoded 314 // ports (an invalid but possible condition.) 315 for (size_t i = 0; i < message->num_ports(); ++i) { 316 if (message->ports()[i] == port_ref.name()) 317 continue; 318 319 PortRef port; 320 if (GetPort(message->ports()[i], &port) == OK) 321 ClosePort(port); 322 } 323 } 324 return rv; 325} 326 327int Node::AcceptMessage(ScopedMessage message) { 328 const EventHeader* header = GetEventHeader(*message); 329 switch (header->type) { 330 case EventType::kUser: 331 return OnUserMessage(std::move(message)); 332 case EventType::kPortAccepted: 333 return OnPortAccepted(header->port_name); 334 case EventType::kObserveProxy: 335 return OnObserveProxy( 336 header->port_name, 337 *GetEventData<ObserveProxyEventData>(*message)); 338 case EventType::kObserveProxyAck: 339 return OnObserveProxyAck( 340 header->port_name, 341 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num); 342 case EventType::kObserveClosure: 343 return OnObserveClosure( 344 header->port_name, 345 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); 346 case EventType::kMergePort: 347 return OnMergePort(header->port_name, 348 *GetEventData<MergePortEventData>(*message)); 349 } 350 return OOPS(ERROR_NOT_IMPLEMENTED); 351} 352 353int Node::MergePorts(const PortRef& port_ref, 354 const NodeName& destination_node_name, 355 const PortName& destination_port_name) { 356 Port* port = port_ref.port(); 357 MergePortEventData data; 358 { 359 base::AutoLock lock(port->lock); 360 361 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ 362 << " to " << destination_port_name << "@" << destination_node_name; 363 364 // Send the port-to-merge over to the destination node so it can be merged 365 // into the port cycle atomically there. 366 data.new_port_name = port_ref.name(); 367 WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name, 368 &data.new_port_descriptor); 369 } 370 delegate_->ForwardMessage( 371 destination_node_name, 372 NewInternalMessage(destination_port_name, 373 EventType::kMergePort, data)); 374 return OK; 375} 376 377int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) { 378 Port* port0 = port0_ref.port(); 379 Port* port1 = port1_ref.port(); 380 int rv; 381 { 382 // |ports_lock_| must be held when acquiring overlapping port locks. 383 base::AutoLock ports_lock(ports_lock_); 384 base::AutoLock port0_lock(port0->lock); 385 base::AutoLock port1_lock(port1->lock); 386 387 DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_ 388 << " and " << port1_ref.name() << "@" << name_; 389 390 if (port0->state != Port::kReceiving || port1->state != Port::kReceiving) 391 rv = ERROR_PORT_STATE_UNEXPECTED; 392 else 393 rv = MergePorts_Locked(port0_ref, port1_ref); 394 } 395 396 if (rv != OK) { 397 ClosePort(port0_ref); 398 ClosePort(port1_ref); 399 } 400 401 return rv; 402} 403 404int Node::LostConnectionToNode(const NodeName& node_name) { 405 // We can no longer send events to the given node. We also can't expect any 406 // PortAccepted events. 407 408 DVLOG(1) << "Observing lost connection from node " << name_ 409 << " to node " << node_name; 410 411 DestroyAllPortsWithPeer(node_name, kInvalidPortName); 412 return OK; 413} 414 415int Node::OnUserMessage(ScopedMessage message) { 416 PortName port_name = GetEventHeader(*message)->port_name; 417 const auto* event = GetEventData<UserEventData>(*message); 418 419#if DCHECK_IS_ON() 420 std::ostringstream ports_buf; 421 for (size_t i = 0; i < message->num_ports(); ++i) { 422 if (i > 0) 423 ports_buf << ","; 424 ports_buf << message->ports()[i]; 425 } 426 427 DVLOG(4) << "AcceptMessage " << event->sequence_num 428 << " [ports=" << ports_buf.str() << "] at " 429 << port_name << "@" << name_; 430#endif 431 432 scoped_refptr<Port> port = GetPort(port_name); 433 434 // Even if this port does not exist, cannot receive anymore messages or is 435 // buffering or proxying messages, we still need these ports to be bound to 436 // this node. When the message is forwarded, these ports will get transferred 437 // following the usual method. If the message cannot be accepted, then the 438 // newly bound ports will simply be closed. 439 440 for (size_t i = 0; i < message->num_ports(); ++i) { 441 int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]); 442 if (rv != OK) 443 return rv; 444 } 445 446 bool has_next_message = false; 447 bool message_accepted = false; 448 449 if (port) { 450 // We may want to forward messages once the port lock is held, so we must 451 // acquire |ports_lock_| first. 452 base::AutoLock ports_lock(ports_lock_); 453 base::AutoLock lock(port->lock); 454 455 // Reject spurious messages if we've already received the last expected 456 // message. 457 if (CanAcceptMoreMessages(port.get())) { 458 message_accepted = true; 459 port->message_queue.AcceptMessage(std::move(message), &has_next_message); 460 461 if (port->state == Port::kBuffering) { 462 has_next_message = false; 463 } else if (port->state == Port::kProxying) { 464 has_next_message = false; 465 466 // Forward messages. We forward messages in sequential order here so 467 // that we maintain the message queue's notion of next sequence number. 468 // That's useful for the proxy removal process as we can tell when this 469 // port has seen all of the messages it is expected to see. 470 int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name); 471 if (rv != OK) 472 return rv; 473 474 MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name); 475 } 476 } 477 } 478 479 if (!message_accepted) { 480 DVLOG(2) << "Message not accepted!\n"; 481 // Close all newly accepted ports as they are effectively orphaned. 482 for (size_t i = 0; i < message->num_ports(); ++i) { 483 PortRef port_ref; 484 if (GetPort(message->ports()[i], &port_ref) == OK) { 485 ClosePort(port_ref); 486 } else { 487 DLOG(WARNING) << "Cannot close non-existent port!\n"; 488 } 489 } 490 } else if (has_next_message) { 491 PortRef port_ref(port_name, port); 492 delegate_->PortStatusChanged(port_ref); 493 } 494 495 return OK; 496} 497 498int Node::OnPortAccepted(const PortName& port_name) { 499 scoped_refptr<Port> port = GetPort(port_name); 500 if (!port) 501 return ERROR_PORT_UNKNOWN; 502 503 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_ 504 << " pointing to " 505 << port->peer_port_name << "@" << port->peer_node_name; 506 507 return BeginProxying(PortRef(port_name, std::move(port))); 508} 509 510int Node::OnObserveProxy(const PortName& port_name, 511 const ObserveProxyEventData& event) { 512 if (port_name == kInvalidPortName) { 513 // An ObserveProxy with an invalid target port name is a broadcast used to 514 // inform ports when their peer (which was itself a proxy) has become 515 // defunct due to unexpected node disconnection. 516 // 517 // Receiving ports affected by this treat it as equivalent to peer closure. 518 // Proxies affected by this can be removed and will in turn broadcast their 519 // own death with a similar message. 520 CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName); 521 CHECK_EQ(event.proxy_to_port_name, kInvalidPortName); 522 DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name); 523 return OK; 524 } 525 526 // The port may have already been closed locally, in which case the 527 // ObserveClosure message will contain the last_sequence_num field. 528 // We can then silently ignore this message. 529 scoped_refptr<Port> port = GetPort(port_name); 530 if (!port) { 531 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; 532 return OK; 533 } 534 535 DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at " 536 << event.proxy_port_name << "@" 537 << event.proxy_node_name << " pointing to " 538 << event.proxy_to_port_name << "@" 539 << event.proxy_to_node_name; 540 541 { 542 base::AutoLock lock(port->lock); 543 544 if (port->peer_node_name == event.proxy_node_name && 545 port->peer_port_name == event.proxy_port_name) { 546 if (port->state == Port::kReceiving) { 547 port->peer_node_name = event.proxy_to_node_name; 548 port->peer_port_name = event.proxy_to_port_name; 549 550 ObserveProxyAckEventData ack; 551 ack.last_sequence_num = port->next_sequence_num_to_send - 1; 552 553 delegate_->ForwardMessage( 554 event.proxy_node_name, 555 NewInternalMessage(event.proxy_port_name, 556 EventType::kObserveProxyAck, 557 ack)); 558 } else { 559 // As a proxy ourselves, we don't know how to honor the ObserveProxy 560 // event or to populate the last_sequence_num field of ObserveProxyAck. 561 // Afterall, another port could be sending messages to our peer now 562 // that we've sent out our own ObserveProxy event. Instead, we will 563 // send an ObserveProxyAck indicating that the ObserveProxy event 564 // should be re-sent (last_sequence_num set to kInvalidSequenceNum). 565 // However, this has to be done after we are removed as a proxy. 566 // Otherwise, we might just find ourselves back here again, which 567 // would be akin to a busy loop. 568 569 DVLOG(2) << "Delaying ObserveProxyAck to " 570 << event.proxy_port_name << "@" << event.proxy_node_name; 571 572 ObserveProxyAckEventData ack; 573 ack.last_sequence_num = kInvalidSequenceNum; 574 575 port->send_on_proxy_removal.reset( 576 new std::pair<NodeName, ScopedMessage>( 577 event.proxy_node_name, 578 NewInternalMessage(event.proxy_port_name, 579 EventType::kObserveProxyAck, 580 ack))); 581 } 582 } else { 583 // Forward this event along to our peer. Eventually, it should find the 584 // port referring to the proxy. 585 delegate_->ForwardMessage( 586 port->peer_node_name, 587 NewInternalMessage(port->peer_port_name, 588 EventType::kObserveProxy, 589 event)); 590 } 591 } 592 return OK; 593} 594 595int Node::OnObserveProxyAck(const PortName& port_name, 596 uint64_t last_sequence_num) { 597 DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_ 598 << " (last_sequence_num=" << last_sequence_num << ")"; 599 600 scoped_refptr<Port> port = GetPort(port_name); 601 if (!port) 602 return ERROR_PORT_UNKNOWN; // The port may have observed closure first, so 603 // this is not an "Oops". 604 605 { 606 base::AutoLock lock(port->lock); 607 608 if (port->state != Port::kProxying) 609 return OOPS(ERROR_PORT_STATE_UNEXPECTED); 610 611 if (last_sequence_num == kInvalidSequenceNum) { 612 // Send again. 613 InitiateProxyRemoval(LockedPort(port.get()), port_name); 614 return OK; 615 } 616 617 // We can now remove this port once we have received and forwarded the last 618 // message addressed to this port. 619 port->remove_proxy_on_last_message = true; 620 port->last_sequence_num_to_receive = last_sequence_num; 621 } 622 TryRemoveProxy(PortRef(port_name, std::move(port))); 623 return OK; 624} 625 626int Node::OnObserveClosure(const PortName& port_name, 627 uint64_t last_sequence_num) { 628 // OK if the port doesn't exist, as it may have been closed already. 629 scoped_refptr<Port> port = GetPort(port_name); 630 if (!port) 631 return OK; 632 633 // This message tells the port that it should no longer expect more messages 634 // beyond last_sequence_num. This message is forwarded along until we reach 635 // the receiving end, and this message serves as an equivalent to 636 // ObserveProxyAck. 637 638 bool notify_delegate = false; 639 ObserveClosureEventData forwarded_data; 640 NodeName peer_node_name; 641 PortName peer_port_name; 642 bool try_remove_proxy = false; 643 { 644 base::AutoLock lock(port->lock); 645 646 port->peer_closed = true; 647 port->last_sequence_num_to_receive = last_sequence_num; 648 649 DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_ 650 << " (state=" << port->state << ") pointing to " 651 << port->peer_port_name << "@" << port->peer_node_name 652 << " (last_sequence_num=" << last_sequence_num << ")"; 653 654 // We always forward ObserveClosure, even beyond the receiving port which 655 // cares about it. This ensures that any dead-end proxies beyond that port 656 // are notified to remove themselves. 657 658 if (port->state == Port::kReceiving) { 659 notify_delegate = true; 660 661 // When forwarding along the other half of the port cycle, this will only 662 // reach dead-end proxies. Tell them we've sent our last message so they 663 // can go away. 664 // 665 // TODO: Repurposing ObserveClosure for this has the desired result but 666 // may be semantically confusing since the forwarding port is not actually 667 // closed. Consider replacing this with a new event type. 668 forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1; 669 } else { 670 // We haven't yet reached the receiving peer of the closed port, so 671 // forward the message along as-is. 672 forwarded_data.last_sequence_num = last_sequence_num; 673 674 // See about removing the port if it is a proxy as our peer won't be able 675 // to participate in proxy removal. 676 port->remove_proxy_on_last_message = true; 677 if (port->state == Port::kProxying) 678 try_remove_proxy = true; 679 } 680 681 DVLOG(2) << "Forwarding ObserveClosure from " 682 << port_name << "@" << name_ << " to peer " 683 << port->peer_port_name << "@" << port->peer_node_name 684 << " (last_sequence_num=" << forwarded_data.last_sequence_num 685 << ")"; 686 687 peer_node_name = port->peer_node_name; 688 peer_port_name = port->peer_port_name; 689 } 690 if (try_remove_proxy) 691 TryRemoveProxy(PortRef(port_name, port)); 692 693 delegate_->ForwardMessage( 694 peer_node_name, 695 NewInternalMessage(peer_port_name, EventType::kObserveClosure, 696 forwarded_data)); 697 698 if (notify_delegate) { 699 PortRef port_ref(port_name, std::move(port)); 700 delegate_->PortStatusChanged(port_ref); 701 } 702 return OK; 703} 704 705int Node::OnMergePort(const PortName& port_name, 706 const MergePortEventData& event) { 707 scoped_refptr<Port> port = GetPort(port_name); 708 709 DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" 710 << (port ? port->state : -1) << ") merging with proxy " 711 << event.new_port_name 712 << "@" << name_ << " pointing to " 713 << event.new_port_descriptor.peer_port_name << "@" 714 << event.new_port_descriptor.peer_node_name << " referred by " 715 << event.new_port_descriptor.referring_port_name << "@" 716 << event.new_port_descriptor.referring_node_name; 717 718 bool close_target_port = false; 719 bool close_new_port = false; 720 721 // Accept the new port. This is now the receiving end of the other port cycle 722 // to be merged with ours. 723 int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); 724 if (rv != OK) { 725 close_target_port = true; 726 } else if (port) { 727 // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn 728 // needs to hold |ports_lock_|. We also acquire multiple port locks within. 729 base::AutoLock ports_lock(ports_lock_); 730 base::AutoLock lock(port->lock); 731 732 if (port->state != Port::kReceiving) { 733 close_new_port = true; 734 } else { 735 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); 736 base::AutoLock new_port_lock(new_port->lock); 737 DCHECK(new_port->state == Port::kReceiving); 738 739 // Both ports are locked. Now all we have to do is swap their peer 740 // information and set them up as proxies. 741 742 PortRef port0_ref(port_name, port); 743 PortRef port1_ref(event.new_port_name, new_port); 744 int rv = MergePorts_Locked(port0_ref, port1_ref); 745 if (rv == OK) 746 return rv; 747 748 close_new_port = true; 749 close_target_port = true; 750 } 751 } else { 752 close_new_port = true; 753 } 754 755 if (close_target_port) { 756 PortRef target_port; 757 rv = GetPort(port_name, &target_port); 758 DCHECK(rv == OK); 759 760 ClosePort(target_port); 761 } 762 763 if (close_new_port) { 764 PortRef new_port; 765 rv = GetPort(event.new_port_name, &new_port); 766 DCHECK(rv == OK); 767 768 ClosePort(new_port); 769 } 770 771 return ERROR_PORT_STATE_UNEXPECTED; 772} 773 774int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) { 775 base::AutoLock lock(ports_lock_); 776 777 if (!ports_.insert(std::make_pair(port_name, std::move(port))).second) 778 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. 779 780 DVLOG(2) << "Created port " << port_name << "@" << name_; 781 return OK; 782} 783 784void Node::ErasePort(const PortName& port_name) { 785 base::AutoLock lock(ports_lock_); 786 ErasePort_Locked(port_name); 787} 788 789void Node::ErasePort_Locked(const PortName& port_name) { 790 ports_lock_.AssertAcquired(); 791 ports_.erase(port_name); 792 DVLOG(2) << "Deleted port " << port_name << "@" << name_; 793} 794 795scoped_refptr<Port> Node::GetPort(const PortName& port_name) { 796 base::AutoLock lock(ports_lock_); 797 return GetPort_Locked(port_name); 798} 799 800scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) { 801 ports_lock_.AssertAcquired(); 802 auto iter = ports_.find(port_name); 803 if (iter == ports_.end()) 804 return nullptr; 805 806#if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64) 807 // Workaround for https://crbug.com/665869. 808 base::subtle::MemoryBarrier(); 809#endif 810 811 return iter->second; 812} 813 814int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) { 815 ScopedMessage& m = *message; 816 for (size_t i = 0; i < m->num_ports(); ++i) { 817 if (m->ports()[i] == port_ref.name()) 818 return ERROR_PORT_CANNOT_SEND_SELF; 819 } 820 821 Port* port = port_ref.port(); 822 NodeName peer_node_name; 823 { 824 // We must acquire |ports_lock_| before grabbing any port locks, because 825 // WillSendMessage_Locked may need to lock multiple ports out of order. 826 base::AutoLock ports_lock(ports_lock_); 827 base::AutoLock lock(port->lock); 828 829 if (port->state != Port::kReceiving) 830 return ERROR_PORT_STATE_UNEXPECTED; 831 832 if (port->peer_closed) 833 return ERROR_PORT_PEER_CLOSED; 834 835 int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get()); 836 if (rv != OK) 837 return rv; 838 839 // Beyond this point there's no sense in returning anything but OK. Even if 840 // message forwarding or acceptance fails, there's nothing the embedder can 841 // do to recover. Assume that failure beyond this point must be treated as a 842 // transport failure. 843 844 peer_node_name = port->peer_node_name; 845 } 846 847 if (peer_node_name != name_) { 848 delegate_->ForwardMessage(peer_node_name, std::move(m)); 849 return OK; 850 } 851 852 int rv = AcceptMessage(std::move(m)); 853 if (rv != OK) { 854 // See comment above for why we don't return an error in this case. 855 DVLOG(2) << "AcceptMessage failed: " << rv; 856 } 857 858 return OK; 859} 860 861int Node::MergePorts_Locked(const PortRef& port0_ref, 862 const PortRef& port1_ref) { 863 Port* port0 = port0_ref.port(); 864 Port* port1 = port1_ref.port(); 865 866 ports_lock_.AssertAcquired(); 867 port0->lock.AssertAcquired(); 868 port1->lock.AssertAcquired(); 869 870 CHECK(port0->state == Port::kReceiving); 871 CHECK(port1->state == Port::kReceiving); 872 873 // Ports cannot be merged with their own receiving peer! 874 if (port0->peer_node_name == name_ && 875 port0->peer_port_name == port1_ref.name()) 876 return ERROR_PORT_STATE_UNEXPECTED; 877 878 if (port1->peer_node_name == name_ && 879 port1->peer_port_name == port0_ref.name()) 880 return ERROR_PORT_STATE_UNEXPECTED; 881 882 // Only merge if both ports have never sent a message. 883 if (port0->next_sequence_num_to_send == kInitialSequenceNum && 884 port1->next_sequence_num_to_send == kInitialSequenceNum) { 885 // Swap the ports' peer information and switch them both into buffering 886 // (eventually proxying) mode. 887 888 std::swap(port0->peer_node_name, port1->peer_node_name); 889 std::swap(port0->peer_port_name, port1->peer_port_name); 890 891 port0->state = Port::kBuffering; 892 if (port0->peer_closed) 893 port0->remove_proxy_on_last_message = true; 894 895 port1->state = Port::kBuffering; 896 if (port1->peer_closed) 897 port1->remove_proxy_on_last_message = true; 898 899 int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name()); 900 int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name()); 901 902 if (rv1 == OK && rv2 == OK) { 903 // If either merged port had a closed peer, its new peer needs to be 904 // informed of this. 905 if (port1->peer_closed) { 906 ObserveClosureEventData data; 907 data.last_sequence_num = port0->last_sequence_num_to_receive; 908 delegate_->ForwardMessage( 909 port0->peer_node_name, 910 NewInternalMessage(port0->peer_port_name, 911 EventType::kObserveClosure, data)); 912 } 913 914 if (port0->peer_closed) { 915 ObserveClosureEventData data; 916 data.last_sequence_num = port1->last_sequence_num_to_receive; 917 delegate_->ForwardMessage( 918 port1->peer_node_name, 919 NewInternalMessage(port1->peer_port_name, 920 EventType::kObserveClosure, data)); 921 } 922 923 return OK; 924 } 925 926 // If either proxy failed to initialize (e.g. had undeliverable messages 927 // or ended up in a bad state somehow), we keep the system in a consistent 928 // state by undoing the peer swap. 929 std::swap(port0->peer_node_name, port1->peer_node_name); 930 std::swap(port0->peer_port_name, port1->peer_port_name); 931 port0->remove_proxy_on_last_message = false; 932 port1->remove_proxy_on_last_message = false; 933 port0->state = Port::kReceiving; 934 port1->state = Port::kReceiving; 935 } 936 937 return ERROR_PORT_STATE_UNEXPECTED; 938} 939 940void Node::WillSendPort(const LockedPort& port, 941 const NodeName& to_node_name, 942 PortName* port_name, 943 PortDescriptor* port_descriptor) { 944 port->lock.AssertAcquired(); 945 946 PortName local_port_name = *port_name; 947 948 PortName new_port_name; 949 delegate_->GenerateRandomPortName(&new_port_name); 950 951 // Make sure we don't send messages to the new peer until after we know it 952 // exists. In the meantime, just buffer messages locally. 953 DCHECK(port->state == Port::kReceiving); 954 port->state = Port::kBuffering; 955 956 // If we already know our peer is closed, we already know this proxy can 957 // be removed once it receives and forwards its last expected message. 958 if (port->peer_closed) 959 port->remove_proxy_on_last_message = true; 960 961 *port_name = new_port_name; 962 963 port_descriptor->peer_node_name = port->peer_node_name; 964 port_descriptor->peer_port_name = port->peer_port_name; 965 port_descriptor->referring_node_name = name_; 966 port_descriptor->referring_port_name = local_port_name; 967 port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send; 968 port_descriptor->next_sequence_num_to_receive = 969 port->message_queue.next_sequence_num(); 970 port_descriptor->last_sequence_num_to_receive = 971 port->last_sequence_num_to_receive; 972 port_descriptor->peer_closed = port->peer_closed; 973 memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding)); 974 975 // Configure the local port to point to the new port. 976 port->peer_node_name = to_node_name; 977 port->peer_port_name = new_port_name; 978} 979 980int Node::AcceptPort(const PortName& port_name, 981 const PortDescriptor& port_descriptor) { 982 scoped_refptr<Port> port = make_scoped_refptr( 983 new Port(port_descriptor.next_sequence_num_to_send, 984 port_descriptor.next_sequence_num_to_receive)); 985 port->state = Port::kReceiving; 986 port->peer_node_name = port_descriptor.peer_node_name; 987 port->peer_port_name = port_descriptor.peer_port_name; 988 port->last_sequence_num_to_receive = 989 port_descriptor.last_sequence_num_to_receive; 990 port->peer_closed = port_descriptor.peer_closed; 991 992 DVLOG(2) << "Accepting port " << port_name << " [peer_closed=" 993 << port->peer_closed << "; last_sequence_num_to_receive=" 994 << port->last_sequence_num_to_receive << "]"; 995 996 // A newly accepted port is not signalable until the message referencing the 997 // new port finds its way to the consumer (see GetMessage). 998 port->message_queue.set_signalable(false); 999 1000 int rv = AddPortWithName(port_name, std::move(port)); 1001 if (rv != OK) 1002 return rv; 1003 1004 // Allow referring port to forward messages. 1005 delegate_->ForwardMessage( 1006 port_descriptor.referring_node_name, 1007 NewInternalMessage(port_descriptor.referring_port_name, 1008 EventType::kPortAccepted)); 1009 return OK; 1010} 1011 1012int Node::WillSendMessage_Locked(const LockedPort& port, 1013 const PortName& port_name, 1014 Message* message) { 1015 ports_lock_.AssertAcquired(); 1016 port->lock.AssertAcquired(); 1017 1018 DCHECK(message); 1019 1020 // Messages may already have a sequence number if they're being forwarded 1021 // by a proxy. Otherwise, use the next outgoing sequence number. 1022 uint64_t* sequence_num = 1023 &GetMutableEventData<UserEventData>(message)->sequence_num; 1024 if (*sequence_num == 0) 1025 *sequence_num = port->next_sequence_num_to_send++; 1026 1027#if DCHECK_IS_ON() 1028 std::ostringstream ports_buf; 1029 for (size_t i = 0; i < message->num_ports(); ++i) { 1030 if (i > 0) 1031 ports_buf << ","; 1032 ports_buf << message->ports()[i]; 1033 } 1034#endif 1035 1036 if (message->num_ports() > 0) { 1037 // Note: Another thread could be trying to send the same ports, so we need 1038 // to ensure that they are ours to send before we mutate their state. 1039 1040 std::vector<scoped_refptr<Port>> ports; 1041 ports.resize(message->num_ports()); 1042 1043 { 1044 for (size_t i = 0; i < message->num_ports(); ++i) { 1045 ports[i] = GetPort_Locked(message->ports()[i]); 1046 DCHECK(ports[i]); 1047 1048 ports[i]->lock.Acquire(); 1049 int error = OK; 1050 if (ports[i]->state != Port::kReceiving) 1051 error = ERROR_PORT_STATE_UNEXPECTED; 1052 else if (message->ports()[i] == port->peer_port_name) 1053 error = ERROR_PORT_CANNOT_SEND_PEER; 1054 1055 if (error != OK) { 1056 // Oops, we cannot send this port. 1057 for (size_t j = 0; j <= i; ++j) 1058 ports[i]->lock.Release(); 1059 // Backpedal on the sequence number. 1060 port->next_sequence_num_to_send--; 1061 return error; 1062 } 1063 } 1064 } 1065 1066 PortDescriptor* port_descriptors = 1067 GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message)); 1068 1069 for (size_t i = 0; i < message->num_ports(); ++i) { 1070 WillSendPort(LockedPort(ports[i].get()), 1071 port->peer_node_name, 1072 message->mutable_ports() + i, 1073 port_descriptors + i); 1074 } 1075 1076 for (size_t i = 0; i < message->num_ports(); ++i) 1077 ports[i]->lock.Release(); 1078 } 1079 1080#if DCHECK_IS_ON() 1081 DVLOG(4) << "Sending message " 1082 << GetEventData<UserEventData>(*message)->sequence_num 1083 << " [ports=" << ports_buf.str() << "]" 1084 << " from " << port_name << "@" << name_ 1085 << " to " << port->peer_port_name << "@" << port->peer_node_name; 1086#endif 1087 1088 GetMutableEventHeader(message)->port_name = port->peer_port_name; 1089 return OK; 1090} 1091 1092int Node::BeginProxying_Locked(const LockedPort& port, 1093 const PortName& port_name) { 1094 ports_lock_.AssertAcquired(); 1095 port->lock.AssertAcquired(); 1096 1097 if (port->state != Port::kBuffering) 1098 return OOPS(ERROR_PORT_STATE_UNEXPECTED); 1099 1100 port->state = Port::kProxying; 1101 1102 int rv = ForwardMessages_Locked(LockedPort(port), port_name); 1103 if (rv != OK) 1104 return rv; 1105 1106 // We may have observed closure while buffering. In that case, we can advance 1107 // to removing the proxy without sending out an ObserveProxy message. We 1108 // already know the last expected message, etc. 1109 1110 if (port->remove_proxy_on_last_message) { 1111 MaybeRemoveProxy_Locked(LockedPort(port), port_name); 1112 1113 // Make sure we propagate closure to our current peer. 1114 ObserveClosureEventData data; 1115 data.last_sequence_num = port->last_sequence_num_to_receive; 1116 delegate_->ForwardMessage( 1117 port->peer_node_name, 1118 NewInternalMessage(port->peer_port_name, 1119 EventType::kObserveClosure, data)); 1120 } else { 1121 InitiateProxyRemoval(LockedPort(port), port_name); 1122 } 1123 1124 return OK; 1125} 1126 1127int Node::BeginProxying(PortRef port_ref) { 1128 Port* port = port_ref.port(); 1129 { 1130 base::AutoLock ports_lock(ports_lock_); 1131 base::AutoLock lock(port->lock); 1132 1133 if (port->state != Port::kBuffering) 1134 return OOPS(ERROR_PORT_STATE_UNEXPECTED); 1135 1136 port->state = Port::kProxying; 1137 1138 int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name()); 1139 if (rv != OK) 1140 return rv; 1141 } 1142 1143 bool should_remove; 1144 NodeName peer_node_name; 1145 ScopedMessage closure_message; 1146 { 1147 base::AutoLock lock(port->lock); 1148 if (port->state != Port::kProxying) 1149 return OOPS(ERROR_PORT_STATE_UNEXPECTED); 1150 1151 should_remove = port->remove_proxy_on_last_message; 1152 if (should_remove) { 1153 // Make sure we propagate closure to our current peer. 1154 ObserveClosureEventData data; 1155 data.last_sequence_num = port->last_sequence_num_to_receive; 1156 peer_node_name = port->peer_node_name; 1157 closure_message = NewInternalMessage(port->peer_port_name, 1158 EventType::kObserveClosure, data); 1159 } else { 1160 InitiateProxyRemoval(LockedPort(port), port_ref.name()); 1161 } 1162 } 1163 1164 if (should_remove) { 1165 TryRemoveProxy(port_ref); 1166 delegate_->ForwardMessage(peer_node_name, std::move(closure_message)); 1167 } 1168 1169 return OK; 1170} 1171 1172int Node::ForwardMessages_Locked(const LockedPort& port, 1173 const PortName &port_name) { 1174 ports_lock_.AssertAcquired(); 1175 port->lock.AssertAcquired(); 1176 1177 for (;;) { 1178 ScopedMessage message; 1179 port->message_queue.GetNextMessage(&message, nullptr); 1180 if (!message) 1181 break; 1182 1183 int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get()); 1184 if (rv != OK) 1185 return rv; 1186 1187 delegate_->ForwardMessage(port->peer_node_name, std::move(message)); 1188 } 1189 return OK; 1190} 1191 1192void Node::InitiateProxyRemoval(const LockedPort& port, 1193 const PortName& port_name) { 1194 port->lock.AssertAcquired(); 1195 1196 // To remove this node, we start by notifying the connected graph that we are 1197 // a proxy. This allows whatever port is referencing this node to skip it. 1198 // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if 1199 // the peer was closed in the meantime). 1200 1201 ObserveProxyEventData data; 1202 data.proxy_node_name = name_; 1203 data.proxy_port_name = port_name; 1204 data.proxy_to_node_name = port->peer_node_name; 1205 data.proxy_to_port_name = port->peer_port_name; 1206 1207 delegate_->ForwardMessage( 1208 port->peer_node_name, 1209 NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data)); 1210} 1211 1212void Node::MaybeRemoveProxy_Locked(const LockedPort& port, 1213 const PortName& port_name) { 1214 // |ports_lock_| must be held so we can potentilaly ErasePort_Locked(). 1215 ports_lock_.AssertAcquired(); 1216 port->lock.AssertAcquired(); 1217 1218 DCHECK(port->state == Port::kProxying); 1219 1220 // Make sure we have seen ObserveProxyAck before removing the port. 1221 if (!port->remove_proxy_on_last_message) 1222 return; 1223 1224 if (!CanAcceptMoreMessages(port.get())) { 1225 // This proxy port is done. We can now remove it! 1226 ErasePort_Locked(port_name); 1227 1228 if (port->send_on_proxy_removal) { 1229 NodeName to_node = port->send_on_proxy_removal->first; 1230 ScopedMessage& message = port->send_on_proxy_removal->second; 1231 1232 delegate_->ForwardMessage(to_node, std::move(message)); 1233 port->send_on_proxy_removal.reset(); 1234 } 1235 } else { 1236 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_ 1237 << " now; waiting for more messages"; 1238 } 1239} 1240 1241void Node::TryRemoveProxy(PortRef port_ref) { 1242 Port* port = port_ref.port(); 1243 bool should_erase = false; 1244 ScopedMessage msg; 1245 NodeName to_node; 1246 { 1247 base::AutoLock lock(port->lock); 1248 1249 // Port already removed. Nothing to do. 1250 if (port->state == Port::kClosed) 1251 return; 1252 1253 DCHECK(port->state == Port::kProxying); 1254 1255 // Make sure we have seen ObserveProxyAck before removing the port. 1256 if (!port->remove_proxy_on_last_message) 1257 return; 1258 1259 if (!CanAcceptMoreMessages(port)) { 1260 // This proxy port is done. We can now remove it! 1261 should_erase = true; 1262 1263 if (port->send_on_proxy_removal) { 1264 to_node = port->send_on_proxy_removal->first; 1265 msg = std::move(port->send_on_proxy_removal->second); 1266 port->send_on_proxy_removal.reset(); 1267 } 1268 } else { 1269 DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_ 1270 << " now; waiting for more messages"; 1271 } 1272 } 1273 1274 if (should_erase) 1275 ErasePort(port_ref.name()); 1276 1277 if (msg) 1278 delegate_->ForwardMessage(to_node, std::move(msg)); 1279} 1280 1281void Node::DestroyAllPortsWithPeer(const NodeName& node_name, 1282 const PortName& port_name) { 1283 // Wipes out all ports whose peer node matches |node_name| and whose peer port 1284 // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer 1285 // node is matched. 1286 1287 std::vector<PortRef> ports_to_notify; 1288 std::vector<PortName> dead_proxies_to_broadcast; 1289 std::deque<PortName> referenced_port_names; 1290 1291 { 1292 base::AutoLock ports_lock(ports_lock_); 1293 1294 for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) { 1295 Port* port = iter->second.get(); 1296 { 1297 base::AutoLock port_lock(port->lock); 1298 1299 if (port->peer_node_name == node_name && 1300 (port_name == kInvalidPortName || 1301 port->peer_port_name == port_name)) { 1302 if (!port->peer_closed) { 1303 // Treat this as immediate peer closure. It's an exceptional 1304 // condition akin to a broken pipe, so we don't care about losing 1305 // messages. 1306 1307 port->peer_closed = true; 1308 port->last_sequence_num_to_receive = 1309 port->message_queue.next_sequence_num() - 1; 1310 1311 if (port->state == Port::kReceiving) 1312 ports_to_notify.push_back(PortRef(iter->first, port)); 1313 } 1314 1315 // We don't expect to forward any further messages, and we don't 1316 // expect to receive a Port{Accepted,Rejected} event. Because we're 1317 // a proxy with no active peer, we cannot use the normal proxy removal 1318 // procedure of forward-propagating an ObserveProxy. Instead we 1319 // broadcast our own death so it can be back-propagated. This is 1320 // inefficient but rare. 1321 if (port->state != Port::kReceiving) { 1322 dead_proxies_to_broadcast.push_back(iter->first); 1323 iter->second->message_queue.GetReferencedPorts( 1324 &referenced_port_names); 1325 } 1326 } 1327 } 1328 } 1329 1330 for (const auto& proxy_name : dead_proxies_to_broadcast) { 1331 ports_.erase(proxy_name); 1332 DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_; 1333 } 1334 } 1335 1336 // Wake up any receiving ports who have just observed simulated peer closure. 1337 for (const auto& port : ports_to_notify) 1338 delegate_->PortStatusChanged(port); 1339 1340 for (const auto& proxy_name : dead_proxies_to_broadcast) { 1341 // Broadcast an event signifying that this proxy is no longer functioning. 1342 ObserveProxyEventData event; 1343 event.proxy_node_name = name_; 1344 event.proxy_port_name = proxy_name; 1345 event.proxy_to_node_name = kInvalidNodeName; 1346 event.proxy_to_port_name = kInvalidPortName; 1347 delegate_->BroadcastMessage(NewInternalMessage( 1348 kInvalidPortName, EventType::kObserveProxy, event)); 1349 1350 // Also process death locally since the port that points this closed one 1351 // could be on the current node. 1352 // Note: Although this is recursive, only a single port is involved which 1353 // limits the expected branching to 1. 1354 DestroyAllPortsWithPeer(name_, proxy_name); 1355 } 1356 1357 // Close any ports referenced by the closed proxies. 1358 for (const auto& name : referenced_port_names) { 1359 PortRef ref; 1360 if (GetPort(name, &ref) == OK) 1361 ClosePort(ref); 1362 } 1363} 1364 1365ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name, 1366 const EventType& type, 1367 const void* data, 1368 size_t num_data_bytes) { 1369 ScopedMessage message; 1370 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message); 1371 1372 EventHeader* header = GetMutableEventHeader(message.get()); 1373 header->port_name = port_name; 1374 header->type = type; 1375 header->padding = 0; 1376 1377 if (num_data_bytes) 1378 memcpy(header + 1, data, num_data_bytes); 1379 1380 return message; 1381} 1382 1383} // namespace ports 1384} // namespace edk 1385} // namespace mojo 1386