1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/browser/streams/stream.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/message_loop/message_loop_proxy.h"
10#include "base/values.h"
11#include "content/browser/streams/stream_handle_impl.h"
12#include "content/browser/streams/stream_read_observer.h"
13#include "content/browser/streams/stream_registry.h"
14#include "content/browser/streams/stream_write_observer.h"
15#include "net/base/io_buffer.h"
16#include "net/http/http_response_headers.h"
17
18namespace {
19// Start throttling the connection at about 1MB.
20const size_t kDeferSizeThreshold = 40 * 32768;
21}
22
23namespace content {
24
25Stream::Stream(StreamRegistry* registry,
26               StreamWriteObserver* write_observer,
27               const GURL& url)
28    : can_add_data_(true),
29      url_(url),
30      data_length_(0),
31      data_bytes_read_(0),
32      last_total_buffered_bytes_(0),
33      registry_(registry),
34      read_observer_(NULL),
35      write_observer_(write_observer),
36      stream_handle_(NULL),
37      weak_ptr_factory_(this) {
38  CreateByteStream(base::MessageLoopProxy::current(),
39                   base::MessageLoopProxy::current(),
40                   kDeferSizeThreshold,
41                   &writer_,
42                   &reader_);
43
44  // Setup callback for writing.
45  writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
46                                       weak_ptr_factory_.GetWeakPtr()));
47  reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
48                                       weak_ptr_factory_.GetWeakPtr()));
49
50  registry_->RegisterStream(this);
51}
52
53Stream::~Stream() {
54}
55
56bool Stream::SetReadObserver(StreamReadObserver* observer) {
57  if (read_observer_)
58    return false;
59  read_observer_ = observer;
60  return true;
61}
62
63void Stream::RemoveReadObserver(StreamReadObserver* observer) {
64  DCHECK(observer == read_observer_);
65  read_observer_ = NULL;
66}
67
68void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
69  DCHECK(observer == write_observer_);
70  write_observer_ = NULL;
71}
72
73void Stream::Abort() {
74  // Clear all buffer. It's safe to clear reader_ here since the same thread
75  // is used for both input and output operation.
76  writer_.reset();
77  reader_.reset();
78  ClearBuffer();
79  can_add_data_ = false;
80  registry_->UnregisterStream(url());
81}
82
83void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
84  if (!writer_.get())
85    return;
86
87  size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
88  if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
89    Abort();
90    return;
91  }
92
93  // Now it's guaranteed that this doesn't overflow. This must be done before
94  // Write() since GetTotalBufferedBytes() may return different value after
95  // Write() call, so if we use the new value, information in this instance and
96  // one in |registry_| become inconsistent.
97  last_total_buffered_bytes_ = current_buffered_bytes + size;
98
99  can_add_data_ = writer_->Write(buffer, size);
100}
101
102void Stream::AddData(const char* data, size_t size) {
103  if (!writer_.get())
104    return;
105
106  scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
107  memcpy(io_buffer->data(), data, size);
108  AddData(io_buffer, size);
109}
110
111void Stream::Finalize() {
112  if (!writer_.get())
113    return;
114
115  writer_->Close(0);
116  writer_.reset();
117
118  // Continue asynchronously.
119  base::MessageLoopProxy::current()->PostTask(
120      FROM_HERE,
121      base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
122}
123
124Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
125                                        int buf_size,
126                                        int* bytes_read) {
127  DCHECK(buf);
128  DCHECK(bytes_read);
129
130  *bytes_read = 0;
131  if (!data_.get()) {
132    DCHECK(!data_length_);
133    DCHECK(!data_bytes_read_);
134
135    if (!reader_.get())
136      return STREAM_ABORTED;
137
138    ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
139    switch (state) {
140      case ByteStreamReader::STREAM_HAS_DATA:
141        break;
142      case ByteStreamReader::STREAM_COMPLETE:
143        registry_->UnregisterStream(url());
144        return STREAM_COMPLETE;
145      case ByteStreamReader::STREAM_EMPTY:
146        return STREAM_EMPTY;
147    }
148  }
149
150  const size_t remaining_bytes = data_length_ - data_bytes_read_;
151  size_t to_read =
152      static_cast<size_t>(buf_size) < remaining_bytes ?
153      buf_size : remaining_bytes;
154  memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
155  data_bytes_read_ += to_read;
156  if (data_bytes_read_ >= data_length_)
157    ClearBuffer();
158
159  *bytes_read = to_read;
160  return STREAM_HAS_DATA;
161}
162
163scoped_ptr<StreamHandle> Stream::CreateHandle(
164    const GURL& original_url,
165    const std::string& mime_type,
166    scoped_refptr<net::HttpResponseHeaders> response_headers) {
167  CHECK(!stream_handle_);
168  stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
169                                        original_url,
170                                        mime_type,
171                                        response_headers);
172  return scoped_ptr<StreamHandle>(stream_handle_).Pass();
173}
174
175void Stream::CloseHandle() {
176  // Prevent deletion until this function ends.
177  scoped_refptr<Stream> ref(this);
178
179  CHECK(stream_handle_);
180  stream_handle_ = NULL;
181  registry_->UnregisterStream(url());
182  if (write_observer_)
183    write_observer_->OnClose(this);
184}
185
186void Stream::OnSpaceAvailable() {
187  can_add_data_ = true;
188  if (write_observer_)
189    write_observer_->OnSpaceAvailable(this);
190}
191
192void Stream::OnDataAvailable() {
193  if (read_observer_)
194    read_observer_->OnDataAvailable(this);
195}
196
197void Stream::ClearBuffer() {
198  data_ = NULL;
199  data_length_ = 0;
200  data_bytes_read_ = 0;
201}
202
203}  // namespace content
204