1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/channel.h"
6
7#include <algorithm>
8
9#include "base/bind.h"
10#include "base/compiler_specific.h"
11#include "base/logging.h"
12#include "base/macros.h"
13#include "base/strings/stringprintf.h"
14#include "mojo/embedder/platform_handle_vector.h"
15#include "mojo/system/message_pipe_endpoint.h"
16#include "mojo/system/transport_data.h"
17
18namespace mojo {
19namespace system {
20
// Compile-time check that the bootstrap endpoint ID can never be confused with
// the invalid endpoint ID.
static_assert(Channel::kBootstrapEndpointId !=
                  MessageInTransit::kInvalidEndpointId,
              "kBootstrapEndpointId is invalid");

// Out-of-line definition of the static constant member
// |Channel::kBootstrapEndpointId|.
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
    Channel::kBootstrapEndpointId;
27
28Channel::Channel(embedder::PlatformSupport* platform_support)
29    : platform_support_(platform_support),
30      is_running_(false),
31      is_shutting_down_(false),
32      next_local_id_(kBootstrapEndpointId) {
33}
34
35bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
36  DCHECK(creation_thread_checker_.CalledOnValidThread());
37  DCHECK(raw_channel);
38
39  // No need to take |lock_|, since this must be called before this object
40  // becomes thread-safe.
41  DCHECK(!is_running_);
42  raw_channel_ = raw_channel.Pass();
43
44  if (!raw_channel_->Init(this)) {
45    raw_channel_.reset();
46    return false;
47  }
48
49  is_running_ = true;
50  return true;
51}
52
53void Channel::Shutdown() {
54  DCHECK(creation_thread_checker_.CalledOnValidThread());
55
56  IdToEndpointMap to_destroy;
57  {
58    base::AutoLock locker(lock_);
59    if (!is_running_)
60      return;
61
62    // Note: Don't reset |raw_channel_|, in case we're being called from within
63    // |OnReadMessage()| or |OnError()|.
64    raw_channel_->Shutdown();
65    is_running_ = false;
66
67    // We need to deal with it outside the lock.
68    std::swap(to_destroy, local_id_to_endpoint_map_);
69  }
70
71  size_t num_live = 0;
72  size_t num_zombies = 0;
73  for (IdToEndpointMap::iterator it = to_destroy.begin();
74       it != to_destroy.end();
75       ++it) {
76    if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) {
77      it->second->message_pipe_->OnRemove(it->second->port_);
78      num_live++;
79    } else {
80      DCHECK(!it->second->message_pipe_.get());
81      num_zombies++;
82    }
83    it->second->DetachFromChannel();
84  }
85  DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
86                                       << " live endpoints and " << num_zombies
87                                       << " zombies";
88}
89
90void Channel::WillShutdownSoon() {
91  base::AutoLock locker(lock_);
92  is_shutting_down_ = true;
93}
94
95// Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
96// keeps the endpoint alive even after the lock is released. Otherwise, there's
97// the temptation to simply pass the result of |new ChannelEndpoint(...)|
98// directly to this function, which wouldn't be sufficient for safety.
99MessageInTransit::EndpointId Channel::AttachEndpoint(
100    scoped_refptr<ChannelEndpoint> endpoint) {
101  DCHECK(endpoint.get());
102
103  MessageInTransit::EndpointId local_id;
104  {
105    base::AutoLock locker(lock_);
106
107    DLOG_IF(WARNING, is_shutting_down_)
108        << "AttachEndpoint() while shutting down";
109
110    while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
111           local_id_to_endpoint_map_.find(next_local_id_) !=
112               local_id_to_endpoint_map_.end())
113      next_local_id_++;
114
115    local_id = next_local_id_;
116    next_local_id_++;
117    local_id_to_endpoint_map_[local_id] = endpoint;
118  }
119
120  endpoint->AttachToChannel(this, local_id);
121  return local_id;
122}
123
124bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
125                                     MessageInTransit::EndpointId remote_id) {
126  scoped_refptr<ChannelEndpoint> endpoint;
127  ChannelEndpoint::State state;
128  {
129    base::AutoLock locker(lock_);
130
131    DLOG_IF(WARNING, is_shutting_down_)
132        << "RunMessagePipeEndpoint() while shutting down";
133
134    IdToEndpointMap::const_iterator it =
135        local_id_to_endpoint_map_.find(local_id);
136    if (it == local_id_to_endpoint_map_.end())
137      return false;
138    endpoint = it->second;
139    state = it->second->state_;
140  }
141
142  // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
143  // and ignore it.
144  if (state != ChannelEndpoint::STATE_NORMAL) {
145    DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
146                "(local ID " << local_id << ", remote ID " << remote_id << ")";
147    return true;
148  }
149
150  // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
151  // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
152  endpoint->Run(remote_id);
153  return true;
154}
155
156void Channel::RunRemoteMessagePipeEndpoint(
157    MessageInTransit::EndpointId local_id,
158    MessageInTransit::EndpointId remote_id) {
159#if DCHECK_IS_ON
160  {
161    base::AutoLock locker(lock_);
162    DCHECK(local_id_to_endpoint_map_.find(local_id) !=
163           local_id_to_endpoint_map_.end());
164  }
165#endif
166
167  if (!SendControlMessage(
168          MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
169          local_id,
170          remote_id)) {
171    HandleLocalError(base::StringPrintf(
172        "Failed to send message to run remote message pipe endpoint (local ID "
173        "%u, remote ID %u)",
174        static_cast<unsigned>(local_id),
175        static_cast<unsigned>(remote_id)));
176  }
177}
178
179bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
180  base::AutoLock locker(lock_);
181  if (!is_running_) {
182    // TODO(vtl): I think this is probably not an error condition, but I should
183    // think about it (and the shutdown sequence) more carefully.
184    LOG(WARNING) << "WriteMessage() after shutdown";
185    return false;
186  }
187
188  DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
189  return raw_channel_->WriteMessage(message.Pass());
190}
191
192bool Channel::IsWriteBufferEmpty() {
193  base::AutoLock locker(lock_);
194  if (!is_running_)
195    return true;
196  return raw_channel_->IsWriteBufferEmpty();
197}
198
199void Channel::DetachMessagePipeEndpoint(
200    MessageInTransit::EndpointId local_id,
201    MessageInTransit::EndpointId remote_id) {
202  DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
203
204  // If this is non-null after the locked block, the endpoint should be detached
205  // (and no remove message sent).
206  scoped_refptr<ChannelEndpoint> endpoint_to_detach;
207  {
208    base::AutoLock locker_(lock_);
209    if (!is_running_)
210      return;
211
212    IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
213    DCHECK(it != local_id_to_endpoint_map_.end());
214
215    switch (it->second->state_) {
216      case ChannelEndpoint::STATE_NORMAL:
217        it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK;
218        it->second->message_pipe_ = nullptr;
219        if (remote_id == MessageInTransit::kInvalidEndpointId)
220          return;
221        // We have to send a remove message (outside the lock).
222        break;
223      case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
224        endpoint_to_detach = it->second;
225        local_id_to_endpoint_map_.erase(it);
226        // We have to detach (outside the lock).
227        break;
228      case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
229        NOTREACHED();
230        return;
231    }
232  }
233  if (endpoint_to_detach.get()) {
234    endpoint_to_detach->DetachFromChannel();
235    return;
236  }
237
238  if (!SendControlMessage(
239          MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
240          local_id,
241          remote_id)) {
242    HandleLocalError(base::StringPrintf(
243        "Failed to send message to remove remote message pipe endpoint (local "
244        "ID %u, remote ID %u)",
245        static_cast<unsigned>(local_id),
246        static_cast<unsigned>(remote_id)));
247  }
248}
249
250size_t Channel::GetSerializedPlatformHandleSize() const {
251  return raw_channel_->GetSerializedPlatformHandleSize();
252}
253
254Channel::~Channel() {
255  // The channel should have been shut down first.
256  DCHECK(!is_running_);
257}
258
259void Channel::OnReadMessage(
260    const MessageInTransit::View& message_view,
261    embedder::ScopedPlatformHandleVectorPtr platform_handles) {
262  DCHECK(creation_thread_checker_.CalledOnValidThread());
263
264  switch (message_view.type()) {
265    case MessageInTransit::kTypeMessagePipeEndpoint:
266    case MessageInTransit::kTypeMessagePipe:
267      OnReadMessageForDownstream(message_view, platform_handles.Pass());
268      break;
269    case MessageInTransit::kTypeChannel:
270      OnReadMessageForChannel(message_view, platform_handles.Pass());
271      break;
272    default:
273      HandleRemoteError(
274          base::StringPrintf("Received message of invalid type %u",
275                             static_cast<unsigned>(message_view.type())));
276      break;
277  }
278}
279
280void Channel::OnError(Error error) {
281  DCHECK(creation_thread_checker_.CalledOnValidThread());
282
283  switch (error) {
284    case ERROR_READ_SHUTDOWN:
285      // The other side was cleanly closed, so this isn't actually an error.
286      DVLOG(1) << "RawChannel read error (shutdown)";
287      break;
288    case ERROR_READ_BROKEN: {
289      base::AutoLock locker(lock_);
290      LOG_IF(ERROR, !is_shutting_down_)
291          << "RawChannel read error (connection broken)";
292      break;
293    }
294    case ERROR_READ_BAD_MESSAGE:
295      // Receiving a bad message means either a bug, data corruption, or
296      // malicious attack (probably due to some other bug).
297      LOG(ERROR) << "RawChannel read error (received bad message)";
298      break;
299    case ERROR_READ_UNKNOWN:
300      LOG(ERROR) << "RawChannel read error (unknown)";
301      break;
302    case ERROR_WRITE:
303      // Write errors are slightly notable: they probably shouldn't happen under
304      // normal operation (but maybe the other side crashed).
305      LOG(WARNING) << "RawChannel write error";
306      break;
307  }
308  Shutdown();
309}
310
311void Channel::OnReadMessageForDownstream(
312    const MessageInTransit::View& message_view,
313    embedder::ScopedPlatformHandleVectorPtr platform_handles) {
314  DCHECK(creation_thread_checker_.CalledOnValidThread());
315  DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
316         message_view.type() == MessageInTransit::kTypeMessagePipe);
317
318  MessageInTransit::EndpointId local_id = message_view.destination_id();
319  if (local_id == MessageInTransit::kInvalidEndpointId) {
320    HandleRemoteError("Received message with no destination ID");
321    return;
322  }
323
324  ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL;
325  scoped_refptr<MessagePipe> message_pipe;
326  unsigned port = ~0u;
327  bool nonexistent_local_id_error = false;
328  {
329    base::AutoLock locker(lock_);
330
331    // Since we own |raw_channel_|, and this method and |Shutdown()| should only
332    // be called from the creation thread, |raw_channel_| should never be null
333    // here.
334    DCHECK(is_running_);
335
336    IdToEndpointMap::const_iterator it =
337        local_id_to_endpoint_map_.find(local_id);
338    if (it == local_id_to_endpoint_map_.end()) {
339      nonexistent_local_id_error = true;
340    } else {
341      state = it->second->state_;
342      message_pipe = it->second->message_pipe_;
343      port = it->second->port_;
344    }
345  }
346  if (nonexistent_local_id_error) {
347    HandleRemoteError(base::StringPrintf(
348        "Received a message for nonexistent local destination ID %u",
349        static_cast<unsigned>(local_id)));
350    // This is strongly indicative of some problem. However, it's not a fatal
351    // error, since it may indicate a buggy (or hostile) remote process. Don't
352    // die even for Debug builds, since handling this properly needs to be
353    // tested (TODO(vtl)).
354    DLOG(ERROR) << "This should not happen under normal operation.";
355    return;
356  }
357
358  // Ignore messages for zombie endpoints (not an error).
359  if (state != ChannelEndpoint::STATE_NORMAL) {
360    DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
361             << local_id << ", remote ID = " << message_view.source_id() << ")";
362    return;
363  }
364
365  // We need to duplicate the message (data), because |EnqueueMessage()| will
366  // take ownership of it.
367  scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
368  if (message_view.transport_data_buffer_size() > 0) {
369    DCHECK(message_view.transport_data_buffer());
370    message->SetDispatchers(TransportData::DeserializeDispatchers(
371        message_view.transport_data_buffer(),
372        message_view.transport_data_buffer_size(),
373        platform_handles.Pass(),
374        this));
375  }
376  MojoResult result = message_pipe->EnqueueMessage(
377      MessagePipe::GetPeerPort(port), message.Pass());
378  if (result != MOJO_RESULT_OK) {
379    // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
380    // has been closed (in an unavoidable race). This might also be a "remote"
381    // error, e.g., if the remote side is sending invalid control messages (to
382    // the message pipe).
383    HandleLocalError(base::StringPrintf(
384        "Failed to enqueue message to local ID %u (result %d)",
385        static_cast<unsigned>(local_id),
386        static_cast<int>(result)));
387    return;
388  }
389}
390
391void Channel::OnReadMessageForChannel(
392    const MessageInTransit::View& message_view,
393    embedder::ScopedPlatformHandleVectorPtr platform_handles) {
394  DCHECK(creation_thread_checker_.CalledOnValidThread());
395  DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
396
397  // Currently, no channel messages take platform handles.
398  if (platform_handles) {
399    HandleRemoteError(
400        "Received invalid channel message (has platform handles)");
401    NOTREACHED();
402    return;
403  }
404
405  switch (message_view.subtype()) {
406    case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
407      DVLOG(2) << "Handling channel message to run message pipe (local ID "
408               << message_view.destination_id() << ", remote ID "
409               << message_view.source_id() << ")";
410      if (!RunMessagePipeEndpoint(message_view.destination_id(),
411                                  message_view.source_id())) {
412        HandleRemoteError(
413            "Received invalid channel message to run message pipe");
414      }
415      break;
416    case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
417      DVLOG(2) << "Handling channel message to remove message pipe (local ID "
418               << message_view.destination_id() << ", remote ID "
419               << message_view.source_id() << ")";
420      if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
421                                     message_view.source_id())) {
422        HandleRemoteError(
423            "Received invalid channel message to remove message pipe");
424      }
425      break;
426    case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
427      DVLOG(2) << "Handling channel message to ack remove message pipe (local "
428                  "ID " << message_view.destination_id() << ", remote ID "
429               << message_view.source_id() << ")";
430      if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
431                                     message_view.source_id())) {
432        HandleRemoteError(
433            "Received invalid channel message to ack remove message pipe");
434      }
435      break;
436    default:
437      HandleRemoteError("Received invalid channel message");
438      NOTREACHED();
439      break;
440  }
441}
442
443bool Channel::RemoveMessagePipeEndpoint(
444    MessageInTransit::EndpointId local_id,
445    MessageInTransit::EndpointId remote_id) {
446  DCHECK(creation_thread_checker_.CalledOnValidThread());
447
448  // If this is non-null after the locked block, the endpoint should be detached
449  // (and no remove ack message sent).
450  scoped_refptr<ChannelEndpoint> endpoint_to_detach;
451  scoped_refptr<MessagePipe> message_pipe;
452  unsigned port = ~0u;
453  {
454    base::AutoLock locker(lock_);
455
456    IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
457    if (it == local_id_to_endpoint_map_.end()) {
458      DVLOG(2) << "Remove message pipe error: not found";
459      return false;
460    }
461
462    switch (it->second->state_) {
463      case ChannelEndpoint::STATE_NORMAL:
464        it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH;
465        message_pipe = it->second->message_pipe_;
466        port = it->second->port_;
467        it->second->message_pipe_ = nullptr;
468        // We have to send a remove ack message (outside the lock).
469        break;
470      case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
471        DVLOG(2) << "Remove message pipe error: wrong state";
472        return false;
473      case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
474        endpoint_to_detach = it->second;
475        local_id_to_endpoint_map_.erase(it);
476        // We have to detach (outside the lock).
477        break;
478    }
479  }
480  if (endpoint_to_detach.get()) {
481    endpoint_to_detach->DetachFromChannel();
482    return true;
483  }
484
485  if (!SendControlMessage(
486          MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
487          local_id,
488          remote_id)) {
489    HandleLocalError(base::StringPrintf(
490        "Failed to send message to remove remote message pipe endpoint ack "
491        "(local ID %u, remote ID %u)",
492        static_cast<unsigned>(local_id),
493        static_cast<unsigned>(remote_id)));
494  }
495
496  message_pipe->OnRemove(port);
497
498  return true;
499}
500
501bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
502                                 MessageInTransit::EndpointId local_id,
503                                 MessageInTransit::EndpointId remote_id) {
504  DVLOG(2) << "Sending channel control message: subtype " << subtype
505           << ", local ID " << local_id << ", remote ID " << remote_id;
506  scoped_ptr<MessageInTransit> message(new MessageInTransit(
507      MessageInTransit::kTypeChannel, subtype, 0, nullptr));
508  message->set_source_id(local_id);
509  message->set_destination_id(remote_id);
510  return WriteMessage(message.Pass());
511}
512
513void Channel::HandleRemoteError(const base::StringPiece& error_message) {
514  // TODO(vtl): Is this how we really want to handle this? Probably we want to
515  // terminate the connection, since it's spewing invalid stuff.
516  LOG(WARNING) << error_message;
517}
518
519void Channel::HandleLocalError(const base::StringPiece& error_message) {
520  // TODO(vtl): Is this how we really want to handle this?
521  // Sometimes we'll want to propagate the error back to the message pipe
522  // (endpoint), and notify it that the remote is (effectively) closed.
523  // Sometimes we'll want to kill the channel (and notify all the endpoints that
524  // their remotes are dead.
525  LOG(WARNING) << error_message;
526}
527
528}  // namespace system
529}  // namespace mojo
530