// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/ports/node.h"

#include <string.h>

#include <utility>

#include "base/atomicops.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/edk/system/ports/node_delegate.h"

namespace mojo {
namespace edk {
namespace ports {

namespace {

int DebugError(const char* message, int error_code) {
  CHECK(false) << "Oops: " << message;
  return error_code;
}

#define OOPS(x) DebugError(#x, x)

bool CanAcceptMoreMessages(const Port* port) {
  // Have we already doled out the last message (i.e., do we expect to NOT
  // receive further messages)?
  uint64_t next_sequence_num = port->message_queue.next_sequence_num();
  if (port->state == Port::kClosed)
    return false;
  if (port->peer_closed || port->remove_proxy_on_last_message) {
    if (port->last_sequence_num_to_receive == next_sequence_num - 1)
      return false;
  }
  return true;
}

}  // namespace

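// Wrapper used to pass a Port* into helpers that require the port's lock to
// already be held. Constructing a LockedPort asserts that the lock is held by
// the calling thread.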
class Node::LockedPort {
 public:
  explicit LockedPort(Port* port) : port_(port) {
    port_->lock.AssertAcquired();
  }

  Port* get() const { return port_; }
  Port* operator->() const { return port_; }

 private:
  Port* const port_;
};

Node::Node(const NodeName& name, NodeDelegate* delegate)
    : name_(name),
      delegate_(delegate) {
}

Node::~Node() {
  if (!ports_.empty())
    DLOG(WARNING) << "Unclean shutdown for node " << name_;
}

bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
  base::AutoLock ports_lock(ports_lock_);

  if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
#if DCHECK_IS_ON()
    for (auto entry : ports_) {
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << entry.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << entry.second->state << ")";
    }
#endif
    return ports_.empty();
  }

  DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);

  // NOTE: This is not efficient, though it probably doesn't need to be since
  // relatively few ports should be open during shutdown and shutdown doesn't
  // need to be blazingly fast.
  bool can_shutdown = true;
  for (auto entry : ports_) {
    base::AutoLock lock(entry.second->lock);
    if (entry.second->peer_node_name != name_ &&
        entry.second->state != Port::kReceiving) {
      can_shutdown = false;
#if DCHECK_IS_ON()
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << entry.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << entry.second->state << ")";
#else
      // Exit early when not debugging.
      break;
#endif
    }
  }

  return can_shutdown;
}

int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;

  *port_ref = PortRef(port_name, std::move(port));
  return OK;
}

int Node::CreateUninitializedPort(PortRef* port_ref) {
  PortName port_name;
  delegate_->GenerateRandomPortName(&port_name);

  scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
  int rv = AddPortWithName(port_name, port);
  if (rv != OK)
    return rv;

  *port_ref = PortRef(port_name, std::move(port));
  return OK;
}

int Node::InitializePort(const PortRef& port_ref,
                         const NodeName& peer_node_name,
                         const PortName& peer_port_name) {
  Port* port = port_ref.port();

  {
    base::AutoLock lock(port->lock);
    if (port->state != Port::kUninitialized)
      return ERROR_PORT_STATE_UNEXPECTED;

    port->state = Port::kReceiving;
    port->peer_node_name = peer_node_name;
    port->peer_port_name = peer_port_name;
  }

  delegate_->PortStatusChanged(port_ref);

  return OK;
}

int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
  int rv;

  rv = CreateUninitializedPort(port0_ref);
  if (rv != OK)
    return rv;

  rv = CreateUninitializedPort(port1_ref);
  if (rv != OK)
    return rv;

  rv = InitializePort(*port0_ref, name_, port1_ref->name());
  if (rv != OK)
    return rv;

  rv = InitializePort(*port1_ref, name_, port0_ref->name());
  if (rv != OK)
    return rv;

  return OK;
}

int Node::SetUserData(const PortRef& port_ref,
                      scoped_refptr<UserData> user_data) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);
  if (port->state == Port::kClosed)
    return ERROR_PORT_STATE_UNEXPECTED;

  port->user_data = std::move(user_data);

  return OK;
}

int Node::GetUserData(const PortRef& port_ref,
                      scoped_refptr<UserData>* user_data) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);
  if (port->state == Port::kClosed)
    return ERROR_PORT_STATE_UNEXPECTED;

  *user_data = port->user_data;

  return OK;
}

int Node::ClosePort(const PortRef& port_ref) {
  std::deque<PortName> referenced_port_names;

  ObserveClosureEventData data;

  NodeName peer_node_name;
  PortName peer_port_name;
  Port* port = port_ref.port();
  {
    // We may need to erase the port, which requires ports_lock_ to be held,
    // but ports_lock_ must be acquired before any individual port locks.
    base::AutoLock ports_lock(ports_lock_);

    base::AutoLock lock(port->lock);
    if (port->state == Port::kUninitialized) {
      // If the port was not yet initialized, there's nothing interesting to do.
      ErasePort_Locked(port_ref.name());
      return OK;
    }

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    port->state = Port::kClosed;

    // We pass along the sequence number of the last message sent from this
    // port to allow the peer to have the opportunity to consume all inbound
    // messages before notifying the embedder that this port is closed.
    data.last_sequence_num = port->next_sequence_num_to_send - 1;

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;

    // If the port being closed still has unread messages, then we need to
    // take care to close the ports referenced by those messages to avoid
    // leaking memory.
    port->message_queue.GetReferencedPorts(&referenced_port_names);

    ErasePort_Locked(port_ref.name());
  }

  DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
           << " to " << peer_port_name << "@" << peer_node_name;

  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));

  for (const auto& name : referenced_port_names) {
    PortRef ref;
    if (GetPort(name, &ref) == OK)
      ClosePort(ref);
  }
  return OK;
}

int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);

  if (port->state != Port::kReceiving)
    return ERROR_PORT_STATE_UNEXPECTED;

  port_status->has_messages = port->message_queue.HasNextMessage();
  port_status->receiving_messages = CanAcceptMoreMessages(port);
  port_status->peer_closed = port->peer_closed;
  return OK;
}

int Node::GetMessage(const PortRef& port_ref,
                     ScopedMessage* message,
                     MessageFilter* filter) {
  *message = nullptr;

  DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;

  Port* port = port_ref.port();
  {
    base::AutoLock lock(port->lock);

    // This could also be treated like the port being unknown since the
    // embedder should no longer be referring to a port that has been sent.
    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    // Let the embedder get messages until there are no more before reporting
    // that the peer closed its end.
    if (!CanAcceptMoreMessages(port))
      return ERROR_PORT_PEER_CLOSED;

    port->message_queue.GetNextMessage(message, filter);
  }

  // Allow referenced ports to trigger PortStatusChanged calls.
  if (*message) {
    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
      const PortName& new_port_name = (*message)->ports()[i];
      scoped_refptr<Port> new_port = GetPort(new_port_name);

      DCHECK(new_port) << "Port " << new_port_name << "@" << name_
                       << " does not exist!";

      base::AutoLock lock(new_port->lock);

      DCHECK(new_port->state == Port::kReceiving);
      new_port->message_queue.set_signalable(true);
    }
  }

  return OK;
}

int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) {
  int rv = SendMessageInternal(port_ref, &message);
  if (rv != OK) {
    // If send failed, close all carried ports. Note that we're careful not to
    // close the sending port itself if it happened to be one of the encoded
    // ports (an invalid but possible condition.)
    for (size_t i = 0; i < message->num_ports(); ++i) {
      if (message->ports()[i] == port_ref.name())
        continue;

      PortRef port;
      if (GetPort(message->ports()[i], &port) == OK)
        ClosePort(port);
    }
  }
  return rv;
}

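// Dispatches an incoming event to the appropriate handler based on the type
// recorded in its EventHeader.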
int Node::AcceptMessage(ScopedMessage message) {
  const EventHeader* header = GetEventHeader(*message);
  switch (header->type) {
    case EventType::kUser:
      return OnUserMessage(std::move(message));
    case EventType::kPortAccepted:
      return OnPortAccepted(header->port_name);
    case EventType::kObserveProxy:
      return OnObserveProxy(
          header->port_name,
          *GetEventData<ObserveProxyEventData>(*message));
    case EventType::kObserveProxyAck:
      return OnObserveProxyAck(
          header->port_name,
          GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
    case EventType::kObserveClosure:
      return OnObserveClosure(
          header->port_name,
          GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
    case EventType::kMergePort:
      return OnMergePort(header->port_name,
                         *GetEventData<MergePortEventData>(*message));
  }
  return OOPS(ERROR_NOT_IMPLEMENTED);
}

int Node::MergePorts(const PortRef& port_ref,
                     const NodeName& destination_node_name,
                     const PortName& destination_port_name) {
  Port* port = port_ref.port();
  MergePortEventData data;
  {
    base::AutoLock lock(port->lock);

    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
             << " to " << destination_port_name << "@" << destination_node_name;

    // Send the port-to-merge over to the destination node so it can be merged
    // into the port cycle atomically there.
    data.new_port_name = port_ref.name();
    WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
                 &data.new_port_descriptor);
  }
  delegate_->ForwardMessage(
      destination_node_name,
      NewInternalMessage(destination_port_name,
                         EventType::kMergePort, data));
  return OK;
}

int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
  Port* port0 = port0_ref.port();
  Port* port1 = port1_ref.port();
  int rv;
  {
    // |ports_lock_| must be held when acquiring overlapping port locks.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock port0_lock(port0->lock);
    base::AutoLock port1_lock(port1->lock);

    DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
             << " and " << port1_ref.name() << "@" << name_;

    if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
      rv = ERROR_PORT_STATE_UNEXPECTED;
    else
      rv = MergePorts_Locked(port0_ref, port1_ref);
  }

  if (rv != OK) {
    ClosePort(port0_ref);
    ClosePort(port1_ref);
  }

  return rv;
}

int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_
           << " to node " << node_name;

  DestroyAllPortsWithPeer(node_name, kInvalidPortName);
  return OK;
}

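// Handles an incoming user message addressed to a local port: any ports
// carried by the message are first bound to this node, then the message is
// queued, forwarded, or rejected depending on the receiving port's state.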
int Node::OnUserMessage(ScopedMessage message) {
  PortName port_name = GetEventHeader(*message)->port_name;
  const auto* event = GetEventData<UserEventData>(*message);

#if DCHECK_IS_ON()
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0)
      ports_buf << ",";
    ports_buf << message->ports()[i];
  }

  DVLOG(4) << "AcceptMessage " << event->sequence_num
             << " [ports=" << ports_buf.str() << "] at "
             << port_name << "@" << name_;
#endif

  scoped_refptr<Port> port = GetPort(port_name);

  // Even if this port does not exist, can no longer receive messages, or is
  // buffering or proxying messages, we still need the transferred ports to be
  // bound to this node. When the message is forwarded, these ports will get
  // transferred following the usual method. If the message cannot be
  // accepted, then the newly bound ports will simply be closed.

  for (size_t i = 0; i < message->num_ports(); ++i) {
    int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
    if (rv != OK)
      return rv;
  }

  bool has_next_message = false;
  bool message_accepted = false;

  if (port) {
    // We may want to forward messages once the port lock is held, so we must
    // acquire |ports_lock_| first.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    // Reject spurious messages if we've already received the last expected
    // message.
    if (CanAcceptMoreMessages(port.get())) {
      message_accepted = true;
      port->message_queue.AcceptMessage(std::move(message), &has_next_message);

      if (port->state == Port::kBuffering) {
        has_next_message = false;
      } else if (port->state == Port::kProxying) {
        has_next_message = false;

        // Forward messages. We forward messages in sequential order here so
        // that we maintain the message queue's notion of next sequence number.
        // That's useful for the proxy removal process as we can tell when this
        // port has seen all of the messages it is expected to see.
        int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
        if (rv != OK)
          return rv;

        MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
      }
    }
  }

  if (!message_accepted) {
    DVLOG(2) << "Message not accepted!\n";
    // Close all newly accepted ports as they are effectively orphaned.
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef port_ref;
      if (GetPort(message->ports()[i], &port_ref) == OK) {
        ClosePort(port_ref);
      } else {
        DLOG(WARNING) << "Cannot close non-existent port!\n";
      }
    }
  } else if (has_next_message) {
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}

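// Handles notification that a port transferred from this node has been
// accepted by its destination, allowing the local port to begin proxying.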
int Node::OnPortAccepted(const PortName& port_name) {
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;

  DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
           << " pointing to "
           << port->peer_port_name << "@" << port->peer_node_name;

  return BeginProxying(PortRef(port_name, std::move(port)));
}

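// Handles notification that some port along the route has become a proxy. A
// receiving port reroutes to the proxy's target and acks; a port that is
// itself a proxy defers the ack or forwards the event to its own peer.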
int Node::OnObserveProxy(const PortName& port_name,
                         const ObserveProxyEventData& event) {
  if (port_name == kInvalidPortName) {
    // An ObserveProxy with an invalid target port name is a broadcast used to
    // inform ports when their peer (which was itself a proxy) has become
    // defunct due to unexpected node disconnection.
    //
    // Receiving ports affected by this treat it as equivalent to peer closure.
    // Proxies affected by this can be removed and will in turn broadcast their
    // own death with a similar message.
    CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
    CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
    DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
    return OK;
  }

  // The port may have already been closed locally, in which case the
  // ObserveClosure message will contain the last_sequence_num field.
  // We can then silently ignore this message.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port) {
    DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
    return OK;
  }

  DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
           << event.proxy_port_name << "@"
           << event.proxy_node_name << " pointing to "
           << event.proxy_to_port_name << "@"
           << event.proxy_to_node_name;

  {
    base::AutoLock lock(port->lock);

    if (port->peer_node_name == event.proxy_node_name &&
        port->peer_port_name == event.proxy_port_name) {
      if (port->state == Port::kReceiving) {
        port->peer_node_name = event.proxy_to_node_name;
        port->peer_port_name = event.proxy_to_port_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = port->next_sequence_num_to_send - 1;

        delegate_->ForwardMessage(
            event.proxy_node_name,
            NewInternalMessage(event.proxy_port_name,
                               EventType::kObserveProxyAck,
                               ack));
      } else {
        // As a proxy ourselves, we don't know how to honor the ObserveProxy
        // event or to populate the last_sequence_num field of ObserveProxyAck.
        // After all, another port could be sending messages to our peer now
        // that we've sent out our own ObserveProxy event.  Instead, we will
        // send an ObserveProxyAck indicating that the ObserveProxy event
        // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
        // However, this has to be done after we are removed as a proxy.
        // Otherwise, we might just find ourselves back here again, which
        // would be akin to a busy loop.

        DVLOG(2) << "Delaying ObserveProxyAck to "
                 << event.proxy_port_name << "@" << event.proxy_node_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = kInvalidSequenceNum;

        port->send_on_proxy_removal.reset(
            new std::pair<NodeName, ScopedMessage>(
                event.proxy_node_name,
                NewInternalMessage(event.proxy_port_name,
                                   EventType::kObserveProxyAck,
                                   ack)));
      }
    } else {
      // Forward this event along to our peer. Eventually, it should find the
      // port referring to the proxy.
      delegate_->ForwardMessage(
          port->peer_node_name,
          NewInternalMessage(port->peer_port_name,
                             EventType::kObserveProxy,
                             event));
    }
  }
  return OK;
}

int Node::OnObserveProxyAck(const PortName& port_name,
                            uint64_t last_sequence_num) {
  DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
           << " (last_sequence_num=" << last_sequence_num << ")";

  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;  // The port may have observed closure first, so
                                // this is not an "Oops".

  {
    base::AutoLock lock(port->lock);

    if (port->state != Port::kProxying)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    if (last_sequence_num == kInvalidSequenceNum) {
      // Send again.
      InitiateProxyRemoval(LockedPort(port.get()), port_name);
      return OK;
    }

    // We can now remove this port once we have received and forwarded the last
    // message addressed to this port.
    port->remove_proxy_on_last_message = true;
    port->last_sequence_num_to_receive = last_sequence_num;
  }
  TryRemoveProxy(PortRef(port_name, std::move(port)));
  return OK;
}

int Node::OnObserveClosure(const PortName& port_name,
                           uint64_t last_sequence_num) {
  // OK if the port doesn't exist, as it may have been closed already.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return OK;

  // This message tells the port that it should no longer expect more messages
  // beyond last_sequence_num. This message is forwarded along until we reach
  // the receiving end, and this message serves as an equivalent to
  // ObserveProxyAck.

  bool notify_delegate = false;
  ObserveClosureEventData forwarded_data;
  NodeName peer_node_name;
  PortName peer_port_name;
  bool try_remove_proxy = false;
  {
    base::AutoLock lock(port->lock);

    port->peer_closed = true;
    port->last_sequence_num_to_receive = last_sequence_num;

    DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
             << " (state=" << port->state << ") pointing to "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << last_sequence_num << ")";

    // We always forward ObserveClosure, even beyond the receiving port which
    // cares about it. This ensures that any dead-end proxies beyond that port
    // are notified to remove themselves.

    if (port->state == Port::kReceiving) {
      notify_delegate = true;

      // When forwarding along the other half of the port cycle, this will only
      // reach dead-end proxies. Tell them we've sent our last message so they
      // can go away.
      //
      // TODO: Repurposing ObserveClosure for this has the desired result but
      // may be semantically confusing since the forwarding port is not actually
      // closed. Consider replacing this with a new event type.
      forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
    } else {
      // We haven't yet reached the receiving peer of the closed port, so
      // forward the message along as-is.
      forwarded_data.last_sequence_num = last_sequence_num;

      // See about removing the port if it is a proxy as our peer won't be able
      // to participate in proxy removal.
      port->remove_proxy_on_last_message = true;
      if (port->state == Port::kProxying)
        try_remove_proxy = true;
    }

    DVLOG(2) << "Forwarding ObserveClosure from "
             << port_name << "@" << name_ << " to peer "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << forwarded_data.last_sequence_num
             << ")";

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;
  }
  if (try_remove_proxy)
    TryRemoveProxy(PortRef(port_name, port));

  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure,
                         forwarded_data));

  if (notify_delegate) {
    PortRef port_ref(port_name, std::move(port));
    delegate_->PortStatusChanged(port_ref);
  }
  return OK;
}

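// Handles a request to merge a newly transferred port with a local receiving
// port, joining the two port cycles; on failure the affected ports are closed.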
int Node::OnMergePort(const PortName& port_name,
                      const MergePortEventData& event) {
  scoped_refptr<Port> port = GetPort(port_name);

  DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
           << (port ? port->state : -1) << ") merging with proxy "
           << event.new_port_name
           << "@" << name_ << " pointing to "
           << event.new_port_descriptor.peer_port_name << "@"
           << event.new_port_descriptor.peer_node_name << " referred by "
           << event.new_port_descriptor.referring_port_name << "@"
           << event.new_port_descriptor.referring_node_name;

  bool close_target_port = false;
  bool close_new_port = false;

  // Accept the new port. This is now the receiving end of the other port cycle
  // to be merged with ours.
  int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
  if (rv != OK) {
    close_target_port = true;
  } else if (port) {
    // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
    // needs to hold |ports_lock_|. We also acquire multiple port locks within.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kReceiving) {
      close_new_port = true;
    } else {
      scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
      base::AutoLock new_port_lock(new_port->lock);
      DCHECK(new_port->state == Port::kReceiving);

      // Both ports are locked. Now all we have to do is swap their peer
      // information and set them up as proxies.

      PortRef port0_ref(port_name, port);
      PortRef port1_ref(event.new_port_name, new_port);
      int rv = MergePorts_Locked(port0_ref, port1_ref);
      if (rv == OK)
        return rv;

      close_new_port = true;
      close_target_port = true;
    }
  } else {
    close_new_port = true;
  }

  if (close_target_port) {
    PortRef target_port;
    rv = GetPort(port_name, &target_port);
    DCHECK(rv == OK);

    ClosePort(target_port);
  }

  if (close_new_port) {
    PortRef new_port;
    rv = GetPort(event.new_port_name, &new_port);
    DCHECK(rv == OK);

    ClosePort(new_port);
  }

  return ERROR_PORT_STATE_UNEXPECTED;
}

int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
  base::AutoLock lock(ports_lock_);

  if (!ports_.insert(std::make_pair(port_name, std::move(port))).second)
    return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.

  DVLOG(2) << "Created port " << port_name << "@" << name_;
  return OK;
}

void Node::ErasePort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  ErasePort_Locked(port_name);
}

void Node::ErasePort_Locked(const PortName& port_name) {
  ports_lock_.AssertAcquired();
  ports_.erase(port_name);
  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}

scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  return GetPort_Locked(port_name);
}

scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
  ports_lock_.AssertAcquired();
  auto iter = ports_.find(port_name);
  if (iter == ports_.end())
    return nullptr;

#if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
  // Workaround for https://crbug.com/665869.
  base::subtle::MemoryBarrier();
#endif

  return iter->second;
}

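// Validates that |port_ref| may send |message| and, after preparing any
// transferred ports, either forwards the message to the peer node or accepts
// it locally when the peer lives on this node.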
int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) {
  ScopedMessage& m = *message;
  for (size_t i = 0; i < m->num_ports(); ++i) {
    if (m->ports()[i] == port_ref.name())
      return ERROR_PORT_CANNOT_SEND_SELF;
  }

  Port* port = port_ref.port();
  NodeName peer_node_name;
  {
    // We must acquire |ports_lock_| before grabbing any port locks, because
    // WillSendMessage_Locked may need to lock multiple ports out of order.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    if (port->peer_closed)
      return ERROR_PORT_PEER_CLOSED;

    int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
    if (rv != OK)
      return rv;

    // Beyond this point there's no sense in returning anything but OK. Even if
    // message forwarding or acceptance fails, there's nothing the embedder can
    // do to recover. Assume that failure beyond this point must be treated as a
    // transport failure.

    peer_node_name = port->peer_node_name;
  }

  if (peer_node_name != name_) {
    delegate_->ForwardMessage(peer_node_name, std::move(m));
    return OK;
  }

  int rv = AcceptMessage(std::move(m));
  if (rv != OK) {
    // See comment above for why we don't return an error in this case.
    DVLOG(2) << "AcceptMessage failed: " << rv;
  }

  return OK;
}

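// Merges two receiving ports whose locks (and |ports_lock_|) are held by
// swapping their peers and turning both into proxies; the swap is undone if
// either port fails to enter proxying mode.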
int Node::MergePorts_Locked(const PortRef& port0_ref,
                            const PortRef& port1_ref) {
  Port* port0 = port0_ref.port();
  Port* port1 = port1_ref.port();

  ports_lock_.AssertAcquired();
  port0->lock.AssertAcquired();
  port1->lock.AssertAcquired();

  CHECK(port0->state == Port::kReceiving);
  CHECK(port1->state == Port::kReceiving);

  // Ports cannot be merged with their own receiving peer!
  if (port0->peer_node_name == name_ &&
      port0->peer_port_name == port1_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  if (port1->peer_node_name == name_ &&
      port1->peer_port_name == port0_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  // Only merge if both ports have never sent a message.
  if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
      port1->next_sequence_num_to_send == kInitialSequenceNum) {
    // Swap the ports' peer information and switch them both into buffering
    // (eventually proxying) mode.

    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);

    port0->state = Port::kBuffering;
    if (port0->peer_closed)
      port0->remove_proxy_on_last_message = true;

    port1->state = Port::kBuffering;
    if (port1->peer_closed)
      port1->remove_proxy_on_last_message = true;

    int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name());
    int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name());

    if (rv1 == OK && rv2 == OK) {
      // If either merged port had a closed peer, its new peer needs to be
      // informed of this.
      if (port1->peer_closed) {
        ObserveClosureEventData data;
        data.last_sequence_num = port0->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port0->peer_node_name,
            NewInternalMessage(port0->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      if (port0->peer_closed) {
        ObserveClosureEventData data;
        data.last_sequence_num = port1->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port1->peer_node_name,
            NewInternalMessage(port1->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      return OK;
    }

    // If either proxy failed to initialize (e.g. had undeliverable messages
    // or ended up in a bad state somehow), we keep the system in a consistent
    // state by undoing the peer swap.
    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);
    port0->remove_proxy_on_last_message = false;
    port1->remove_proxy_on_last_message = false;
    port0->state = Port::kReceiving;
    port1->state = Port::kReceiving;
  }

  return ERROR_PORT_STATE_UNEXPECTED;
}

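// Prepares |port| for transfer to |to_node_name|: generates the name the port
// will have on the destination node, switches the local port into buffering
// mode so it can act as a proxy, and fills in |port_descriptor| with the
// state the destination needs to recreate the port.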
void Node::WillSendPort(const LockedPort& port,
                        const NodeName& to_node_name,
                        PortName* port_name,
                        PortDescriptor* port_descriptor) {
  port->lock.AssertAcquired();

  PortName local_port_name = *port_name;

  PortName new_port_name;
  delegate_->GenerateRandomPortName(&new_port_name);

  // Make sure we don't send messages to the new peer until after we know it
  // exists. In the meantime, just buffer messages locally.
  DCHECK(port->state == Port::kReceiving);
  port->state = Port::kBuffering;

  // If we already know our peer is closed, we already know this proxy can
  // be removed once it receives and forwards its last expected message.
  if (port->peer_closed)
    port->remove_proxy_on_last_message = true;

  *port_name = new_port_name;

  port_descriptor->peer_node_name = port->peer_node_name;
  port_descriptor->peer_port_name = port->peer_port_name;
  port_descriptor->referring_node_name = name_;
  port_descriptor->referring_port_name = local_port_name;
  port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
  port_descriptor->next_sequence_num_to_receive =
      port->message_queue.next_sequence_num();
  port_descriptor->last_sequence_num_to_receive =
      port->last_sequence_num_to_receive;
  port_descriptor->peer_closed = port->peer_closed;
  memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));

  // Configure the local port to point to the new port.
  port->peer_node_name = to_node_name;
  port->peer_port_name = new_port_name;
}

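// Creates a local port from a received |port_descriptor| and notifies the
// referring node that the port was accepted so it can start forwarding any
// buffered messages.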
int Node::AcceptPort(const PortName& port_name,
                     const PortDescriptor& port_descriptor) {
  scoped_refptr<Port> port = make_scoped_refptr(
      new Port(port_descriptor.next_sequence_num_to_send,
               port_descriptor.next_sequence_num_to_receive));
  port->state = Port::kReceiving;
  port->peer_node_name = port_descriptor.peer_node_name;
  port->peer_port_name = port_descriptor.peer_port_name;
  port->last_sequence_num_to_receive =
      port_descriptor.last_sequence_num_to_receive;
  port->peer_closed = port_descriptor.peer_closed;

  DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
           << port->peer_closed << "; last_sequence_num_to_receive="
           << port->last_sequence_num_to_receive << "]";

  // A newly accepted port is not signalable until the message referencing the
  // new port finds its way to the consumer (see GetMessage).
  port->message_queue.set_signalable(false);

  int rv = AddPortWithName(port_name, std::move(port));
  if (rv != OK)
    return rv;

  // Allow referring port to forward messages.
  delegate_->ForwardMessage(
      port_descriptor.referring_node_name,
      NewInternalMessage(port_descriptor.referring_port_name,
                         EventType::kPortAccepted));
  return OK;
}

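// Stamps |message| with a sequence number and its destination port, and
// prepares any ports it carries for transfer to the peer node. Called with
// |ports_lock_| and the sending port's lock held.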
int Node::WillSendMessage_Locked(const LockedPort& port,
                                 const PortName& port_name,
                                 Message* message) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  DCHECK(message);

  // Messages may already have a sequence number if they're being forwarded
  // by a proxy. Otherwise, use the next outgoing sequence number.
  uint64_t* sequence_num =
      &GetMutableEventData<UserEventData>(message)->sequence_num;
  if (*sequence_num == 0)
    *sequence_num = port->next_sequence_num_to_send++;

#if DCHECK_IS_ON()
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0)
      ports_buf << ",";
    ports_buf << message->ports()[i];
  }
#endif

  if (message->num_ports() > 0) {
    // Note: Another thread could be trying to send the same ports, so we need
    // to ensure that they are ours to send before we mutate their state.

    std::vector<scoped_refptr<Port>> ports;
    ports.resize(message->num_ports());

    {
      for (size_t i = 0; i < message->num_ports(); ++i) {
        ports[i] = GetPort_Locked(message->ports()[i]);
        DCHECK(ports[i]);

        ports[i]->lock.Acquire();
        int error = OK;
        if (ports[i]->state != Port::kReceiving)
          error = ERROR_PORT_STATE_UNEXPECTED;
        else if (message->ports()[i] == port->peer_port_name)
          error = ERROR_PORT_CANNOT_SEND_PEER;

        if (error != OK) {
          // Oops, we cannot send this port.
          for (size_t j = 0; j <= i; ++j)
            ports[j]->lock.Release();
          // Backpedal on the sequence number.
          port->next_sequence_num_to_send--;
          return error;
        }
      }
    }

    PortDescriptor* port_descriptors =
        GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));

    for (size_t i = 0; i < message->num_ports(); ++i) {
      WillSendPort(LockedPort(ports[i].get()),
                   port->peer_node_name,
                   message->mutable_ports() + i,
                   port_descriptors + i);
    }

    for (size_t i = 0; i < message->num_ports(); ++i)
      ports[i]->lock.Release();
  }

#if DCHECK_IS_ON()
  DVLOG(4) << "Sending message "
           << GetEventData<UserEventData>(*message)->sequence_num
           << " [ports=" << ports_buf.str() << "]"
           << " from " << port_name << "@" << name_
           << " to " << port->peer_port_name << "@" << port->peer_node_name;
#endif

  GetMutableEventHeader(message)->port_name = port->peer_port_name;
  return OK;
}

int Node::BeginProxying_Locked(const LockedPort& port,
                               const PortName& port_name) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  if (port->state != Port::kBuffering)
    return OOPS(ERROR_PORT_STATE_UNEXPECTED);

  port->state = Port::kProxying;

  int rv = ForwardMessages_Locked(LockedPort(port), port_name);
  if (rv != OK)
    return rv;

  // We may have observed closure while buffering. In that case, we can advance
  // to removing the proxy without sending out an ObserveProxy message. We
  // already know the last expected message, etc.

  if (port->remove_proxy_on_last_message) {
    MaybeRemoveProxy_Locked(LockedPort(port), port_name);

    // Make sure we propagate closure to our current peer.
    ObserveClosureEventData data;
    data.last_sequence_num = port->last_sequence_num_to_receive;
    delegate_->ForwardMessage(
        port->peer_node_name,
        NewInternalMessage(port->peer_port_name,
                           EventType::kObserveClosure, data));
  } else {
    InitiateProxyRemoval(LockedPort(port), port_name);
  }

  return OK;
}

int Node::BeginProxying(PortRef port_ref) {
  Port* port = port_ref.port();
  {
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kBuffering)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    port->state = Port::kProxying;

    int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
    if (rv != OK)
      return rv;
  }

  bool should_remove;
  NodeName peer_node_name;
  ScopedMessage closure_message;
  {
    base::AutoLock lock(port->lock);
    if (port->state != Port::kProxying)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    should_remove = port->remove_proxy_on_last_message;
    if (should_remove) {
      // Make sure we propagate closure to our current peer.
      ObserveClosureEventData data;
      data.last_sequence_num = port->last_sequence_num_to_receive;
      peer_node_name = port->peer_node_name;
      closure_message = NewInternalMessage(port->peer_port_name,
                                           EventType::kObserveClosure, data);
    } else {
      InitiateProxyRemoval(LockedPort(port), port_ref.name());
    }
  }

  if (should_remove) {
    TryRemoveProxy(port_ref);
    delegate_->ForwardMessage(peer_node_name, std::move(closure_message));
  }

  return OK;
}

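// Drains |port|'s message queue, forwarding each queued message to the port's
// current peer.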
int Node::ForwardMessages_Locked(const LockedPort& port,
                                 const PortName& port_name) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  for (;;) {
    ScopedMessage message;
    port->message_queue.GetNextMessage(&message, nullptr);
    if (!message)
      break;

    int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
    if (rv != OK)
      return rv;

    delegate_->ForwardMessage(port->peer_node_name, std::move(message));
  }
  return OK;
}

void Node::InitiateProxyRemoval(const LockedPort& port,
                                const PortName& port_name) {
  port->lock.AssertAcquired();

  // To remove this node, we start by notifying the connected graph that we are
  // a proxy. This allows whatever port is referencing this node to skip it.
  // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
  // the peer was closed in the meantime).

  ObserveProxyEventData data;
  data.proxy_node_name = name_;
  data.proxy_port_name = port_name;
  data.proxy_to_node_name = port->peer_node_name;
  data.proxy_to_port_name = port->peer_port_name;

  delegate_->ForwardMessage(
      port->peer_node_name,
      NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
}

void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
                                   const PortName& port_name) {
  // |ports_lock_| must be held so we can potentially ErasePort_Locked().
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  DCHECK(port->state == Port::kProxying);

  // Make sure we have seen ObserveProxyAck before removing the port.
  if (!port->remove_proxy_on_last_message)
    return;

  if (!CanAcceptMoreMessages(port.get())) {
    // This proxy port is done. We can now remove it!
    ErasePort_Locked(port_name);

    if (port->send_on_proxy_removal) {
      NodeName to_node = port->send_on_proxy_removal->first;
      ScopedMessage& message = port->send_on_proxy_removal->second;

      delegate_->ForwardMessage(to_node, std::move(message));
      port->send_on_proxy_removal.reset();
    }
  } else {
    DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
             << " now; waiting for more messages";
  }
}

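// Removes the proxy port named by |port_ref| if it has already forwarded its
// last expected message; any deferred ack message is sent after the port's
// lock is released.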
void Node::TryRemoveProxy(PortRef port_ref) {
  Port* port = port_ref.port();
  bool should_erase = false;
  ScopedMessage msg;
  NodeName to_node;
  {
    base::AutoLock lock(port->lock);

    // Port already removed. Nothing to do.
    if (port->state == Port::kClosed)
      return;

    DCHECK(port->state == Port::kProxying);

    // Make sure we have seen ObserveProxyAck before removing the port.
    if (!port->remove_proxy_on_last_message)
      return;

    if (!CanAcceptMoreMessages(port)) {
      // This proxy port is done. We can now remove it!
      should_erase = true;

      if (port->send_on_proxy_removal) {
        to_node = port->send_on_proxy_removal->first;
        msg = std::move(port->send_on_proxy_removal->second);
        port->send_on_proxy_removal.reset();
      }
    } else {
      DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
               << " now; waiting for more messages";
    }
  }

  if (should_erase)
    ErasePort(port_ref.name());

  if (msg)
    delegate_->ForwardMessage(to_node, std::move(msg));
}

void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
                                   const PortName& port_name) {
  // Wipes out all ports whose peer node matches |node_name| and whose peer
  // port matches |port_name|. If |port_name| is |kInvalidPortName|, only the
  // peer node is matched.

  std::vector<PortRef> ports_to_notify;
  std::vector<PortName> dead_proxies_to_broadcast;
  std::deque<PortName> referenced_port_names;

  {
    base::AutoLock ports_lock(ports_lock_);

    for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
      Port* port = iter->second.get();
      {
        base::AutoLock port_lock(port->lock);

        if (port->peer_node_name == node_name &&
            (port_name == kInvalidPortName ||
             port->peer_port_name == port_name)) {
          if (!port->peer_closed) {
            // Treat this as immediate peer closure. It's an exceptional
            // condition akin to a broken pipe, so we don't care about losing
            // messages.

            port->peer_closed = true;
            port->last_sequence_num_to_receive =
                port->message_queue.next_sequence_num() - 1;

            if (port->state == Port::kReceiving)
              ports_to_notify.push_back(PortRef(iter->first, port));
          }

          // We don't expect to forward any further messages, and we don't
          // expect to receive a Port{Accepted,Rejected} event. Because we're
          // a proxy with no active peer, we cannot use the normal proxy removal
          // procedure of forward-propagating an ObserveProxy. Instead we
          // broadcast our own death so it can be back-propagated. This is
          // inefficient but rare.
          if (port->state != Port::kReceiving) {
            dead_proxies_to_broadcast.push_back(iter->first);
            iter->second->message_queue.GetReferencedPorts(
                &referenced_port_names);
          }
        }
      }
    }

    for (const auto& proxy_name : dead_proxies_to_broadcast) {
      ports_.erase(proxy_name);
      DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
    }
  }

  // Wake up any receiving ports who have just observed simulated peer closure.
  for (const auto& port : ports_to_notify)
    delegate_->PortStatusChanged(port);

  for (const auto& proxy_name : dead_proxies_to_broadcast) {
    // Broadcast an event signifying that this proxy is no longer functioning.
    ObserveProxyEventData event;
    event.proxy_node_name = name_;
    event.proxy_port_name = proxy_name;
    event.proxy_to_node_name = kInvalidNodeName;
    event.proxy_to_port_name = kInvalidPortName;
    delegate_->BroadcastMessage(NewInternalMessage(
        kInvalidPortName, EventType::kObserveProxy, event));
    // Also process death locally since the port that points to this closed one
    // could be on the current node.
    // Note: Although this is recursive, only a single port is involved which
    // limits the expected branching to 1.
    DestroyAllPortsWithPeer(name_, proxy_name);
  }

  // Close any ports referenced by the closed proxies.
  for (const auto& name : referenced_port_names) {
    PortRef ref;
    if (GetPort(name, &ref) == OK)
      ClosePort(ref);
  }
}

ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
                                              const EventType& type,
                                              const void* data,
                                              size_t num_data_bytes) {
  ScopedMessage message;
  delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);

  EventHeader* header = GetMutableEventHeader(message.get());
  header->port_name = port_name;
  header->type = type;
  header->padding = 0;

  if (num_data_bytes)
    memcpy(header + 1, data, num_data_bytes);

  return message;
}

}  // namespace ports
}  // namespace edk
}  // namespace mojo
