// stream.cc revision a1401311d1ab56c4ed0a474bd38c108f75cb0cd9
1// Copyright (c) 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "content/browser/streams/stream.h" 6 7#include "base/bind.h" 8#include "base/location.h" 9#include "base/message_loop/message_loop_proxy.h" 10#include "content/browser/streams/stream_handle_impl.h" 11#include "content/browser/streams/stream_read_observer.h" 12#include "content/browser/streams/stream_registry.h" 13#include "content/browser/streams/stream_write_observer.h" 14#include "net/base/io_buffer.h" 15 16namespace { 17// Start throttling the connection at about 1MB. 18const size_t kDeferSizeThreshold = 40 * 32768; 19} 20 21namespace content { 22 23Stream::Stream(StreamRegistry* registry, 24 StreamWriteObserver* write_observer, 25 const GURL& url) 26 : can_add_data_(true), 27 url_(url), 28 data_length_(0), 29 data_bytes_read_(0), 30 last_total_buffered_bytes_(0), 31 registry_(registry), 32 read_observer_(NULL), 33 write_observer_(write_observer), 34 stream_handle_(NULL), 35 weak_ptr_factory_(this) { 36 CreateByteStream(base::MessageLoopProxy::current(), 37 base::MessageLoopProxy::current(), 38 kDeferSizeThreshold, 39 &writer_, 40 &reader_); 41 42 // Setup callback for writing. 
43 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, 44 weak_ptr_factory_.GetWeakPtr())); 45 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, 46 weak_ptr_factory_.GetWeakPtr())); 47 48 registry_->RegisterStream(this); 49} 50 51Stream::~Stream() { 52} 53 54bool Stream::SetReadObserver(StreamReadObserver* observer) { 55 if (read_observer_) 56 return false; 57 read_observer_ = observer; 58 return true; 59} 60 61void Stream::RemoveReadObserver(StreamReadObserver* observer) { 62 DCHECK(observer == read_observer_); 63 read_observer_ = NULL; 64} 65 66void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { 67 DCHECK(observer == write_observer_); 68 write_observer_ = NULL; 69} 70 71void Stream::Abort() { 72 // Clear all buffer. It's safe to clear reader_ here since the same thread 73 // is used for both input and output operation. 74 writer_.reset(); 75 reader_.reset(); 76 ClearBuffer(); 77 can_add_data_ = false; 78 registry_->UnregisterStream(url()); 79} 80 81void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { 82 if (!writer_.get()) 83 return; 84 85 size_t current_buffered_bytes = writer_->GetTotalBufferedBytes(); 86 if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) { 87 Abort(); 88 return; 89 } 90 91 // Now it's guaranteed that this doesn't overflow. This must be done before 92 // Write() since GetTotalBufferedBytes() may return different value after 93 // Write() call, so if we use the new value, information in this instance and 94 // one in |registry_| become inconsistent. 
95 last_total_buffered_bytes_ = current_buffered_bytes + size; 96 97 can_add_data_ = writer_->Write(buffer, size); 98} 99 100void Stream::AddData(const char* data, size_t size) { 101 if (!writer_.get()) 102 return; 103 104 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); 105 memcpy(io_buffer->data(), data, size); 106 AddData(io_buffer, size); 107} 108 109void Stream::Finalize() { 110 if (!writer_.get()) 111 return; 112 113 writer_->Close(0); 114 writer_.reset(); 115 116 // Continue asynchronously. 117 base::MessageLoopProxy::current()->PostTask( 118 FROM_HERE, 119 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); 120} 121 122Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, 123 int buf_size, 124 int* bytes_read) { 125 DCHECK(buf); 126 DCHECK(bytes_read); 127 128 *bytes_read = 0; 129 if (!data_.get()) { 130 DCHECK(!data_length_); 131 DCHECK(!data_bytes_read_); 132 133 if (!reader_.get()) 134 return STREAM_ABORTED; 135 136 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); 137 switch (state) { 138 case ByteStreamReader::STREAM_HAS_DATA: 139 break; 140 case ByteStreamReader::STREAM_COMPLETE: 141 registry_->UnregisterStream(url()); 142 return STREAM_COMPLETE; 143 case ByteStreamReader::STREAM_EMPTY: 144 return STREAM_EMPTY; 145 } 146 } 147 148 const size_t remaining_bytes = data_length_ - data_bytes_read_; 149 size_t to_read = 150 static_cast<size_t>(buf_size) < remaining_bytes ? 
151 buf_size : remaining_bytes; 152 memcpy(buf->data(), data_->data() + data_bytes_read_, to_read); 153 data_bytes_read_ += to_read; 154 if (data_bytes_read_ >= data_length_) 155 ClearBuffer(); 156 157 *bytes_read = to_read; 158 return STREAM_HAS_DATA; 159} 160 161scoped_ptr<StreamHandle> Stream::CreateHandle( 162 const GURL& original_url, 163 const std::string& mime_type, 164 const std::string& response_headers) { 165 CHECK(!stream_handle_); 166 stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(), 167 original_url, 168 mime_type, 169 response_headers); 170 return scoped_ptr<StreamHandle>(stream_handle_).Pass(); 171} 172 173void Stream::CloseHandle() { 174 // Prevent deletion until this function ends. 175 scoped_refptr<Stream> ref(this); 176 177 CHECK(stream_handle_); 178 stream_handle_ = NULL; 179 registry_->UnregisterStream(url()); 180 if (write_observer_) 181 write_observer_->OnClose(this); 182} 183 184void Stream::OnSpaceAvailable() { 185 can_add_data_ = true; 186 if (write_observer_) 187 write_observer_->OnSpaceAvailable(this); 188} 189 190void Stream::OnDataAvailable() { 191 if (read_observer_) 192 read_observer_->OnDataAvailable(this); 193} 194 195void Stream::ClearBuffer() { 196 data_ = NULL; 197 data_length_ = 0; 198 data_bytes_read_ = 0; 199} 200 201} // namespace content 202