stream.cc revision 2a99a7e74a7f215066514fe81d2bfa6639d9eddd
1// Copyright (c) 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "content/browser/streams/stream.h" 6 7#include "base/bind.h" 8#include "base/location.h" 9#include "base/message_loop_proxy.h" 10#include "content/browser/streams/stream_handle_impl.h" 11#include "content/browser/streams/stream_read_observer.h" 12#include "content/browser/streams/stream_registry.h" 13#include "content/browser/streams/stream_write_observer.h" 14#include "net/base/io_buffer.h" 15 16namespace { 17// Start throttling the connection at about 1MB. 18const size_t kDeferSizeThreshold = 40 * 32768; 19} 20 21namespace content { 22 23Stream::Stream(StreamRegistry* registry, 24 StreamWriteObserver* write_observer, 25 const GURL& security_origin, 26 const GURL& url) 27 : data_bytes_read_(0), 28 can_add_data_(true), 29 security_origin_(security_origin), 30 url_(url), 31 data_length_(0), 32 registry_(registry), 33 read_observer_(NULL), 34 write_observer_(write_observer), 35 stream_handle_(NULL), 36 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { 37 CreateByteStream(base::MessageLoopProxy::current(), 38 base::MessageLoopProxy::current(), 39 kDeferSizeThreshold, 40 &writer_, 41 &reader_); 42 43 // Setup callback for writing. 44 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, 45 weak_ptr_factory_.GetWeakPtr())); 46 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, 47 weak_ptr_factory_.GetWeakPtr())); 48 49 registry_->RegisterStream(this); 50} 51 52Stream::~Stream() { 53} 54 55bool Stream::SetReadObserver(StreamReadObserver* observer) { 56 if (read_observer_) 57 return false; 58 read_observer_ = observer; 59 return true; 60} 61 62void Stream::RemoveReadObserver(StreamReadObserver* observer) { 63 DCHECK(observer == read_observer_); 64 read_observer_ = NULL; 65} 66 67void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { 68 DCHECK(observer == write_observer_); 69 write_observer_ = NULL; 70} 71 72void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { 73 can_add_data_ = writer_->Write(buffer, size); 74} 75 76void Stream::Finalize() { 77 writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE); 78 writer_.reset(NULL); 79 80 // Continue asynchronously. 81 base::MessageLoopProxy::current()->PostTask( 82 FROM_HERE, 83 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); 84} 85 86Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, 87 int buf_size, 88 int* bytes_read) { 89 *bytes_read = 0; 90 if (!data_) { 91 data_length_ = 0; 92 data_bytes_read_ = 0; 93 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); 94 switch (state) { 95 case ByteStreamReader::STREAM_HAS_DATA: 96 break; 97 case ByteStreamReader::STREAM_COMPLETE: 98 registry_->UnregisterStream(url()); 99 return STREAM_COMPLETE; 100 case ByteStreamReader::STREAM_EMPTY: 101 return STREAM_EMPTY; 102 } 103 } 104 105 const size_t remaining_bytes = data_length_ - data_bytes_read_; 106 size_t to_read = 107 static_cast<size_t>(buf_size) < remaining_bytes ? 108 buf_size : remaining_bytes; 109 memcpy(buf->data(), data_->data() + data_bytes_read_, to_read); 110 data_bytes_read_ += to_read; 111 if (data_bytes_read_ >= data_length_) 112 data_ = NULL; 113 114 *bytes_read = to_read; 115 return STREAM_HAS_DATA; 116} 117 118scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url, 119 const std::string& mime_type) { 120 CHECK(!stream_handle_); 121 stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(), 122 original_url, 123 mime_type); 124 return scoped_ptr<StreamHandle>(stream_handle_).Pass(); 125} 126 127void Stream::CloseHandle() { 128 CHECK(stream_handle_); 129 stream_handle_ = NULL; 130 registry_->UnregisterStream(url()); 131 if (write_observer_) 132 write_observer_->OnClose(this); 133} 134 135void Stream::OnSpaceAvailable() { 136 can_add_data_ = true; 137 if (write_observer_) 138 write_observer_->OnSpaceAvailable(this); 139} 140 141void Stream::OnDataAvailable() { 142 if (read_observer_) 143 read_observer_->OnDataAvailable(this); 144} 145 146} // namespace content 147 148