1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "device/serial/data_sender.h" 6 7#include "base/bind.h" 8#include "base/message_loop/message_loop.h" 9#include "device/serial/async_waiter.h" 10 11namespace device { 12 13// Represents a send that is not yet fulfilled. 14class DataSender::PendingSend { 15 public: 16 PendingSend(const base::StringPiece& data, 17 const DataSentCallback& callback, 18 const SendErrorCallback& error_callback, 19 int32_t fatal_error_value); 20 21 // Invoked to report that |num_bytes| of data have been sent. Subtracts the 22 // number of bytes that were part of this send from |num_bytes|. Returns 23 // whether this send has been completed. If this send has been completed, this 24 // calls |callback_|. 25 bool ReportBytesSent(uint32_t* num_bytes); 26 27 // Invoked to report that |num_bytes| of data have been sent and then an 28 // error, |error| was encountered. Subtracts the number of bytes that were 29 // part of this send from |num_bytes|. If this send was not completed before 30 // the error, this calls |error_callback_| to report the error. Otherwise, 31 // this calls |callback_|. Returns the number of bytes sent but not acked. 32 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); 33 34 // Reports |fatal_error_value_| to |receive_error_callback_|. 35 void DispatchFatalError(); 36 37 // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK 38 // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent 39 // or the error if one is encountered writing to |handle|. 40 MojoResult SendData(mojo::DataPipeProducerHandle handle); 41 42 private: 43 // Invoked to update |bytes_acked_| and |num_bytes|. 44 void ReportBytesSentInternal(uint32_t* num_bytes); 45 46 // The data to send. 47 const base::StringPiece data_; 48 49 // The callback to report success. 50 const DataSentCallback callback_; 51 52 // The callback to report errors. 53 const SendErrorCallback error_callback_; 54 55 // The error value to report when DispatchFatalError() is called. 56 const int32_t fatal_error_value_; 57 58 // The number of bytes sent to the data pipe. 59 uint32_t bytes_sent_; 60 61 // The number of bytes acked. 62 uint32_t bytes_acked_; 63}; 64 65DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, 66 uint32_t buffer_size, 67 int32_t fatal_error_value) 68 : sink_(sink.Pass()), 69 fatal_error_value_(fatal_error_value), 70 shut_down_(false) { 71 sink_.set_error_handler(this); 72 MojoCreateDataPipeOptions options = { 73 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, 74 }; 75 options.struct_size = sizeof(options); 76 mojo::ScopedDataPipeConsumerHandle remote_handle; 77 MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle); 78 DCHECK_EQ(MOJO_RESULT_OK, result); 79 sink_->Init(remote_handle.Pass()); 80 sink_.set_client(this); 81} 82 83DataSender::~DataSender() { 84 ShutDown(); 85} 86 87bool DataSender::Send(const base::StringPiece& data, 88 const DataSentCallback& callback, 89 const SendErrorCallback& error_callback) { 90 DCHECK(!callback.is_null() && !error_callback.is_null()); 91 if (!pending_cancel_.is_null() || shut_down_) 92 return false; 93 94 pending_sends_.push(linked_ptr<PendingSend>( 95 new PendingSend(data, callback, error_callback, fatal_error_value_))); 96 SendInternal(); 97 return true; 98} 99 100bool DataSender::Cancel(int32_t error, const CancelCallback& callback) { 101 DCHECK(!callback.is_null()); 102 if (!pending_cancel_.is_null() || shut_down_) 103 return false; 104 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) { 105 base::MessageLoop::current()->PostTask(FROM_HERE, callback); 106 return true; 107 } 108 109 pending_cancel_ = callback; 110 sink_->Cancel(error); 111 return true; 112} 113 114void DataSender::ReportBytesSent(uint32_t bytes_sent) { 115 if (shut_down_) 116 return; 117 118 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && 119 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { 120 sends_awaiting_ack_.pop(); 121 } 122 if (bytes_sent > 0 && !pending_sends_.empty()) { 123 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); 124 DCHECK(!finished); 125 if (finished) { 126 ShutDown(); 127 return; 128 } 129 } 130 if (bytes_sent != 0) { 131 ShutDown(); 132 return; 133 } 134 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) 135 RunCancelCallback(); 136} 137 138void DataSender::ReportBytesSentAndError( 139 uint32_t bytes_sent, 140 int32_t error, 141 const mojo::Callback<void(uint32_t)>& callback) { 142 if (shut_down_) 143 return; 144 145 uint32_t bytes_to_flush = 0; 146 while (!sends_awaiting_ack_.empty()) { 147 bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError( 148 &bytes_sent, error); 149 sends_awaiting_ack_.pop(); 150 } 151 while (!pending_sends_.empty()) { 152 bytes_to_flush += 153 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); 154 pending_sends_.pop(); 155 } 156 callback.Run(bytes_to_flush); 157 RunCancelCallback(); 158} 159 160void DataSender::OnConnectionError() { 161 ShutDown(); 162} 163 164void DataSender::SendInternal() { 165 while (!pending_sends_.empty()) { 166 MojoResult result = pending_sends_.front()->SendData(handle_.get()); 167 if (result == MOJO_RESULT_OK) { 168 sends_awaiting_ack_.push(pending_sends_.front()); 169 pending_sends_.pop(); 170 } else if (result == MOJO_RESULT_SHOULD_WAIT) { 171 waiter_.reset(new AsyncWaiter( 172 handle_.get(), 173 MOJO_HANDLE_SIGNAL_WRITABLE, 174 base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this)))); 175 return; 176 } else { 177 ShutDown(); 178 return; 179 } 180 } 181} 182 183void DataSender::OnDoneWaiting(MojoResult result) { 184 waiter_.reset(); 185 if (result != MOJO_RESULT_OK) { 186 ShutDown(); 187 return; 188 } 189 SendInternal(); 190} 191 192void DataSender::RunCancelCallback() { 193 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); 194 if (pending_cancel_.is_null()) 195 return; 196 197 base::MessageLoop::current()->PostTask(FROM_HERE, 198 base::Bind(pending_cancel_)); 199 pending_cancel_.Reset(); 200} 201 202void DataSender::ShutDown() { 203 waiter_.reset(); 204 shut_down_ = true; 205 while (!pending_sends_.empty()) { 206 pending_sends_.front()->DispatchFatalError(); 207 pending_sends_.pop(); 208 } 209 while (!sends_awaiting_ack_.empty()) { 210 sends_awaiting_ack_.front()->DispatchFatalError(); 211 sends_awaiting_ack_.pop(); 212 } 213 RunCancelCallback(); 214} 215 216DataSender::PendingSend::PendingSend(const base::StringPiece& data, 217 const DataSentCallback& callback, 218 const SendErrorCallback& error_callback, 219 int32_t fatal_error_value) 220 : data_(data), 221 callback_(callback), 222 error_callback_(error_callback), 223 fatal_error_value_(fatal_error_value), 224 bytes_sent_(0), 225 bytes_acked_(0) { 226} 227 228bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) { 229 ReportBytesSentInternal(num_bytes); 230 if (bytes_acked_ < data_.size()) 231 return false; 232 233 base::MessageLoop::current()->PostTask(FROM_HERE, 234 base::Bind(callback_, bytes_acked_)); 235 return true; 236} 237 238uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes, 239 int32_t error) { 240 ReportBytesSentInternal(num_bytes); 241 if (*num_bytes > 0) { 242 base::MessageLoop::current()->PostTask(FROM_HERE, 243 base::Bind(callback_, bytes_acked_)); 244 return 0; 245 } 246 base::MessageLoop::current()->PostTask( 247 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); 248 return bytes_sent_ - bytes_acked_; 249} 250 251void DataSender::PendingSend::DispatchFatalError() { 252 base::MessageLoop::current()->PostTask( 253 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); 254} 255 256MojoResult DataSender::PendingSend::SendData( 257 mojo::DataPipeProducerHandle handle) { 258 uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_; 259 MojoResult result = mojo::WriteDataRaw(handle, 260 data_.data() + bytes_sent_, 261 &bytes_to_send, 262 MOJO_WRITE_DATA_FLAG_NONE); 263 if (result != MOJO_RESULT_OK) 264 return result; 265 266 bytes_sent_ += bytes_to_send; 267 return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; 268} 269 270void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { 271 bytes_acked_ += *num_bytes; 272 if (bytes_acked_ > bytes_sent_) { 273 *num_bytes = bytes_acked_ - bytes_sent_; 274 bytes_acked_ = bytes_sent_; 275 } else { 276 *num_bytes = 0; 277 } 278} 279 280} // namespace device 281