stream.cc revision a1401311d1ab56c4ed0a474bd38c108f75cb0cd9
1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/browser/streams/stream.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/message_loop/message_loop_proxy.h"
10#include "content/browser/streams/stream_handle_impl.h"
11#include "content/browser/streams/stream_read_observer.h"
12#include "content/browser/streams/stream_registry.h"
13#include "content/browser/streams/stream_write_observer.h"
14#include "net/base/io_buffer.h"
15
16namespace {
17// Start throttling the connection at about 1MB.
18const size_t kDeferSizeThreshold = 40 * 32768;
19}
20
21namespace content {
22
23Stream::Stream(StreamRegistry* registry,
24               StreamWriteObserver* write_observer,
25               const GURL& url)
26    : can_add_data_(true),
27      url_(url),
28      data_length_(0),
29      data_bytes_read_(0),
30      last_total_buffered_bytes_(0),
31      registry_(registry),
32      read_observer_(NULL),
33      write_observer_(write_observer),
34      stream_handle_(NULL),
35      weak_ptr_factory_(this) {
36  CreateByteStream(base::MessageLoopProxy::current(),
37                   base::MessageLoopProxy::current(),
38                   kDeferSizeThreshold,
39                   &writer_,
40                   &reader_);
41
42  // Setup callback for writing.
43  writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
44                                       weak_ptr_factory_.GetWeakPtr()));
45  reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
46                                       weak_ptr_factory_.GetWeakPtr()));
47
48  registry_->RegisterStream(this);
49}
50
51Stream::~Stream() {
52}
53
54bool Stream::SetReadObserver(StreamReadObserver* observer) {
55  if (read_observer_)
56    return false;
57  read_observer_ = observer;
58  return true;
59}
60
61void Stream::RemoveReadObserver(StreamReadObserver* observer) {
62  DCHECK(observer == read_observer_);
63  read_observer_ = NULL;
64}
65
66void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
67  DCHECK(observer == write_observer_);
68  write_observer_ = NULL;
69}
70
71void Stream::Abort() {
72  // Clear all buffer. It's safe to clear reader_ here since the same thread
73  // is used for both input and output operation.
74  writer_.reset();
75  reader_.reset();
76  ClearBuffer();
77  can_add_data_ = false;
78  registry_->UnregisterStream(url());
79}
80
81void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
82  if (!writer_.get())
83    return;
84
85  size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
86  if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
87    Abort();
88    return;
89  }
90
91  // Now it's guaranteed that this doesn't overflow. This must be done before
92  // Write() since GetTotalBufferedBytes() may return different value after
93  // Write() call, so if we use the new value, information in this instance and
94  // one in |registry_| become inconsistent.
95  last_total_buffered_bytes_ = current_buffered_bytes + size;
96
97  can_add_data_ = writer_->Write(buffer, size);
98}
99
100void Stream::AddData(const char* data, size_t size) {
101  if (!writer_.get())
102    return;
103
104  scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
105  memcpy(io_buffer->data(), data, size);
106  AddData(io_buffer, size);
107}
108
109void Stream::Finalize() {
110  if (!writer_.get())
111    return;
112
113  writer_->Close(0);
114  writer_.reset();
115
116  // Continue asynchronously.
117  base::MessageLoopProxy::current()->PostTask(
118      FROM_HERE,
119      base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
120}
121
122Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
123                                        int buf_size,
124                                        int* bytes_read) {
125  DCHECK(buf);
126  DCHECK(bytes_read);
127
128  *bytes_read = 0;
129  if (!data_.get()) {
130    DCHECK(!data_length_);
131    DCHECK(!data_bytes_read_);
132
133    if (!reader_.get())
134      return STREAM_ABORTED;
135
136    ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
137    switch (state) {
138      case ByteStreamReader::STREAM_HAS_DATA:
139        break;
140      case ByteStreamReader::STREAM_COMPLETE:
141        registry_->UnregisterStream(url());
142        return STREAM_COMPLETE;
143      case ByteStreamReader::STREAM_EMPTY:
144        return STREAM_EMPTY;
145    }
146  }
147
148  const size_t remaining_bytes = data_length_ - data_bytes_read_;
149  size_t to_read =
150      static_cast<size_t>(buf_size) < remaining_bytes ?
151      buf_size : remaining_bytes;
152  memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
153  data_bytes_read_ += to_read;
154  if (data_bytes_read_ >= data_length_)
155    ClearBuffer();
156
157  *bytes_read = to_read;
158  return STREAM_HAS_DATA;
159}
160
161scoped_ptr<StreamHandle> Stream::CreateHandle(
162    const GURL& original_url,
163    const std::string& mime_type,
164    const std::string& response_headers) {
165  CHECK(!stream_handle_);
166  stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
167                                        original_url,
168                                        mime_type,
169                                        response_headers);
170  return scoped_ptr<StreamHandle>(stream_handle_).Pass();
171}
172
173void Stream::CloseHandle() {
174  // Prevent deletion until this function ends.
175  scoped_refptr<Stream> ref(this);
176
177  CHECK(stream_handle_);
178  stream_handle_ = NULL;
179  registry_->UnregisterStream(url());
180  if (write_observer_)
181    write_observer_->OnClose(this);
182}
183
184void Stream::OnSpaceAvailable() {
185  can_add_data_ = true;
186  if (write_observer_)
187    write_observer_->OnSpaceAvailable(this);
188}
189
190void Stream::OnDataAvailable() {
191  if (read_observer_)
192    read_observer_->OnDataAvailable(this);
193}
194
195void Stream::ClearBuffer() {
196  data_ = NULL;
197  data_length_ = 0;
198  data_bytes_read_ = 0;
199}
200
201}  // namespace content
202