1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "device/serial/data_source_sender.h"
6
7#include <limits>
8
9#include "base/bind.h"
10#include "base/message_loop/message_loop.h"
11#include "device/serial/async_waiter.h"
12
13namespace device {
14
15// Represents a send that is not yet fulfilled.
16class DataSourceSender::PendingSend {
17 public:
18  PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
19
20  // Asynchronously fills |data| with up to |num_bytes| of data. Following this,
21  // one of Done() and DoneWithError() will be called with the result.
22  void GetData(void* data, uint32_t num_bytes);
23
24 private:
25  class Buffer;
26  // Reports a successful write of |bytes_written|.
27  void Done(uint32_t bytes_written);
28
29  // Reports a partially successful or unsuccessful write of |bytes_written|
30  // with an error of |error|.
31  void DoneWithError(uint32_t bytes_written, int32_t error);
32
33  // The DataSourceSender that owns this.
34  DataSourceSender* sender_;
35
36  // The callback to call to get data.
37  ReadyCallback callback_;
38
39  // Whether the buffer specified by GetData() has been passed to |callback_|,
40  // but has not yet called Done() or DoneWithError().
41  bool buffer_in_use_;
42};
43
44// A Writable implementation that provides a view of a data pipe owned by a
45// DataSourceSender.
46class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
47 public:
48  Buffer(scoped_refptr<DataSourceSender> sender,
49         PendingSend* send,
50         char* buffer,
51         uint32_t buffer_size);
52  virtual ~Buffer();
53
54  // WritableBuffer overrides.
55  virtual char* GetData() OVERRIDE;
56  virtual uint32_t GetSize() OVERRIDE;
57  virtual void Done(uint32_t bytes_written) OVERRIDE;
58  virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE;
59
60 private:
61  // The DataSourceSender whose data pipe we are providing a view.
62  scoped_refptr<DataSourceSender> sender_;
63
64  // The PendingSend to which this buffer has been created in response.
65  PendingSend* pending_send_;
66
67  char* buffer_;
68  uint32_t buffer_size_;
69};
70
71DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
72                                   const ErrorCallback& error_callback)
73    : ready_callback_(ready_callback),
74      error_callback_(error_callback),
75      bytes_sent_(0),
76      shut_down_(false) {
77  DCHECK(!ready_callback.is_null() && !error_callback.is_null());
78}
79
80void DataSourceSender::ShutDown() {
81  shut_down_ = true;
82  waiter_.reset();
83  ready_callback_.Reset();
84  error_callback_.Reset();
85}
86
87DataSourceSender::~DataSourceSender() {
88  DCHECK(shut_down_);
89}
90
91void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
92  // This should never occur. |handle_| is only valid and |pending_send_| is
93  // only set after Init is called.
94  if (pending_send_ || handle_.is_valid() || shut_down_) {
95    DispatchFatalError();
96    return;
97  }
98  handle_ = handle.Pass();
99  pending_send_.reset(new PendingSend(this, ready_callback_));
100  StartWaiting();
101}
102
103void DataSourceSender::Resume() {
104  if (pending_send_ || !handle_.is_valid()) {
105    DispatchFatalError();
106    return;
107  }
108
109  pending_send_.reset(new PendingSend(this, ready_callback_));
110  StartWaiting();
111}
112
113void DataSourceSender::OnConnectionError() {
114  DispatchFatalError();
115}
116
117void DataSourceSender::StartWaiting() {
118  DCHECK(pending_send_ && !waiter_);
119  waiter_.reset(
120      new AsyncWaiter(handle_.get(),
121                      MOJO_HANDLE_SIGNAL_WRITABLE,
122                      base::Bind(&DataSourceSender::OnDoneWaiting, this)));
123}
124
125void DataSourceSender::OnDoneWaiting(MojoResult result) {
126  DCHECK(pending_send_ && !shut_down_ && waiter_);
127  waiter_.reset();
128  if (result != MOJO_RESULT_OK) {
129    DispatchFatalError();
130    return;
131  }
132  void* data = NULL;
133  uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
134  result = mojo::BeginWriteDataRaw(
135      handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
136  if (result != MOJO_RESULT_OK) {
137    DispatchFatalError();
138    return;
139  }
140  pending_send_->GetData(static_cast<char*>(data), num_bytes);
141}
142
143void DataSourceSender::Done(uint32_t bytes_written) {
144  DoneInternal(bytes_written);
145  if (!shut_down_)
146    StartWaiting();
147}
148
149void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
150  DoneInternal(bytes_written);
151  pending_send_.reset();
152  if (!shut_down_)
153    client()->OnError(bytes_sent_, error);
154  // We don't call StartWaiting here so we don't send any additional data until
155  // Resume() is called.
156}
157
158void DataSourceSender::DoneInternal(uint32_t bytes_written) {
159  DCHECK(pending_send_);
160  if (shut_down_)
161    return;
162
163  bytes_sent_ += bytes_written;
164  MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
165  if (result != MOJO_RESULT_OK) {
166    DispatchFatalError();
167    return;
168  }
169}
170
171void DataSourceSender::DispatchFatalError() {
172  if (shut_down_)
173    return;
174
175  error_callback_.Run();
176  ShutDown();
177}
178
179DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
180                                           const ReadyCallback& callback)
181    : sender_(sender), callback_(callback), buffer_in_use_(false) {
182}
183
184void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
185  DCHECK(!buffer_in_use_);
186  buffer_in_use_ = true;
187  callback_.Run(scoped_ptr<WritableBuffer>(
188      new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
189}
190
191void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
192  DCHECK(buffer_in_use_);
193  buffer_in_use_ = false;
194  sender_->Done(bytes_written);
195}
196
197void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
198                                                  int32_t error) {
199  DCHECK(buffer_in_use_);
200  buffer_in_use_ = false;
201  sender_->DoneWithError(bytes_written, error);
202}
203
204DataSourceSender::PendingSend::Buffer::Buffer(
205    scoped_refptr<DataSourceSender> sender,
206    PendingSend* send,
207    char* buffer,
208    uint32_t buffer_size)
209    : sender_(sender),
210      pending_send_(send),
211      buffer_(buffer),
212      buffer_size_(buffer_size) {
213}
214
215DataSourceSender::PendingSend::Buffer::~Buffer() {
216  if (sender_.get())
217    pending_send_->Done(0);
218}
219
220char* DataSourceSender::PendingSend::Buffer::GetData() {
221  return buffer_;
222}
223
224uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
225  return buffer_size_;
226}
227
228void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
229  DCHECK(sender_.get());
230  pending_send_->Done(bytes_written);
231  sender_ = NULL;
232  pending_send_ = NULL;
233  buffer_ = NULL;
234  buffer_size_ = 0;
235}
236
237void DataSourceSender::PendingSend::Buffer::DoneWithError(
238    uint32_t bytes_written,
239    int32_t error) {
240  DCHECK(sender_.get());
241  pending_send_->DoneWithError(bytes_written, error);
242  sender_ = NULL;
243  pending_send_ = NULL;
244  buffer_ = NULL;
245  buffer_size_ = 0;
246}
247
248}  // namespace device
249