1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/message_pipe_dispatcher.h"
6
7#include "base/logging.h"
8#include "mojo/system/channel.h"
9#include "mojo/system/channel_endpoint.h"
10#include "mojo/system/constants.h"
11#include "mojo/system/local_message_pipe_endpoint.h"
12#include "mojo/system/memory.h"
13#include "mojo/system/message_in_transit.h"
14#include "mojo/system/message_pipe.h"
15#include "mojo/system/options_validation.h"
16#include "mojo/system/proxy_message_pipe_endpoint.h"
17
18namespace mojo {
19namespace system {
20
21namespace {
22
23const unsigned kInvalidPort = static_cast<unsigned>(-1);
24
25struct SerializedMessagePipeDispatcher {
26  MessageInTransit::EndpointId endpoint_id;
27};
28
29}  // namespace
30
31// MessagePipeDispatcher -------------------------------------------------------
32
33// static
34const MojoCreateMessagePipeOptions
35    MessagePipeDispatcher::kDefaultCreateOptions = {
36        static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
37        MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
38
39MessagePipeDispatcher::MessagePipeDispatcher(
40    const MojoCreateMessagePipeOptions& /*validated_options*/)
41    : port_(kInvalidPort) {
42}
43
44// static
45MojoResult MessagePipeDispatcher::ValidateCreateOptions(
46    UserPointer<const MojoCreateMessagePipeOptions> in_options,
47    MojoCreateMessagePipeOptions* out_options) {
48  const MojoCreateMessagePipeOptionsFlags kKnownFlags =
49      MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
50
51  *out_options = kDefaultCreateOptions;
52  if (in_options.IsNull())
53    return MOJO_RESULT_OK;
54
55  UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
56  if (!reader.is_valid())
57    return MOJO_RESULT_INVALID_ARGUMENT;
58
59  if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
60    return MOJO_RESULT_OK;
61  if ((reader.options().flags & ~kKnownFlags))
62    return MOJO_RESULT_UNIMPLEMENTED;
63  out_options->flags = reader.options().flags;
64
65  // Checks for fields beyond |flags|:
66
67  // (Nothing here yet.)
68
69  return MOJO_RESULT_OK;
70}
71
72void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
73                                 unsigned port) {
74  DCHECK(message_pipe.get());
75  DCHECK(port == 0 || port == 1);
76
77  message_pipe_ = message_pipe;
78  port_ = port;
79}
80
81Dispatcher::Type MessagePipeDispatcher::GetType() const {
82  return kTypeMessagePipe;
83}
84
85// static
86scoped_refptr<MessagePipeDispatcher>
87MessagePipeDispatcher::CreateRemoteMessagePipe(
88    scoped_refptr<ChannelEndpoint>* channel_endpoint) {
89  scoped_refptr<MessagePipe> message_pipe(
90      MessagePipe::CreateLocalProxy(channel_endpoint));
91  scoped_refptr<MessagePipeDispatcher> dispatcher(
92      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
93  dispatcher->Init(message_pipe, 0);
94  return dispatcher;
95}
96
97// static
98scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
99    Channel* channel,
100    const void* source,
101    size_t size) {
102  if (size != sizeof(SerializedMessagePipeDispatcher)) {
103    LOG(ERROR) << "Invalid serialized message pipe dispatcher";
104    return scoped_refptr<MessagePipeDispatcher>();
105  }
106
107  scoped_refptr<ChannelEndpoint> channel_endpoint;
108  scoped_refptr<MessagePipeDispatcher> dispatcher =
109      CreateRemoteMessagePipe(&channel_endpoint);
110
111  MessageInTransit::EndpointId remote_id =
112      static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
113  if (remote_id == MessageInTransit::kInvalidEndpointId) {
114    // This means that the other end was closed, and there were no messages
115    // enqueued for us.
116    // TODO(vtl): This is wrong. We should produce a "dead" message pipe
117    // dispatcher.
118    NOTIMPLEMENTED();
119    return scoped_refptr<MessagePipeDispatcher>();
120  }
121  MessageInTransit::EndpointId local_id =
122      channel->AttachEndpoint(channel_endpoint);
123  if (local_id == MessageInTransit::kInvalidEndpointId) {
124    LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
125                  "attach; remote ID = " << remote_id << ")";
126    return scoped_refptr<MessagePipeDispatcher>();
127  }
128  DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " << remote_id
129           << ", new local ID = " << local_id << ")";
130
131  if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) {
132    // In general, this shouldn't fail, since we generated |local_id| locally.
133    NOTREACHED();
134    return scoped_refptr<MessagePipeDispatcher>();
135  }
136
137  // TODO(vtl): FIXME -- Need some error handling here.
138  channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
139  return dispatcher;
140}
141
142MessagePipeDispatcher::~MessagePipeDispatcher() {
143  // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
144  DCHECK(!message_pipe_.get());
145}
146
147MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const {
148  lock().AssertAcquired();
149  return message_pipe_.get();
150}
151
152unsigned MessagePipeDispatcher::GetPortNoLock() const {
153  lock().AssertAcquired();
154  return port_;
155}
156
157void MessagePipeDispatcher::CancelAllWaitersNoLock() {
158  lock().AssertAcquired();
159  message_pipe_->CancelAllWaiters(port_);
160}
161
162void MessagePipeDispatcher::CloseImplNoLock() {
163  lock().AssertAcquired();
164  message_pipe_->Close(port_);
165  message_pipe_ = nullptr;
166  port_ = kInvalidPort;
167}
168
169scoped_refptr<Dispatcher>
170MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
171  lock().AssertAcquired();
172
173  // TODO(vtl): Currently, there are no options, so we just use
174  // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
175  // too.
176  scoped_refptr<MessagePipeDispatcher> rv =
177      new MessagePipeDispatcher(kDefaultCreateOptions);
178  rv->Init(message_pipe_, port_);
179  message_pipe_ = nullptr;
180  port_ = kInvalidPort;
181  return scoped_refptr<Dispatcher>(rv.get());
182}
183
184MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
185    UserPointer<const void> bytes,
186    uint32_t num_bytes,
187    std::vector<DispatcherTransport>* transports,
188    MojoWriteMessageFlags flags) {
189  DCHECK(!transports || (transports->size() > 0 &&
190                         transports->size() <= kMaxMessageNumHandles));
191
192  lock().AssertAcquired();
193
194  if (num_bytes > kMaxMessageNumBytes)
195    return MOJO_RESULT_RESOURCE_EXHAUSTED;
196
197  return message_pipe_->WriteMessage(
198      port_, bytes, num_bytes, transports, flags);
199}
200
201MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
202    UserPointer<void> bytes,
203    UserPointer<uint32_t> num_bytes,
204    DispatcherVector* dispatchers,
205    uint32_t* num_dispatchers,
206    MojoReadMessageFlags flags) {
207  lock().AssertAcquired();
208  return message_pipe_->ReadMessage(
209      port_, bytes, num_bytes, dispatchers, num_dispatchers, flags);
210}
211
212HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
213    const {
214  lock().AssertAcquired();
215  return message_pipe_->GetHandleSignalsState(port_);
216}
217
218MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(
219    Waiter* waiter,
220    MojoHandleSignals signals,
221    uint32_t context,
222    HandleSignalsState* signals_state) {
223  lock().AssertAcquired();
224  return message_pipe_->AddWaiter(
225      port_, waiter, signals, context, signals_state);
226}
227
228void MessagePipeDispatcher::RemoveWaiterImplNoLock(
229    Waiter* waiter,
230    HandleSignalsState* signals_state) {
231  lock().AssertAcquired();
232  message_pipe_->RemoveWaiter(port_, waiter, signals_state);
233}
234
235void MessagePipeDispatcher::StartSerializeImplNoLock(
236    Channel* /*channel*/,
237    size_t* max_size,
238    size_t* max_platform_handles) {
239  DCHECK(HasOneRef());  // Only one ref => no need to take the lock.
240  *max_size = sizeof(SerializedMessagePipeDispatcher);
241  *max_platform_handles = 0;
242}
243
244bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
245    Channel* channel,
246    void* destination,
247    size_t* actual_size,
248    embedder::PlatformHandleVector* /*platform_handles*/) {
249  DCHECK(HasOneRef());  // Only one ref => no need to take the lock.
250
251  // Convert the local endpoint to a proxy endpoint (moving the message queue)
252  // and attach it to the channel.
253  MessageInTransit::EndpointId endpoint_id =
254      channel->AttachEndpoint(message_pipe_->ConvertLocalToProxy(port_));
255  // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's
256  // possible that the other endpoint -- the one that we're not sending -- was
257  // closed in the intervening time.) In that case, we need to deserialize a
258  // "dead" message pipe dispatcher on the other end. (Note that this is
259  // different from just producing |MOJO_HANDLE_INVALID|.)
260  DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id
261           << ")";
262
263  // We now have a local ID. Before we can run the proxy endpoint, we need to
264  // get an ack back from the other side with the remote ID.
265  static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id =
266      endpoint_id;
267
268  message_pipe_ = nullptr;
269  port_ = kInvalidPort;
270
271  *actual_size = sizeof(SerializedMessagePipeDispatcher);
272  return true;
273}
274
275// MessagePipeDispatcherTransport ----------------------------------------------
276
277MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
278    DispatcherTransport transport)
279    : DispatcherTransport(transport) {
280  DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
281}
282
283}  // namespace system
284}  // namespace mojo
285