1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/message_pipe.h"
6
7#include "base/logging.h"
8#include "mojo/system/channel_endpoint.h"
9#include "mojo/system/local_message_pipe_endpoint.h"
10#include "mojo/system/message_in_transit.h"
11#include "mojo/system/message_pipe_dispatcher.h"
12#include "mojo/system/message_pipe_endpoint.h"
13#include "mojo/system/proxy_message_pipe_endpoint.h"
14
15namespace mojo {
16namespace system {
17
18// static
19MessagePipe* MessagePipe::CreateLocalLocal() {
20  MessagePipe* message_pipe = new MessagePipe();
21  message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
22  message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
23  return message_pipe;
24}
25
26// static
27MessagePipe* MessagePipe::CreateLocalProxy(
28    scoped_refptr<ChannelEndpoint>* channel_endpoint) {
29  DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
30  MessagePipe* message_pipe = new MessagePipe();
31  message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
32  *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
33  message_pipe->endpoints_[1].reset(
34      new ProxyMessagePipeEndpoint(channel_endpoint->get()));
35  return message_pipe;
36}
37
38// static
39MessagePipe* MessagePipe::CreateProxyLocal(
40    scoped_refptr<ChannelEndpoint>* channel_endpoint) {
41  DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
42  MessagePipe* message_pipe = new MessagePipe();
43  *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
44  message_pipe->endpoints_[0].reset(
45      new ProxyMessagePipeEndpoint(channel_endpoint->get()));
46  message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
47  return message_pipe;
48}
49
50// static
51unsigned MessagePipe::GetPeerPort(unsigned port) {
52  DCHECK(port == 0 || port == 1);
53  return port ^ 1;
54}
55
56MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
57  DCHECK(port == 0 || port == 1);
58  base::AutoLock locker(lock_);
59  DCHECK(endpoints_[port]);
60
61  return endpoints_[port]->GetType();
62}
63
64void MessagePipe::CancelAllWaiters(unsigned port) {
65  DCHECK(port == 0 || port == 1);
66
67  base::AutoLock locker(lock_);
68  DCHECK(endpoints_[port]);
69  endpoints_[port]->CancelAllWaiters();
70}
71
72void MessagePipe::Close(unsigned port) {
73  DCHECK(port == 0 || port == 1);
74
75  unsigned destination_port = GetPeerPort(port);
76
77  base::AutoLock locker(lock_);
78  DCHECK(endpoints_[port]);
79
80  endpoints_[port]->Close();
81  if (endpoints_[destination_port]) {
82    if (!endpoints_[destination_port]->OnPeerClose())
83      endpoints_[destination_port].reset();
84  }
85  endpoints_[port].reset();
86}
87
88// TODO(vtl): Handle flags.
89MojoResult MessagePipe::WriteMessage(
90    unsigned port,
91    UserPointer<const void> bytes,
92    uint32_t num_bytes,
93    std::vector<DispatcherTransport>* transports,
94    MojoWriteMessageFlags flags) {
95  DCHECK(port == 0 || port == 1);
96  return EnqueueMessageInternal(
97      GetPeerPort(port),
98      make_scoped_ptr(new MessageInTransit(
99          MessageInTransit::kTypeMessagePipeEndpoint,
100          MessageInTransit::kSubtypeMessagePipeEndpointData,
101          num_bytes,
102          bytes)),
103      transports);
104}
105
106MojoResult MessagePipe::ReadMessage(unsigned port,
107                                    UserPointer<void> bytes,
108                                    UserPointer<uint32_t> num_bytes,
109                                    DispatcherVector* dispatchers,
110                                    uint32_t* num_dispatchers,
111                                    MojoReadMessageFlags flags) {
112  DCHECK(port == 0 || port == 1);
113
114  base::AutoLock locker(lock_);
115  DCHECK(endpoints_[port]);
116
117  return endpoints_[port]->ReadMessage(
118      bytes, num_bytes, dispatchers, num_dispatchers, flags);
119}
120
121HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
122  DCHECK(port == 0 || port == 1);
123
124  base::AutoLock locker(const_cast<base::Lock&>(lock_));
125  DCHECK(endpoints_[port]);
126
127  return endpoints_[port]->GetHandleSignalsState();
128}
129
130MojoResult MessagePipe::AddWaiter(unsigned port,
131                                  Waiter* waiter,
132                                  MojoHandleSignals signals,
133                                  uint32_t context,
134                                  HandleSignalsState* signals_state) {
135  DCHECK(port == 0 || port == 1);
136
137  base::AutoLock locker(lock_);
138  DCHECK(endpoints_[port]);
139
140  return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
141}
142
143void MessagePipe::RemoveWaiter(unsigned port,
144                               Waiter* waiter,
145                               HandleSignalsState* signals_state) {
146  DCHECK(port == 0 || port == 1);
147
148  base::AutoLock locker(lock_);
149  DCHECK(endpoints_[port]);
150
151  endpoints_[port]->RemoveWaiter(waiter, signals_state);
152}
153
154scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
155  DCHECK(port == 0 || port == 1);
156
157  base::AutoLock locker(lock_);
158  DCHECK(endpoints_[port]);
159  DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
160
161  // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
162  // |MessagePipe| with two proxy endpoints, which will then act as a proxy
163  // (rather than trying to connect the two ends directly).
164  DLOG_IF(WARNING,
165          !!endpoints_[GetPeerPort(port)] &&
166              endpoints_[GetPeerPort(port)]->GetType() !=
167                  MessagePipeEndpoint::kTypeLocal)
168      << "Direct message pipe passing across multiple channels not yet "
169         "implemented; will proxy";
170
171  scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
172  scoped_refptr<ChannelEndpoint> channel_endpoint(
173      new ChannelEndpoint(this, port));
174  endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
175  channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>(
176                                     old_endpoint.get())->message_queue());
177  old_endpoint->Close();
178
179  return channel_endpoint;
180}
181
182MojoResult MessagePipe::EnqueueMessage(unsigned port,
183                                       scoped_ptr<MessageInTransit> message) {
184  return EnqueueMessageInternal(port, message.Pass(), nullptr);
185}
186
187void MessagePipe::OnRemove(unsigned port) {
188  unsigned destination_port = GetPeerPort(port);
189
190  base::AutoLock locker(lock_);
191  // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
192  if (!endpoints_[port])
193    return;
194
195  if (endpoints_[destination_port]) {
196    if (!endpoints_[destination_port]->OnPeerClose())
197      endpoints_[destination_port].reset();
198  }
199  endpoints_[port].reset();
200}
201
202MessagePipe::MessagePipe() {
203}
204
205MessagePipe::~MessagePipe() {
206  // Owned by the dispatchers. The owning dispatchers should only release us via
207  // their |Close()| method, which should inform us of being closed via our
208  // |Close()|. Thus these should already be null.
209  DCHECK(!endpoints_[0]);
210  DCHECK(!endpoints_[1]);
211}
212
213MojoResult MessagePipe::EnqueueMessageInternal(
214    unsigned port,
215    scoped_ptr<MessageInTransit> message,
216    std::vector<DispatcherTransport>* transports) {
217  DCHECK(port == 0 || port == 1);
218  DCHECK(message);
219
220  if (message->type() == MessageInTransit::kTypeMessagePipe) {
221    DCHECK(!transports);
222    return HandleControlMessage(port, message.Pass());
223  }
224
225  DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
226
227  base::AutoLock locker(lock_);
228  DCHECK(endpoints_[GetPeerPort(port)]);
229
230  // The destination port need not be open, unlike the source port.
231  if (!endpoints_[port])
232    return MOJO_RESULT_FAILED_PRECONDITION;
233
234  if (transports) {
235    MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
236    if (result != MOJO_RESULT_OK)
237      return result;
238  }
239
240  // The endpoint's |EnqueueMessage()| may not report failure.
241  endpoints_[port]->EnqueueMessage(message.Pass());
242  return MOJO_RESULT_OK;
243}
244
245MojoResult MessagePipe::AttachTransportsNoLock(
246    unsigned port,
247    MessageInTransit* message,
248    std::vector<DispatcherTransport>* transports) {
249  DCHECK(!message->has_dispatchers());
250
251  // You're not allowed to send either handle to a message pipe over the message
252  // pipe, so check for this. (The case of trying to write a handle to itself is
253  // taken care of by |Core|. That case kind of makes sense, but leads to
254  // complications if, e.g., both sides try to do the same thing with their
255  // respective handles simultaneously. The other case, of trying to write the
256  // peer handle to a handle, doesn't make sense -- since no handle will be
257  // available to read the message from.)
258  for (size_t i = 0; i < transports->size(); i++) {
259    if (!(*transports)[i].is_valid())
260      continue;
261    if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
262      MessagePipeDispatcherTransport mp_transport((*transports)[i]);
263      if (mp_transport.GetMessagePipe() == this) {
264        // The other case should have been disallowed by |Core|. (Note: |port|
265        // is the peer port of the handle given to |WriteMessage()|.)
266        DCHECK_EQ(mp_transport.GetPort(), port);
267        return MOJO_RESULT_INVALID_ARGUMENT;
268      }
269    }
270  }
271
272  // Clone the dispatchers and attach them to the message. (This must be done as
273  // a separate loop, since we want to leave the dispatchers alone on failure.)
274  scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
275  dispatchers->reserve(transports->size());
276  for (size_t i = 0; i < transports->size(); i++) {
277    if ((*transports)[i].is_valid()) {
278      dispatchers->push_back(
279          (*transports)[i].CreateEquivalentDispatcherAndClose());
280    } else {
281      LOG(WARNING) << "Enqueueing null dispatcher";
282      dispatchers->push_back(scoped_refptr<Dispatcher>());
283    }
284  }
285  message->SetDispatchers(dispatchers.Pass());
286  return MOJO_RESULT_OK;
287}
288
289MojoResult MessagePipe::HandleControlMessage(
290    unsigned /*port*/,
291    scoped_ptr<MessageInTransit> message) {
292  LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
293               << message->subtype();
294  return MOJO_RESULT_UNKNOWN;
295}
296
297}  // namespace system
298}  // namespace mojo
299