1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/system/message_pipe_dispatcher.h" 6 7#include "base/logging.h" 8#include "mojo/system/channel.h" 9#include "mojo/system/channel_endpoint.h" 10#include "mojo/system/constants.h" 11#include "mojo/system/local_message_pipe_endpoint.h" 12#include "mojo/system/memory.h" 13#include "mojo/system/message_in_transit.h" 14#include "mojo/system/message_pipe.h" 15#include "mojo/system/options_validation.h" 16#include "mojo/system/proxy_message_pipe_endpoint.h" 17 18namespace mojo { 19namespace system { 20 21namespace { 22 23const unsigned kInvalidPort = static_cast<unsigned>(-1); 24 25struct SerializedMessagePipeDispatcher { 26 MessageInTransit::EndpointId endpoint_id; 27}; 28 29} // namespace 30 31// MessagePipeDispatcher ------------------------------------------------------- 32 33// static 34const MojoCreateMessagePipeOptions 35 MessagePipeDispatcher::kDefaultCreateOptions = { 36 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), 37 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; 38 39MessagePipeDispatcher::MessagePipeDispatcher( 40 const MojoCreateMessagePipeOptions& /*validated_options*/) 41 : port_(kInvalidPort) { 42} 43 44// static 45MojoResult MessagePipeDispatcher::ValidateCreateOptions( 46 UserPointer<const MojoCreateMessagePipeOptions> in_options, 47 MojoCreateMessagePipeOptions* out_options) { 48 const MojoCreateMessagePipeOptionsFlags kKnownFlags = 49 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; 50 51 *out_options = kDefaultCreateOptions; 52 if (in_options.IsNull()) 53 return MOJO_RESULT_OK; 54 55 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); 56 if (!reader.is_valid()) 57 return MOJO_RESULT_INVALID_ARGUMENT; 58 59 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) 60 return MOJO_RESULT_OK; 61 if ((reader.options().flags & ~kKnownFlags)) 62 return MOJO_RESULT_UNIMPLEMENTED; 63 out_options->flags = reader.options().flags; 64 65 // Checks for fields beyond |flags|: 66 67 // (Nothing here yet.) 68 69 return MOJO_RESULT_OK; 70} 71 72void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe, 73 unsigned port) { 74 DCHECK(message_pipe.get()); 75 DCHECK(port == 0 || port == 1); 76 77 message_pipe_ = message_pipe; 78 port_ = port; 79} 80 81Dispatcher::Type MessagePipeDispatcher::GetType() const { 82 return kTypeMessagePipe; 83} 84 85// static 86scoped_refptr<MessagePipeDispatcher> 87MessagePipeDispatcher::CreateRemoteMessagePipe( 88 scoped_refptr<ChannelEndpoint>* channel_endpoint) { 89 scoped_refptr<MessagePipe> message_pipe( 90 MessagePipe::CreateLocalProxy(channel_endpoint)); 91 scoped_refptr<MessagePipeDispatcher> dispatcher( 92 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); 93 dispatcher->Init(message_pipe, 0); 94 return dispatcher; 95} 96 97// static 98scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( 99 Channel* channel, 100 const void* source, 101 size_t size) { 102 if (size != sizeof(SerializedMessagePipeDispatcher)) { 103 LOG(ERROR) << "Invalid serialized message pipe dispatcher"; 104 return scoped_refptr<MessagePipeDispatcher>(); 105 } 106 107 scoped_refptr<ChannelEndpoint> channel_endpoint; 108 scoped_refptr<MessagePipeDispatcher> dispatcher = 109 CreateRemoteMessagePipe(&channel_endpoint); 110 111 MessageInTransit::EndpointId remote_id = 112 static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id; 113 if (remote_id == MessageInTransit::kInvalidEndpointId) { 114 // This means that the other end was closed, and there were no messages 115 // enqueued for us. 116 // TODO(vtl): This is wrong. We should produce a "dead" message pipe 117 // dispatcher. 118 NOTIMPLEMENTED(); 119 return scoped_refptr<MessagePipeDispatcher>(); 120 } 121 MessageInTransit::EndpointId local_id = 122 channel->AttachEndpoint(channel_endpoint); 123 if (local_id == MessageInTransit::kInvalidEndpointId) { 124 LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to " 125 "attach; remote ID = " << remote_id << ")"; 126 return scoped_refptr<MessagePipeDispatcher>(); 127 } 128 DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " << remote_id 129 << ", new local ID = " << local_id << ")"; 130 131 if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) { 132 // In general, this shouldn't fail, since we generated |local_id| locally. 133 NOTREACHED(); 134 return scoped_refptr<MessagePipeDispatcher>(); 135 } 136 137 // TODO(vtl): FIXME -- Need some error handling here. 138 channel->RunRemoteMessagePipeEndpoint(local_id, remote_id); 139 return dispatcher; 140} 141 142MessagePipeDispatcher::~MessagePipeDispatcher() { 143 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe. 144 DCHECK(!message_pipe_.get()); 145} 146 147MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const { 148 lock().AssertAcquired(); 149 return message_pipe_.get(); 150} 151 152unsigned MessagePipeDispatcher::GetPortNoLock() const { 153 lock().AssertAcquired(); 154 return port_; 155} 156 157void MessagePipeDispatcher::CancelAllWaitersNoLock() { 158 lock().AssertAcquired(); 159 message_pipe_->CancelAllWaiters(port_); 160} 161 162void MessagePipeDispatcher::CloseImplNoLock() { 163 lock().AssertAcquired(); 164 message_pipe_->Close(port_); 165 message_pipe_ = nullptr; 166 port_ = kInvalidPort; 167} 168 169scoped_refptr<Dispatcher> 170MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 171 lock().AssertAcquired(); 172 173 // TODO(vtl): Currently, there are no options, so we just use 174 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options 175 // too. 176 scoped_refptr<MessagePipeDispatcher> rv = 177 new MessagePipeDispatcher(kDefaultCreateOptions); 178 rv->Init(message_pipe_, port_); 179 message_pipe_ = nullptr; 180 port_ = kInvalidPort; 181 return scoped_refptr<Dispatcher>(rv.get()); 182} 183 184MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( 185 UserPointer<const void> bytes, 186 uint32_t num_bytes, 187 std::vector<DispatcherTransport>* transports, 188 MojoWriteMessageFlags flags) { 189 DCHECK(!transports || (transports->size() > 0 && 190 transports->size() <= kMaxMessageNumHandles)); 191 192 lock().AssertAcquired(); 193 194 if (num_bytes > kMaxMessageNumBytes) 195 return MOJO_RESULT_RESOURCE_EXHAUSTED; 196 197 return message_pipe_->WriteMessage( 198 port_, bytes, num_bytes, transports, flags); 199} 200 201MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( 202 UserPointer<void> bytes, 203 UserPointer<uint32_t> num_bytes, 204 DispatcherVector* dispatchers, 205 uint32_t* num_dispatchers, 206 MojoReadMessageFlags flags) { 207 lock().AssertAcquired(); 208 return message_pipe_->ReadMessage( 209 port_, bytes, num_bytes, dispatchers, num_dispatchers, flags); 210} 211 212HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() 213 const { 214 lock().AssertAcquired(); 215 return message_pipe_->GetHandleSignalsState(port_); 216} 217 218MojoResult MessagePipeDispatcher::AddWaiterImplNoLock( 219 Waiter* waiter, 220 MojoHandleSignals signals, 221 uint32_t context, 222 HandleSignalsState* signals_state) { 223 lock().AssertAcquired(); 224 return message_pipe_->AddWaiter( 225 port_, waiter, signals, context, signals_state); 226} 227 228void MessagePipeDispatcher::RemoveWaiterImplNoLock( 229 Waiter* waiter, 230 HandleSignalsState* signals_state) { 231 lock().AssertAcquired(); 232 message_pipe_->RemoveWaiter(port_, waiter, signals_state); 233} 234 235void MessagePipeDispatcher::StartSerializeImplNoLock( 236 Channel* /*channel*/, 237 size_t* max_size, 238 size_t* max_platform_handles) { 239 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. 240 *max_size = sizeof(SerializedMessagePipeDispatcher); 241 *max_platform_handles = 0; 242} 243 244bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( 245 Channel* channel, 246 void* destination, 247 size_t* actual_size, 248 embedder::PlatformHandleVector* /*platform_handles*/) { 249 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. 250 251 // Convert the local endpoint to a proxy endpoint (moving the message queue) 252 // and attach it to the channel. 253 MessageInTransit::EndpointId endpoint_id = 254 channel->AttachEndpoint(message_pipe_->ConvertLocalToProxy(port_)); 255 // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's 256 // possible that the other endpoint -- the one that we're not sending -- was 257 // closed in the intervening time.) In that case, we need to deserialize a 258 // "dead" message pipe dispatcher on the other end. (Note that this is 259 // different from just producing |MOJO_HANDLE_INVALID|.) 260 DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id 261 << ")"; 262 263 // We now have a local ID. Before we can run the proxy endpoint, we need to 264 // get an ack back from the other side with the remote ID. 265 static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id = 266 endpoint_id; 267 268 message_pipe_ = nullptr; 269 port_ = kInvalidPort; 270 271 *actual_size = sizeof(SerializedMessagePipeDispatcher); 272 return true; 273} 274 275// MessagePipeDispatcherTransport ---------------------------------------------- 276 277MessagePipeDispatcherTransport::MessagePipeDispatcherTransport( 278 DispatcherTransport transport) 279 : DispatcherTransport(transport) { 280 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe); 281} 282 283} // namespace system 284} // namespace mojo 285