1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "device/serial/data_sender.h"
6
7#include "base/bind.h"
8#include "base/message_loop/message_loop.h"
9#include "device/serial/async_waiter.h"
10
11namespace device {
12
13// Represents a send that is not yet fulfilled.
14class DataSender::PendingSend {
15 public:
16  PendingSend(const base::StringPiece& data,
17              const DataSentCallback& callback,
18              const SendErrorCallback& error_callback,
19              int32_t fatal_error_value);
20
21  // Invoked to report that |num_bytes| of data have been sent. Subtracts the
22  // number of bytes that were part of this send from |num_bytes|. Returns
23  // whether this send has been completed. If this send has been completed, this
24  // calls |callback_|.
25  bool ReportBytesSent(uint32_t* num_bytes);
26
27  // Invoked to report that |num_bytes| of data have been sent and then an
28  // error, |error| was encountered. Subtracts the number of bytes that were
29  // part of this send from |num_bytes|. If this send was not completed before
30  // the error, this calls |error_callback_| to report the error. Otherwise,
31  // this calls |callback_|. Returns the number of bytes sent but not acked.
32  uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
33
34  // Reports |fatal_error_value_| to |receive_error_callback_|.
35  void DispatchFatalError();
36
37  // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
38  // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
39  // or the error if one is encountered writing to |handle|.
40  MojoResult SendData(mojo::DataPipeProducerHandle handle);
41
42 private:
43  // Invoked to update |bytes_acked_| and |num_bytes|.
44  void ReportBytesSentInternal(uint32_t* num_bytes);
45
46  // The data to send.
47  const base::StringPiece data_;
48
49  // The callback to report success.
50  const DataSentCallback callback_;
51
52  // The callback to report errors.
53  const SendErrorCallback error_callback_;
54
55  // The error value to report when DispatchFatalError() is called.
56  const int32_t fatal_error_value_;
57
58  // The number of bytes sent to the data pipe.
59  uint32_t bytes_sent_;
60
61  // The number of bytes acked.
62  uint32_t bytes_acked_;
63};
64
65DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
66                       uint32_t buffer_size,
67                       int32_t fatal_error_value)
68    : sink_(sink.Pass()),
69      fatal_error_value_(fatal_error_value),
70      shut_down_(false) {
71  sink_.set_error_handler(this);
72  MojoCreateDataPipeOptions options = {
73      sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
74  };
75  options.struct_size = sizeof(options);
76  mojo::ScopedDataPipeConsumerHandle remote_handle;
77  MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
78  DCHECK_EQ(MOJO_RESULT_OK, result);
79  sink_->Init(remote_handle.Pass());
80  sink_.set_client(this);
81}
82
83DataSender::~DataSender() {
84  ShutDown();
85}
86
87bool DataSender::Send(const base::StringPiece& data,
88                      const DataSentCallback& callback,
89                      const SendErrorCallback& error_callback) {
90  DCHECK(!callback.is_null() && !error_callback.is_null());
91  if (!pending_cancel_.is_null() || shut_down_)
92    return false;
93
94  pending_sends_.push(linked_ptr<PendingSend>(
95      new PendingSend(data, callback, error_callback, fatal_error_value_)));
96  SendInternal();
97  return true;
98}
99
100bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
101  DCHECK(!callback.is_null());
102  if (!pending_cancel_.is_null() || shut_down_)
103    return false;
104  if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
105    base::MessageLoop::current()->PostTask(FROM_HERE, callback);
106    return true;
107  }
108
109  pending_cancel_ = callback;
110  sink_->Cancel(error);
111  return true;
112}
113
114void DataSender::ReportBytesSent(uint32_t bytes_sent) {
115  if (shut_down_)
116    return;
117
118  while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
119         sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
120    sends_awaiting_ack_.pop();
121  }
122  if (bytes_sent > 0 && !pending_sends_.empty()) {
123    bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
124    DCHECK(!finished);
125    if (finished) {
126      ShutDown();
127      return;
128    }
129  }
130  if (bytes_sent != 0) {
131    ShutDown();
132    return;
133  }
134  if (pending_sends_.empty() && sends_awaiting_ack_.empty())
135    RunCancelCallback();
136}
137
138void DataSender::ReportBytesSentAndError(
139    uint32_t bytes_sent,
140    int32_t error,
141    const mojo::Callback<void(uint32_t)>& callback) {
142  if (shut_down_)
143    return;
144
145  uint32_t bytes_to_flush = 0;
146  while (!sends_awaiting_ack_.empty()) {
147    bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
148        &bytes_sent, error);
149    sends_awaiting_ack_.pop();
150  }
151  while (!pending_sends_.empty()) {
152    bytes_to_flush +=
153        pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
154    pending_sends_.pop();
155  }
156  callback.Run(bytes_to_flush);
157  RunCancelCallback();
158}
159
160void DataSender::OnConnectionError() {
161  ShutDown();
162}
163
164void DataSender::SendInternal() {
165  while (!pending_sends_.empty()) {
166    MojoResult result = pending_sends_.front()->SendData(handle_.get());
167    if (result == MOJO_RESULT_OK) {
168      sends_awaiting_ack_.push(pending_sends_.front());
169      pending_sends_.pop();
170    } else if (result == MOJO_RESULT_SHOULD_WAIT) {
171      waiter_.reset(new AsyncWaiter(
172          handle_.get(),
173          MOJO_HANDLE_SIGNAL_WRITABLE,
174          base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
175      return;
176    } else {
177      ShutDown();
178      return;
179    }
180  }
181}
182
183void DataSender::OnDoneWaiting(MojoResult result) {
184  waiter_.reset();
185  if (result != MOJO_RESULT_OK) {
186    ShutDown();
187    return;
188  }
189  SendInternal();
190}
191
192void DataSender::RunCancelCallback() {
193  DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
194  if (pending_cancel_.is_null())
195    return;
196
197  base::MessageLoop::current()->PostTask(FROM_HERE,
198                                         base::Bind(pending_cancel_));
199  pending_cancel_.Reset();
200}
201
202void DataSender::ShutDown() {
203  waiter_.reset();
204  shut_down_ = true;
205  while (!pending_sends_.empty()) {
206    pending_sends_.front()->DispatchFatalError();
207    pending_sends_.pop();
208  }
209  while (!sends_awaiting_ack_.empty()) {
210    sends_awaiting_ack_.front()->DispatchFatalError();
211    sends_awaiting_ack_.pop();
212  }
213  RunCancelCallback();
214}
215
216DataSender::PendingSend::PendingSend(const base::StringPiece& data,
217                                     const DataSentCallback& callback,
218                                     const SendErrorCallback& error_callback,
219                                     int32_t fatal_error_value)
220    : data_(data),
221      callback_(callback),
222      error_callback_(error_callback),
223      fatal_error_value_(fatal_error_value),
224      bytes_sent_(0),
225      bytes_acked_(0) {
226}
227
228bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
229  ReportBytesSentInternal(num_bytes);
230  if (bytes_acked_ < data_.size())
231    return false;
232
233  base::MessageLoop::current()->PostTask(FROM_HERE,
234                                         base::Bind(callback_, bytes_acked_));
235  return true;
236}
237
238uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
239                                                          int32_t error) {
240  ReportBytesSentInternal(num_bytes);
241  if (*num_bytes > 0) {
242    base::MessageLoop::current()->PostTask(FROM_HERE,
243                                           base::Bind(callback_, bytes_acked_));
244    return 0;
245  }
246  base::MessageLoop::current()->PostTask(
247      FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
248  return bytes_sent_ - bytes_acked_;
249}
250
251void DataSender::PendingSend::DispatchFatalError() {
252  base::MessageLoop::current()->PostTask(
253      FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
254}
255
256MojoResult DataSender::PendingSend::SendData(
257    mojo::DataPipeProducerHandle handle) {
258  uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
259  MojoResult result = mojo::WriteDataRaw(handle,
260                                         data_.data() + bytes_sent_,
261                                         &bytes_to_send,
262                                         MOJO_WRITE_DATA_FLAG_NONE);
263  if (result != MOJO_RESULT_OK)
264    return result;
265
266  bytes_sent_ += bytes_to_send;
267  return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
268}
269
270void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
271  bytes_acked_ += *num_bytes;
272  if (bytes_acked_ > bytes_sent_) {
273    *num_bytes = bytes_acked_ - bytes_sent_;
274    bytes_acked_ = bytes_sent_;
275  } else {
276    *num_bytes = 0;
277  }
278}
279
280}  // namespace device
281