connector.cc revision 03b57e008b61dfcb1fbad3aea950ae0e001748b0
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/public/cpp/bindings/lib/connector.h"
6
7#include <stddef.h>
8
9#include "mojo/public/cpp/bindings/error_handler.h"
10#include "mojo/public/cpp/environment/logging.h"
11
12namespace mojo {
13namespace internal {
14
15// ----------------------------------------------------------------------------
16
17Connector::Connector(ScopedMessagePipeHandle message_pipe,
18                     const MojoAsyncWaiter* waiter)
19    : error_handler_(NULL),
20      waiter_(waiter),
21      message_pipe_(message_pipe.Pass()),
22      incoming_receiver_(NULL),
23      async_wait_id_(0),
24      error_(false),
25      drop_writes_(false),
26      enforce_errors_from_incoming_receiver_(true),
27      destroyed_flag_(NULL) {
28  // Even though we don't have an incoming receiver, we still want to monitor
29  // the message pipe to know if is closed or encounters an error.
30  WaitToReadMore();
31}
32
33Connector::~Connector() {
34  if (destroyed_flag_)
35    *destroyed_flag_ = true;
36
37  CancelWait();
38}
39
40void Connector::CloseMessagePipe() {
41  CancelWait();
42  Close(message_pipe_.Pass());
43}
44
45ScopedMessagePipeHandle Connector::PassMessagePipe() {
46  CancelWait();
47  return message_pipe_.Pass();
48}
49
50bool Connector::WaitForIncomingMessage() {
51  if (error_)
52    return false;
53
54  MojoResult rv = Wait(message_pipe_.get(),
55                       MOJO_HANDLE_SIGNAL_READABLE,
56                       MOJO_DEADLINE_INDEFINITE);
57  if (rv != MOJO_RESULT_OK) {
58    NotifyError();
59    return false;
60  }
61  mojo_ignore_result(ReadSingleMessage(&rv));
62  return (rv == MOJO_RESULT_OK);
63}
64
65bool Connector::Accept(Message* message) {
66  MOJO_CHECK(message_pipe_.is_valid());
67
68  if (error_)
69    return false;
70
71  if (drop_writes_)
72    return true;
73
74  MojoResult rv = WriteMessageRaw(
75      message_pipe_.get(),
76      message->data(),
77      message->data_num_bytes(),
78      message->mutable_handles()->empty() ? NULL :
79          reinterpret_cast<const MojoHandle*>(
80              &message->mutable_handles()->front()),
81      static_cast<uint32_t>(message->mutable_handles()->size()),
82      MOJO_WRITE_MESSAGE_FLAG_NONE);
83
84  switch (rv) {
85    case MOJO_RESULT_OK:
86      // The handles were successfully transferred, so we don't need the message
87      // to track their lifetime any longer.
88      message->mutable_handles()->clear();
89      break;
90    case MOJO_RESULT_FAILED_PRECONDITION:
91      // There's no point in continuing to write to this pipe since the other
92      // end is gone. Avoid writing any future messages. Hide write failures
93      // from the caller since we'd like them to continue consuming any backlog
94      // of incoming messages before regarding the message pipe as closed.
95      drop_writes_ = true;
96      break;
97    case MOJO_RESULT_BUSY:
98      // We'd get a "busy" result if one of the message's handles is:
99      //   - |message_pipe_|'s own handle;
100      //   - simultaneously being used on another thread; or
101      //   - in a "busy" state that prohibits it from being transferred (e.g.,
102      //     a data pipe handle in the middle of a two-phase read/write,
103      //     regardless of which thread that two-phase read/write is happening
104      //     on).
105      // TODO(vtl): I wonder if this should be a |MOJO_DCHECK()|. (But, until
106      // crbug.com/389666, etc. are resolved, this will make tests fail quickly
107      // rather than hanging.)
108      MOJO_CHECK(false) << "Race condition or other bug detected";
109      return false;
110    default:
111      // This particular write was rejected, presumably because of bad input.
112      // The pipe is not necessarily in a bad state.
113      return false;
114  }
115  return true;
116}
117
118// static
119void Connector::CallOnHandleReady(void* closure, MojoResult result) {
120  Connector* self = static_cast<Connector*>(closure);
121  self->OnHandleReady(result);
122}
123
124void Connector::OnHandleReady(MojoResult result) {
125  MOJO_CHECK(async_wait_id_ != 0);
126  async_wait_id_ = 0;
127  if (result != MOJO_RESULT_OK) {
128    NotifyError();
129    return;
130  }
131  ReadAllAvailableMessages();
132  // At this point, this object might have been deleted. Return.
133}
134
135void Connector::WaitToReadMore() {
136  MOJO_CHECK(!async_wait_id_);
137  async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(),
138                                      MOJO_HANDLE_SIGNAL_READABLE,
139                                      MOJO_DEADLINE_INDEFINITE,
140                                      &Connector::CallOnHandleReady,
141                                      this);
142}
143
144bool Connector::ReadSingleMessage(MojoResult* read_result) {
145  bool receiver_result = false;
146
147  // Detect if |this| was destroyed during message dispatch. Allow for the
148  // possibility of re-entering ReadMore() through message dispatch.
149  bool was_destroyed_during_dispatch = false;
150  bool* previous_destroyed_flag = destroyed_flag_;
151  destroyed_flag_ = &was_destroyed_during_dispatch;
152
153  MojoResult rv = ReadAndDispatchMessage(
154      message_pipe_.get(), incoming_receiver_, &receiver_result);
155  if (read_result)
156    *read_result = rv;
157
158  if (was_destroyed_during_dispatch) {
159    if (previous_destroyed_flag)
160      *previous_destroyed_flag = true;  // Propagate flag.
161    return false;
162  }
163  destroyed_flag_ = previous_destroyed_flag;
164
165  if (rv == MOJO_RESULT_SHOULD_WAIT)
166    return true;
167
168  if (rv != MOJO_RESULT_OK ||
169      (enforce_errors_from_incoming_receiver_ && !receiver_result)) {
170    NotifyError();
171    return false;
172  }
173  return true;
174}
175
176void Connector::ReadAllAvailableMessages() {
177  while (!error_) {
178    MojoResult rv;
179
180    // Return immediately if |this| was destroyed. Do not touch any members!
181    if (!ReadSingleMessage(&rv))
182      return;
183
184    if (rv == MOJO_RESULT_SHOULD_WAIT) {
185      WaitToReadMore();
186      break;
187    }
188  }
189}
190
191void Connector::CancelWait() {
192  if (!async_wait_id_)
193    return;
194
195  waiter_->CancelWait(async_wait_id_);
196  async_wait_id_ = 0;
197}
198
199void Connector::NotifyError() {
200  error_ = true;
201  CancelWait();
202  if (error_handler_)
203    error_handler_->OnConnectionError();
204}
205
206}  // namespace internal
207}  // namespace mojo
208