1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/system/message_pipe.h" 6 7#include "base/logging.h" 8#include "mojo/system/channel_endpoint.h" 9#include "mojo/system/local_message_pipe_endpoint.h" 10#include "mojo/system/message_in_transit.h" 11#include "mojo/system/message_pipe_dispatcher.h" 12#include "mojo/system/message_pipe_endpoint.h" 13#include "mojo/system/proxy_message_pipe_endpoint.h" 14 15namespace mojo { 16namespace system { 17 18// static 19MessagePipe* MessagePipe::CreateLocalLocal() { 20 MessagePipe* message_pipe = new MessagePipe(); 21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 23 return message_pipe; 24} 25 26// static 27MessagePipe* MessagePipe::CreateLocalProxy( 28 scoped_refptr<ChannelEndpoint>* channel_endpoint) { 29 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. 30 MessagePipe* message_pipe = new MessagePipe(); 31 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 32 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); 33 message_pipe->endpoints_[1].reset( 34 new ProxyMessagePipeEndpoint(channel_endpoint->get())); 35 return message_pipe; 36} 37 38// static 39MessagePipe* MessagePipe::CreateProxyLocal( 40 scoped_refptr<ChannelEndpoint>* channel_endpoint) { 41 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. 42 MessagePipe* message_pipe = new MessagePipe(); 43 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); 44 message_pipe->endpoints_[0].reset( 45 new ProxyMessagePipeEndpoint(channel_endpoint->get())); 46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 47 return message_pipe; 48} 49 50// static 51unsigned MessagePipe::GetPeerPort(unsigned port) { 52 DCHECK(port == 0 || port == 1); 53 return port ^ 1; 54} 55 56MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { 57 DCHECK(port == 0 || port == 1); 58 base::AutoLock locker(lock_); 59 DCHECK(endpoints_[port]); 60 61 return endpoints_[port]->GetType(); 62} 63 64void MessagePipe::CancelAllWaiters(unsigned port) { 65 DCHECK(port == 0 || port == 1); 66 67 base::AutoLock locker(lock_); 68 DCHECK(endpoints_[port]); 69 endpoints_[port]->CancelAllWaiters(); 70} 71 72void MessagePipe::Close(unsigned port) { 73 DCHECK(port == 0 || port == 1); 74 75 unsigned destination_port = GetPeerPort(port); 76 77 base::AutoLock locker(lock_); 78 DCHECK(endpoints_[port]); 79 80 endpoints_[port]->Close(); 81 if (endpoints_[destination_port]) { 82 if (!endpoints_[destination_port]->OnPeerClose()) 83 endpoints_[destination_port].reset(); 84 } 85 endpoints_[port].reset(); 86} 87 88// TODO(vtl): Handle flags. 89MojoResult MessagePipe::WriteMessage( 90 unsigned port, 91 UserPointer<const void> bytes, 92 uint32_t num_bytes, 93 std::vector<DispatcherTransport>* transports, 94 MojoWriteMessageFlags flags) { 95 DCHECK(port == 0 || port == 1); 96 return EnqueueMessageInternal( 97 GetPeerPort(port), 98 make_scoped_ptr(new MessageInTransit( 99 MessageInTransit::kTypeMessagePipeEndpoint, 100 MessageInTransit::kSubtypeMessagePipeEndpointData, 101 num_bytes, 102 bytes)), 103 transports); 104} 105 106MojoResult MessagePipe::ReadMessage(unsigned port, 107 UserPointer<void> bytes, 108 UserPointer<uint32_t> num_bytes, 109 DispatcherVector* dispatchers, 110 uint32_t* num_dispatchers, 111 MojoReadMessageFlags flags) { 112 DCHECK(port == 0 || port == 1); 113 114 base::AutoLock locker(lock_); 115 DCHECK(endpoints_[port]); 116 117 return endpoints_[port]->ReadMessage( 118 bytes, num_bytes, dispatchers, num_dispatchers, flags); 119} 120 121HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { 122 DCHECK(port == 0 || port == 1); 123 124 base::AutoLock locker(const_cast<base::Lock&>(lock_)); 125 DCHECK(endpoints_[port]); 126 127 return endpoints_[port]->GetHandleSignalsState(); 128} 129 130MojoResult MessagePipe::AddWaiter(unsigned port, 131 Waiter* waiter, 132 MojoHandleSignals signals, 133 uint32_t context, 134 HandleSignalsState* signals_state) { 135 DCHECK(port == 0 || port == 1); 136 137 base::AutoLock locker(lock_); 138 DCHECK(endpoints_[port]); 139 140 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state); 141} 142 143void MessagePipe::RemoveWaiter(unsigned port, 144 Waiter* waiter, 145 HandleSignalsState* signals_state) { 146 DCHECK(port == 0 || port == 1); 147 148 base::AutoLock locker(lock_); 149 DCHECK(endpoints_[port]); 150 151 endpoints_[port]->RemoveWaiter(waiter, signals_state); 152} 153 154scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { 155 DCHECK(port == 0 || port == 1); 156 157 base::AutoLock locker(lock_); 158 DCHECK(endpoints_[port]); 159 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); 160 161 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a 162 // |MessagePipe| with two proxy endpoints, which will then act as a proxy 163 // (rather than trying to connect the two ends directly). 164 DLOG_IF(WARNING, 165 !!endpoints_[GetPeerPort(port)] && 166 endpoints_[GetPeerPort(port)]->GetType() != 167 MessagePipeEndpoint::kTypeLocal) 168 << "Direct message pipe passing across multiple channels not yet " 169 "implemented; will proxy"; 170 171 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); 172 scoped_refptr<ChannelEndpoint> channel_endpoint( 173 new ChannelEndpoint(this, port)); 174 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); 175 channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>( 176 old_endpoint.get())->message_queue()); 177 old_endpoint->Close(); 178 179 return channel_endpoint; 180} 181 182MojoResult MessagePipe::EnqueueMessage(unsigned port, 183 scoped_ptr<MessageInTransit> message) { 184 return EnqueueMessageInternal(port, message.Pass(), nullptr); 185} 186 187void MessagePipe::OnRemove(unsigned port) { 188 unsigned destination_port = GetPeerPort(port); 189 190 base::AutoLock locker(lock_); 191 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called. 192 if (!endpoints_[port]) 193 return; 194 195 if (endpoints_[destination_port]) { 196 if (!endpoints_[destination_port]->OnPeerClose()) 197 endpoints_[destination_port].reset(); 198 } 199 endpoints_[port].reset(); 200} 201 202MessagePipe::MessagePipe() { 203} 204 205MessagePipe::~MessagePipe() { 206 // Owned by the dispatchers. The owning dispatchers should only release us via 207 // their |Close()| method, which should inform us of being closed via our 208 // |Close()|. Thus these should already be null. 209 DCHECK(!endpoints_[0]); 210 DCHECK(!endpoints_[1]); 211} 212 213MojoResult MessagePipe::EnqueueMessageInternal( 214 unsigned port, 215 scoped_ptr<MessageInTransit> message, 216 std::vector<DispatcherTransport>* transports) { 217 DCHECK(port == 0 || port == 1); 218 DCHECK(message); 219 220 if (message->type() == MessageInTransit::kTypeMessagePipe) { 221 DCHECK(!transports); 222 return HandleControlMessage(port, message.Pass()); 223 } 224 225 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint); 226 227 base::AutoLock locker(lock_); 228 DCHECK(endpoints_[GetPeerPort(port)]); 229 230 // The destination port need not be open, unlike the source port. 231 if (!endpoints_[port]) 232 return MOJO_RESULT_FAILED_PRECONDITION; 233 234 if (transports) { 235 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); 236 if (result != MOJO_RESULT_OK) 237 return result; 238 } 239 240 // The endpoint's |EnqueueMessage()| may not report failure. 241 endpoints_[port]->EnqueueMessage(message.Pass()); 242 return MOJO_RESULT_OK; 243} 244 245MojoResult MessagePipe::AttachTransportsNoLock( 246 unsigned port, 247 MessageInTransit* message, 248 std::vector<DispatcherTransport>* transports) { 249 DCHECK(!message->has_dispatchers()); 250 251 // You're not allowed to send either handle to a message pipe over the message 252 // pipe, so check for this. (The case of trying to write a handle to itself is 253 // taken care of by |Core|. That case kind of makes sense, but leads to 254 // complications if, e.g., both sides try to do the same thing with their 255 // respective handles simultaneously. The other case, of trying to write the 256 // peer handle to a handle, doesn't make sense -- since no handle will be 257 // available to read the message from.) 258 for (size_t i = 0; i < transports->size(); i++) { 259 if (!(*transports)[i].is_valid()) 260 continue; 261 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) { 262 MessagePipeDispatcherTransport mp_transport((*transports)[i]); 263 if (mp_transport.GetMessagePipe() == this) { 264 // The other case should have been disallowed by |Core|. (Note: |port| 265 // is the peer port of the handle given to |WriteMessage()|.) 266 DCHECK_EQ(mp_transport.GetPort(), port); 267 return MOJO_RESULT_INVALID_ARGUMENT; 268 } 269 } 270 } 271 272 // Clone the dispatchers and attach them to the message. (This must be done as 273 // a separate loop, since we want to leave the dispatchers alone on failure.) 274 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); 275 dispatchers->reserve(transports->size()); 276 for (size_t i = 0; i < transports->size(); i++) { 277 if ((*transports)[i].is_valid()) { 278 dispatchers->push_back( 279 (*transports)[i].CreateEquivalentDispatcherAndClose()); 280 } else { 281 LOG(WARNING) << "Enqueueing null dispatcher"; 282 dispatchers->push_back(scoped_refptr<Dispatcher>()); 283 } 284 } 285 message->SetDispatchers(dispatchers.Pass()); 286 return MOJO_RESULT_OK; 287} 288 289MojoResult MessagePipe::HandleControlMessage( 290 unsigned /*port*/, 291 scoped_ptr<MessageInTransit> message) { 292 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " 293 << message->subtype(); 294 return MOJO_RESULT_UNKNOWN; 295} 296 297} // namespace system 298} // namespace mojo 299