1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/system/proxy_message_pipe_endpoint.h" 6 7#include <string.h> 8 9#include "base/logging.h" 10#include "mojo/system/channel.h" 11#include "mojo/system/local_message_pipe_endpoint.h" 12#include "mojo/system/message_pipe_dispatcher.h" 13 14namespace mojo { 15namespace system { 16 17ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() 18 : local_id_(MessageInTransit::kInvalidEndpointId), 19 remote_id_(MessageInTransit::kInvalidEndpointId), 20 is_peer_open_(true) { 21} 22 23ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( 24 LocalMessagePipeEndpoint* local_message_pipe_endpoint, 25 bool is_peer_open) 26 : local_id_(MessageInTransit::kInvalidEndpointId), 27 remote_id_(MessageInTransit::kInvalidEndpointId), 28 is_peer_open_(is_peer_open), 29 paused_message_queue_(MessageInTransitQueue::PassContents(), 30 local_message_pipe_endpoint->message_queue()) { 31 local_message_pipe_endpoint->Close(); 32} 33 34ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { 35 DCHECK(!is_running()); 36 DCHECK(!is_attached()); 37 AssertConsistentState(); 38 DCHECK(paused_message_queue_.IsEmpty()); 39} 40 41MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { 42 return kTypeProxy; 43} 44 45bool ProxyMessagePipeEndpoint::OnPeerClose() { 46 DCHECK(is_peer_open_); 47 48 is_peer_open_ = false; 49 50 // If our outgoing message queue isn't empty, we shouldn't be destroyed yet. 51 if (!paused_message_queue_.IsEmpty()) 52 return true; 53 54 if (is_attached()) { 55 if (!is_running()) { 56 // If we're not running yet, we can't be destroyed yet, because we're 57 // still waiting for the "run" message from the other side. 58 return true; 59 } 60 61 Detach(); 62 } 63 64 return false; 65} 66 67// Note: We may have to enqueue messages even when our (local) peer isn't open 68// -- it may have been written to and closed immediately, before we were ready. 69// This case is handled in |Run()| (which will call us). 70void ProxyMessagePipeEndpoint::EnqueueMessage( 71 scoped_ptr<MessageInTransit> message) { 72 if (is_running()) { 73 message->SerializeAndCloseDispatchers(channel_.get()); 74 75 message->set_source_id(local_id_); 76 message->set_destination_id(remote_id_); 77 if (!channel_->WriteMessage(message.Pass())) 78 LOG(WARNING) << "Failed to write message to channel"; 79 } else { 80 paused_message_queue_.AddMessage(message.Pass()); 81 } 82} 83 84void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, 85 MessageInTransit::EndpointId local_id) { 86 DCHECK(channel); 87 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 88 89 DCHECK(!is_attached()); 90 91 AssertConsistentState(); 92 channel_ = channel; 93 local_id_ = local_id; 94 AssertConsistentState(); 95} 96 97bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { 98 // Assertions about arguments: 99 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); 100 101 // Assertions about current state: 102 DCHECK(is_attached()); 103 DCHECK(!is_running()); 104 105 AssertConsistentState(); 106 remote_id_ = remote_id; 107 AssertConsistentState(); 108 109 while (!paused_message_queue_.IsEmpty()) 110 EnqueueMessage(paused_message_queue_.GetMessage()); 111 112 if (is_peer_open_) 113 return true; // Stay alive. 114 115 // We were just waiting to die. 116 Detach(); 117 return false; 118} 119 120void ProxyMessagePipeEndpoint::OnRemove() { 121 Detach(); 122} 123 124void ProxyMessagePipeEndpoint::Detach() { 125 DCHECK(is_attached()); 126 127 AssertConsistentState(); 128 channel_->DetachMessagePipeEndpoint(local_id_, remote_id_); 129 channel_ = NULL; 130 local_id_ = MessageInTransit::kInvalidEndpointId; 131 remote_id_ = MessageInTransit::kInvalidEndpointId; 132 paused_message_queue_.Clear(); 133 AssertConsistentState(); 134} 135 136#ifndef NDEBUG 137void ProxyMessagePipeEndpoint::AssertConsistentState() const { 138 if (is_attached()) { 139 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); 140 } else { // Not attached. 141 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); 142 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); 143 } 144} 145#endif 146 147} // namespace system 148} // namespace mojo 149