1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/proxy_message_pipe_endpoint.h"
6
7#include <string.h>
8
9#include "base/logging.h"
10#include "mojo/system/channel.h"
11#include "mojo/system/local_message_pipe_endpoint.h"
12#include "mojo/system/message_pipe_dispatcher.h"
13
14namespace mojo {
15namespace system {
16
17ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
18    : local_id_(MessageInTransit::kInvalidEndpointId),
19      remote_id_(MessageInTransit::kInvalidEndpointId),
20      is_peer_open_(true) {
21}
22
23ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
24    LocalMessagePipeEndpoint* local_message_pipe_endpoint,
25    bool is_peer_open)
26    : local_id_(MessageInTransit::kInvalidEndpointId),
27      remote_id_(MessageInTransit::kInvalidEndpointId),
28      is_peer_open_(is_peer_open),
29      paused_message_queue_(MessageInTransitQueue::PassContents(),
30                            local_message_pipe_endpoint->message_queue()) {
31  local_message_pipe_endpoint->Close();
32}
33
34ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
35  DCHECK(!is_running());
36  DCHECK(!is_attached());
37  AssertConsistentState();
38  DCHECK(paused_message_queue_.IsEmpty());
39}
40
41MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const {
42  return kTypeProxy;
43}
44
45bool ProxyMessagePipeEndpoint::OnPeerClose() {
46  DCHECK(is_peer_open_);
47
48  is_peer_open_ = false;
49
50  // If our outgoing message queue isn't empty, we shouldn't be destroyed yet.
51  if (!paused_message_queue_.IsEmpty())
52    return true;
53
54  if (is_attached()) {
55    if (!is_running()) {
56      // If we're not running yet, we can't be destroyed yet, because we're
57      // still waiting for the "run" message from the other side.
58      return true;
59    }
60
61    Detach();
62  }
63
64  return false;
65}
66
67// Note: We may have to enqueue messages even when our (local) peer isn't open
68// -- it may have been written to and closed immediately, before we were ready.
69// This case is handled in |Run()| (which will call us).
70void ProxyMessagePipeEndpoint::EnqueueMessage(
71    scoped_ptr<MessageInTransit> message) {
72  if (is_running()) {
73    message->SerializeAndCloseDispatchers(channel_.get());
74
75    message->set_source_id(local_id_);
76    message->set_destination_id(remote_id_);
77    if (!channel_->WriteMessage(message.Pass()))
78      LOG(WARNING) << "Failed to write message to channel";
79  } else {
80    paused_message_queue_.AddMessage(message.Pass());
81  }
82}
83
84void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
85                                      MessageInTransit::EndpointId local_id) {
86  DCHECK(channel);
87  DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
88
89  DCHECK(!is_attached());
90
91  AssertConsistentState();
92  channel_ = channel;
93  local_id_ = local_id;
94  AssertConsistentState();
95}
96
97bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
98  // Assertions about arguments:
99  DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
100
101  // Assertions about current state:
102  DCHECK(is_attached());
103  DCHECK(!is_running());
104
105  AssertConsistentState();
106  remote_id_ = remote_id;
107  AssertConsistentState();
108
109  while (!paused_message_queue_.IsEmpty())
110    EnqueueMessage(paused_message_queue_.GetMessage());
111
112  if (is_peer_open_)
113    return true;  // Stay alive.
114
115  // We were just waiting to die.
116  Detach();
117  return false;
118}
119
120void ProxyMessagePipeEndpoint::OnRemove() {
121  Detach();
122}
123
124void ProxyMessagePipeEndpoint::Detach() {
125  DCHECK(is_attached());
126
127  AssertConsistentState();
128  channel_->DetachMessagePipeEndpoint(local_id_, remote_id_);
129  channel_ = NULL;
130  local_id_ = MessageInTransit::kInvalidEndpointId;
131  remote_id_ = MessageInTransit::kInvalidEndpointId;
132  paused_message_queue_.Clear();
133  AssertConsistentState();
134}
135
136#ifndef NDEBUG
137void ProxyMessagePipeEndpoint::AssertConsistentState() const {
138  if (is_attached()) {
139    DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
140  } else {  // Not attached.
141    DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
142    DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
143  }
144}
145#endif
146
147}  // namespace system
148}  // namespace mojo
149