stream.cc revision a36e5920737c6adbddd3e43b760e5de8431db6e0
1// Copyright (c) 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "content/browser/streams/stream.h" 6 7#include "base/bind.h" 8#include "base/location.h" 9#include "base/message_loop/message_loop_proxy.h" 10#include "content/browser/streams/stream_handle_impl.h" 11#include "content/browser/streams/stream_read_observer.h" 12#include "content/browser/streams/stream_registry.h" 13#include "content/browser/streams/stream_write_observer.h" 14#include "net/base/io_buffer.h" 15 16namespace { 17// Start throttling the connection at about 1MB. 18const size_t kDeferSizeThreshold = 40 * 32768; 19} 20 21namespace content { 22 23Stream::Stream(StreamRegistry* registry, 24 StreamWriteObserver* write_observer, 25 const GURL& url) 26 : data_bytes_read_(0), 27 can_add_data_(true), 28 url_(url), 29 data_length_(0), 30 registry_(registry), 31 read_observer_(NULL), 32 write_observer_(write_observer), 33 stream_handle_(NULL), 34 weak_ptr_factory_(this) { 35 CreateByteStream(base::MessageLoopProxy::current(), 36 base::MessageLoopProxy::current(), 37 kDeferSizeThreshold, 38 &writer_, 39 &reader_); 40 41 // Setup callback for writing. 42 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, 43 weak_ptr_factory_.GetWeakPtr())); 44 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, 45 weak_ptr_factory_.GetWeakPtr())); 46 47 registry_->RegisterStream(this); 48} 49 50Stream::~Stream() { 51} 52 53bool Stream::SetReadObserver(StreamReadObserver* observer) { 54 if (read_observer_) 55 return false; 56 read_observer_ = observer; 57 return true; 58} 59 60void Stream::RemoveReadObserver(StreamReadObserver* observer) { 61 DCHECK(observer == read_observer_); 62 read_observer_ = NULL; 63} 64 65void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { 66 DCHECK(observer == write_observer_); 67 write_observer_ = NULL; 68} 69 70void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { 71 can_add_data_ = writer_->Write(buffer, size); 72} 73 74void Stream::Finalize() { 75 writer_->Close(0); 76 writer_.reset(NULL); 77 78 // Continue asynchronously. 79 base::MessageLoopProxy::current()->PostTask( 80 FROM_HERE, 81 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); 82} 83 84Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, 85 int buf_size, 86 int* bytes_read) { 87 *bytes_read = 0; 88 if (!data_.get()) { 89 data_length_ = 0; 90 data_bytes_read_ = 0; 91 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); 92 switch (state) { 93 case ByteStreamReader::STREAM_HAS_DATA: 94 break; 95 case ByteStreamReader::STREAM_COMPLETE: 96 registry_->UnregisterStream(url()); 97 return STREAM_COMPLETE; 98 case ByteStreamReader::STREAM_EMPTY: 99 return STREAM_EMPTY; 100 } 101 } 102 103 const size_t remaining_bytes = data_length_ - data_bytes_read_; 104 size_t to_read = 105 static_cast<size_t>(buf_size) < remaining_bytes ? 106 buf_size : remaining_bytes; 107 memcpy(buf->data(), data_->data() + data_bytes_read_, to_read); 108 data_bytes_read_ += to_read; 109 if (data_bytes_read_ >= data_length_) 110 data_ = NULL; 111 112 *bytes_read = to_read; 113 return STREAM_HAS_DATA; 114} 115 116scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url, 117 const std::string& mime_type) { 118 CHECK(!stream_handle_); 119 stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(), 120 original_url, 121 mime_type); 122 return scoped_ptr<StreamHandle>(stream_handle_).Pass(); 123} 124 125void Stream::CloseHandle() { 126 // Prevent deletion until this function ends. 127 scoped_refptr<Stream> ref(this); 128 129 CHECK(stream_handle_); 130 stream_handle_ = NULL; 131 registry_->UnregisterStream(url()); 132 if (write_observer_) 133 write_observer_->OnClose(this); 134} 135 136void Stream::OnSpaceAvailable() { 137 can_add_data_ = true; 138 if (write_observer_) 139 write_observer_->OnSpaceAvailable(this); 140} 141 142void Stream::OnDataAvailable() { 143 if (read_observer_) 144 read_observer_->OnDataAvailable(this); 145} 146 147} // namespace content 148