1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "device/serial/data_source_sender.h" 6 7#include <limits> 8 9#include "base/bind.h" 10#include "base/message_loop/message_loop.h" 11#include "device/serial/async_waiter.h" 12 13namespace device { 14 15// Represents a send that is not yet fulfilled. 16class DataSourceSender::PendingSend { 17 public: 18 PendingSend(DataSourceSender* sender, const ReadyCallback& callback); 19 20 // Asynchronously fills |data| with up to |num_bytes| of data. Following this, 21 // one of Done() and DoneWithError() will be called with the result. 22 void GetData(void* data, uint32_t num_bytes); 23 24 private: 25 class Buffer; 26 // Reports a successful write of |bytes_written|. 27 void Done(uint32_t bytes_written); 28 29 // Reports a partially successful or unsuccessful write of |bytes_written| 30 // with an error of |error|. 31 void DoneWithError(uint32_t bytes_written, int32_t error); 32 33 // The DataSourceSender that owns this. 34 DataSourceSender* sender_; 35 36 // The callback to call to get data. 37 ReadyCallback callback_; 38 39 // Whether the buffer specified by GetData() has been passed to |callback_|, 40 // but has not yet called Done() or DoneWithError(). 41 bool buffer_in_use_; 42}; 43 44// A Writable implementation that provides a view of a data pipe owned by a 45// DataSourceSender. 46class DataSourceSender::PendingSend::Buffer : public WritableBuffer { 47 public: 48 Buffer(scoped_refptr<DataSourceSender> sender, 49 PendingSend* send, 50 char* buffer, 51 uint32_t buffer_size); 52 virtual ~Buffer(); 53 54 // WritableBuffer overrides. 55 virtual char* GetData() OVERRIDE; 56 virtual uint32_t GetSize() OVERRIDE; 57 virtual void Done(uint32_t bytes_written) OVERRIDE; 58 virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE; 59 60 private: 61 // The DataSourceSender whose data pipe we are providing a view. 62 scoped_refptr<DataSourceSender> sender_; 63 64 // The PendingSend to which this buffer has been created in response. 65 PendingSend* pending_send_; 66 67 char* buffer_; 68 uint32_t buffer_size_; 69}; 70 71DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, 72 const ErrorCallback& error_callback) 73 : ready_callback_(ready_callback), 74 error_callback_(error_callback), 75 bytes_sent_(0), 76 shut_down_(false) { 77 DCHECK(!ready_callback.is_null() && !error_callback.is_null()); 78} 79 80void DataSourceSender::ShutDown() { 81 shut_down_ = true; 82 waiter_.reset(); 83 ready_callback_.Reset(); 84 error_callback_.Reset(); 85} 86 87DataSourceSender::~DataSourceSender() { 88 DCHECK(shut_down_); 89} 90 91void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { 92 // This should never occur. |handle_| is only valid and |pending_send_| is 93 // only set after Init is called. 94 if (pending_send_ || handle_.is_valid() || shut_down_) { 95 DispatchFatalError(); 96 return; 97 } 98 handle_ = handle.Pass(); 99 pending_send_.reset(new PendingSend(this, ready_callback_)); 100 StartWaiting(); 101} 102 103void DataSourceSender::Resume() { 104 if (pending_send_ || !handle_.is_valid()) { 105 DispatchFatalError(); 106 return; 107 } 108 109 pending_send_.reset(new PendingSend(this, ready_callback_)); 110 StartWaiting(); 111} 112 113void DataSourceSender::OnConnectionError() { 114 DispatchFatalError(); 115} 116 117void DataSourceSender::StartWaiting() { 118 DCHECK(pending_send_ && !waiter_); 119 waiter_.reset( 120 new AsyncWaiter(handle_.get(), 121 MOJO_HANDLE_SIGNAL_WRITABLE, 122 base::Bind(&DataSourceSender::OnDoneWaiting, this))); 123} 124 125void DataSourceSender::OnDoneWaiting(MojoResult result) { 126 DCHECK(pending_send_ && !shut_down_ && waiter_); 127 waiter_.reset(); 128 if (result != MOJO_RESULT_OK) { 129 DispatchFatalError(); 130 return; 131 } 132 void* data = NULL; 133 uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); 134 result = mojo::BeginWriteDataRaw( 135 handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); 136 if (result != MOJO_RESULT_OK) { 137 DispatchFatalError(); 138 return; 139 } 140 pending_send_->GetData(static_cast<char*>(data), num_bytes); 141} 142 143void DataSourceSender::Done(uint32_t bytes_written) { 144 DoneInternal(bytes_written); 145 if (!shut_down_) 146 StartWaiting(); 147} 148 149void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) { 150 DoneInternal(bytes_written); 151 pending_send_.reset(); 152 if (!shut_down_) 153 client()->OnError(bytes_sent_, error); 154 // We don't call StartWaiting here so we don't send any additional data until 155 // Resume() is called. 156} 157 158void DataSourceSender::DoneInternal(uint32_t bytes_written) { 159 DCHECK(pending_send_); 160 if (shut_down_) 161 return; 162 163 bytes_sent_ += bytes_written; 164 MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written); 165 if (result != MOJO_RESULT_OK) { 166 DispatchFatalError(); 167 return; 168 } 169} 170 171void DataSourceSender::DispatchFatalError() { 172 if (shut_down_) 173 return; 174 175 error_callback_.Run(); 176 ShutDown(); 177} 178 179DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, 180 const ReadyCallback& callback) 181 : sender_(sender), callback_(callback), buffer_in_use_(false) { 182} 183 184void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { 185 DCHECK(!buffer_in_use_); 186 buffer_in_use_ = true; 187 callback_.Run(scoped_ptr<WritableBuffer>( 188 new Buffer(sender_, this, static_cast<char*>(data), num_bytes))); 189} 190 191void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { 192 DCHECK(buffer_in_use_); 193 buffer_in_use_ = false; 194 sender_->Done(bytes_written); 195} 196 197void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, 198 int32_t error) { 199 DCHECK(buffer_in_use_); 200 buffer_in_use_ = false; 201 sender_->DoneWithError(bytes_written, error); 202} 203 204DataSourceSender::PendingSend::Buffer::Buffer( 205 scoped_refptr<DataSourceSender> sender, 206 PendingSend* send, 207 char* buffer, 208 uint32_t buffer_size) 209 : sender_(sender), 210 pending_send_(send), 211 buffer_(buffer), 212 buffer_size_(buffer_size) { 213} 214 215DataSourceSender::PendingSend::Buffer::~Buffer() { 216 if (sender_.get()) 217 pending_send_->Done(0); 218} 219 220char* DataSourceSender::PendingSend::Buffer::GetData() { 221 return buffer_; 222} 223 224uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { 225 return buffer_size_; 226} 227 228void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { 229 DCHECK(sender_.get()); 230 pending_send_->Done(bytes_written); 231 sender_ = NULL; 232 pending_send_ = NULL; 233 buffer_ = NULL; 234 buffer_size_ = 0; 235} 236 237void DataSourceSender::PendingSend::Buffer::DoneWithError( 238 uint32_t bytes_written, 239 int32_t error) { 240 DCHECK(sender_.get()); 241 pending_send_->DoneWithError(bytes_written, error); 242 sender_ = NULL; 243 pending_send_ = NULL; 244 buffer_ = NULL; 245 buffer_size_ = 0; 246} 247 248} // namespace device 249