1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "content/browser/byte_stream.h" 6 7#include <deque> 8#include <set> 9#include <utility> 10 11#include "base/bind.h" 12#include "base/location.h" 13#include "base/memory/ref_counted.h" 14#include "base/sequenced_task_runner.h" 15 16namespace content { 17namespace { 18 19typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > 20ContentVector; 21 22class ByteStreamReaderImpl; 23 24// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be 25// cleared in an object destructor and accessed to check for object 26// existence. We can't use weak pointers because they're tightly tied to 27// threads rather than task runners. 28// TODO(rdsmith): A better solution would be extending weak pointers 29// to support SequencedTaskRunners. 30struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { 31 public: 32 LifetimeFlag() : is_alive(true) { } 33 bool is_alive; 34 35 protected: 36 friend class base::RefCountedThreadSafe<LifetimeFlag>; 37 virtual ~LifetimeFlag() { } 38 39 private: 40 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag); 41}; 42 43// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and 44// SetPeer may happen anywhere; all other operations on each class must 45// happen in the context of their SequencedTaskRunner. 46class ByteStreamWriterImpl : public ByteStreamWriter { 47 public: 48 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, 49 scoped_refptr<LifetimeFlag> lifetime_flag, 50 size_t buffer_size); 51 virtual ~ByteStreamWriterImpl(); 52 53 // Must be called before any operations are performed. 54 void SetPeer(ByteStreamReaderImpl* peer, 55 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 56 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 57 58 // Overridden from ByteStreamWriter. 59 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, 60 size_t byte_count) OVERRIDE; 61 virtual void Flush() OVERRIDE; 62 virtual void Close(int status) OVERRIDE; 63 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; 64 65 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. 66 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, 67 ByteStreamWriterImpl* target, 68 size_t bytes_consumed); 69 70 private: 71 // Called from UpdateWindow when object existence has been validated. 72 void UpdateWindowInternal(size_t bytes_consumed); 73 74 void PostToPeer(bool complete, int status); 75 76 const size_t total_buffer_size_; 77 78 // All data objects in this class are only valid to access on 79 // this task runner except as otherwise noted. 80 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 81 82 // True while this object is alive. 83 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 84 85 base::Closure space_available_callback_; 86 ContentVector input_contents_; 87 size_t input_contents_size_; 88 89 // ** Peer information. 90 91 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; 92 93 // How much we've sent to the output that for flow control purposes we 94 // must assume hasn't been read yet. 95 size_t output_size_used_; 96 97 // Only valid to access on peer_task_runner_. 98 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; 99 100 // Only valid to access on peer_task_runner_ if 101 // |*peer_lifetime_flag_ == true| 102 ByteStreamReaderImpl* peer_; 103}; 104 105class ByteStreamReaderImpl : public ByteStreamReader { 106 public: 107 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, 108 scoped_refptr<LifetimeFlag> lifetime_flag, 109 size_t buffer_size); 110 virtual ~ByteStreamReaderImpl(); 111 112 // Must be called before any operations are performed. 113 void SetPeer(ByteStreamWriterImpl* peer, 114 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 115 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 116 117 // Overridden from ByteStreamReader. 118 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, 119 size_t* length) OVERRIDE; 120 virtual int GetStatus() const OVERRIDE; 121 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; 122 123 // PostTask target from |ByteStreamWriterImpl::Write| and 124 // |ByteStreamWriterImpl::Close|. 125 // Receive data from our peer. 126 // static because it may be called after the object it is targeting 127 // has been destroyed. It may not access |*target| 128 // if |*object_lifetime_flag| is false. 129 static void TransferData( 130 scoped_refptr<LifetimeFlag> object_lifetime_flag, 131 ByteStreamReaderImpl* target, 132 scoped_ptr<ContentVector> transfer_buffer, 133 size_t transfer_buffer_bytes, 134 bool source_complete, 135 int status); 136 137 private: 138 // Called from TransferData once object existence has been validated. 139 void TransferDataInternal( 140 scoped_ptr<ContentVector> transfer_buffer, 141 size_t transfer_buffer_bytes, 142 bool source_complete, 143 int status); 144 145 void MaybeUpdateInput(); 146 147 const size_t total_buffer_size_; 148 149 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 150 151 // True while this object is alive. 152 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 153 154 ContentVector available_contents_; 155 156 bool received_status_; 157 int status_; 158 159 base::Closure data_available_callback_; 160 161 // Time of last point at which data in stream transitioned from full 162 // to non-full. Nulled when a callback is sent. 163 base::Time last_non_full_time_; 164 165 // ** Peer information 166 167 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; 168 169 // How much has been removed from this class that we haven't told 170 // the input about yet. 171 size_t unreported_consumed_bytes_; 172 173 // Only valid to access on peer_task_runner_. 174 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; 175 176 // Only valid to access on peer_task_runner_ if 177 // |*peer_lifetime_flag_ == true| 178 ByteStreamWriterImpl* peer_; 179}; 180 181ByteStreamWriterImpl::ByteStreamWriterImpl( 182 scoped_refptr<base::SequencedTaskRunner> task_runner, 183 scoped_refptr<LifetimeFlag> lifetime_flag, 184 size_t buffer_size) 185 : total_buffer_size_(buffer_size), 186 my_task_runner_(task_runner), 187 my_lifetime_flag_(lifetime_flag), 188 input_contents_size_(0), 189 output_size_used_(0), 190 peer_(NULL) { 191 DCHECK(my_lifetime_flag_.get()); 192 my_lifetime_flag_->is_alive = true; 193} 194 195ByteStreamWriterImpl::~ByteStreamWriterImpl() { 196 my_lifetime_flag_->is_alive = false; 197} 198 199void ByteStreamWriterImpl::SetPeer( 200 ByteStreamReaderImpl* peer, 201 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 202 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 203 peer_ = peer; 204 peer_task_runner_ = peer_task_runner; 205 peer_lifetime_flag_ = peer_lifetime_flag; 206} 207 208bool ByteStreamWriterImpl::Write( 209 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { 210 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 211 212 input_contents_.push_back(std::make_pair(buffer, byte_count)); 213 input_contents_size_ += byte_count; 214 215 // Arbitrarily, we buffer to a third of the total size before sending. 216 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) 217 PostToPeer(false, 0); 218 219 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); 220} 221 222void ByteStreamWriterImpl::Flush() { 223 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 224 if (input_contents_size_ > 0) 225 PostToPeer(false, 0); 226} 227 228void ByteStreamWriterImpl::Close(int status) { 229 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 230 PostToPeer(true, status); 231} 232 233void ByteStreamWriterImpl::RegisterCallback( 234 const base::Closure& source_callback) { 235 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 236 space_available_callback_ = source_callback; 237} 238 239// static 240void ByteStreamWriterImpl::UpdateWindow( 241 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, 242 size_t bytes_consumed) { 243 // If the target object isn't alive anymore, we do nothing. 244 if (!lifetime_flag->is_alive) return; 245 246 target->UpdateWindowInternal(bytes_consumed); 247} 248 249void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { 250 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 251 DCHECK_GE(output_size_used_, bytes_consumed); 252 output_size_used_ -= bytes_consumed; 253 254 // Callback if we were above the limit and we're now <= to it. 255 size_t total_known_size_used = 256 input_contents_size_ + output_size_used_; 257 258 if (total_known_size_used <= total_buffer_size_ && 259 (total_known_size_used + bytes_consumed > total_buffer_size_) && 260 !space_available_callback_.is_null()) 261 space_available_callback_.Run(); 262} 263 264void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { 265 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 266 // Valid contexts in which to call. 267 DCHECK(complete || 0 != input_contents_size_); 268 269 scoped_ptr<ContentVector> transfer_buffer; 270 size_t buffer_size = 0; 271 if (0 != input_contents_size_) { 272 transfer_buffer.reset(new ContentVector); 273 transfer_buffer->swap(input_contents_); 274 buffer_size = input_contents_size_; 275 output_size_used_ += input_contents_size_; 276 input_contents_size_ = 0; 277 } 278 peer_task_runner_->PostTask( 279 FROM_HERE, base::Bind( 280 &ByteStreamReaderImpl::TransferData, 281 peer_lifetime_flag_, 282 peer_, 283 base::Passed(&transfer_buffer), 284 buffer_size, 285 complete, 286 status)); 287} 288 289ByteStreamReaderImpl::ByteStreamReaderImpl( 290 scoped_refptr<base::SequencedTaskRunner> task_runner, 291 scoped_refptr<LifetimeFlag> lifetime_flag, 292 size_t buffer_size) 293 : total_buffer_size_(buffer_size), 294 my_task_runner_(task_runner), 295 my_lifetime_flag_(lifetime_flag), 296 received_status_(false), 297 status_(0), 298 unreported_consumed_bytes_(0), 299 peer_(NULL) { 300 DCHECK(my_lifetime_flag_.get()); 301 my_lifetime_flag_->is_alive = true; 302} 303 304ByteStreamReaderImpl::~ByteStreamReaderImpl() { 305 my_lifetime_flag_->is_alive = false; 306} 307 308void ByteStreamReaderImpl::SetPeer( 309 ByteStreamWriterImpl* peer, 310 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 311 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 312 peer_ = peer; 313 peer_task_runner_ = peer_task_runner; 314 peer_lifetime_flag_ = peer_lifetime_flag; 315} 316 317ByteStreamReaderImpl::StreamState 318ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data, 319 size_t* length) { 320 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 321 322 if (available_contents_.size()) { 323 *data = available_contents_.front().first; 324 *length = available_contents_.front().second; 325 available_contents_.pop_front(); 326 unreported_consumed_bytes_ += *length; 327 328 MaybeUpdateInput(); 329 return STREAM_HAS_DATA; 330 } 331 if (received_status_) { 332 return STREAM_COMPLETE; 333 } 334 return STREAM_EMPTY; 335} 336 337int ByteStreamReaderImpl::GetStatus() const { 338 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 339 DCHECK(received_status_); 340 return status_; 341} 342 343void ByteStreamReaderImpl::RegisterCallback( 344 const base::Closure& sink_callback) { 345 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 346 347 data_available_callback_ = sink_callback; 348} 349 350// static 351void ByteStreamReaderImpl::TransferData( 352 scoped_refptr<LifetimeFlag> object_lifetime_flag, 353 ByteStreamReaderImpl* target, 354 scoped_ptr<ContentVector> transfer_buffer, 355 size_t buffer_size, 356 bool source_complete, 357 int status) { 358 // If our target is no longer alive, do nothing. 359 if (!object_lifetime_flag->is_alive) return; 360 361 target->TransferDataInternal( 362 transfer_buffer.Pass(), buffer_size, source_complete, status); 363} 364 365void ByteStreamReaderImpl::TransferDataInternal( 366 scoped_ptr<ContentVector> transfer_buffer, 367 size_t buffer_size, 368 bool source_complete, 369 int status) { 370 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 371 372 bool was_empty = available_contents_.empty(); 373 374 if (transfer_buffer) { 375 available_contents_.insert(available_contents_.end(), 376 transfer_buffer->begin(), 377 transfer_buffer->end()); 378 } 379 380 if (source_complete) { 381 received_status_ = true; 382 status_ = status; 383 } 384 385 // Callback on transition from empty to non-empty, or 386 // source complete. 387 if (((was_empty && !available_contents_.empty()) || 388 source_complete) && 389 !data_available_callback_.is_null()) 390 data_available_callback_.Run(); 391} 392 393// Decide whether or not to send the input a window update. 394// Currently we do that whenever we've got unreported consumption 395// greater than 1/3 of total size. 396void ByteStreamReaderImpl::MaybeUpdateInput() { 397 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 398 399 if (unreported_consumed_bytes_ <= 400 total_buffer_size_ / kFractionReadBeforeWindowUpdate) 401 return; 402 403 peer_task_runner_->PostTask( 404 FROM_HERE, base::Bind( 405 &ByteStreamWriterImpl::UpdateWindow, 406 peer_lifetime_flag_, 407 peer_, 408 unreported_consumed_bytes_)); 409 unreported_consumed_bytes_ = 0; 410} 411 412} // namespace 413 414const int ByteStreamWriter::kFractionBufferBeforeSending = 3; 415const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3; 416 417ByteStreamReader::~ByteStreamReader() { } 418 419ByteStreamWriter::~ByteStreamWriter() { } 420 421void CreateByteStream( 422 scoped_refptr<base::SequencedTaskRunner> input_task_runner, 423 scoped_refptr<base::SequencedTaskRunner> output_task_runner, 424 size_t buffer_size, 425 scoped_ptr<ByteStreamWriter>* input, 426 scoped_ptr<ByteStreamReader>* output) { 427 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); 428 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); 429 430 ByteStreamWriterImpl* in = new ByteStreamWriterImpl( 431 input_task_runner, input_flag, buffer_size); 432 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( 433 output_task_runner, output_flag, buffer_size); 434 435 in->SetPeer(out, output_task_runner, output_flag); 436 out->SetPeer(in, input_task_runner, input_flag); 437 input->reset(in); 438 output->reset(out); 439} 440 441} // namespace content 442