1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/browser/streams/stream.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/message_loop/message_loop_proxy.h"
10#include "content/browser/streams/stream_handle_impl.h"
11#include "content/browser/streams/stream_read_observer.h"
12#include "content/browser/streams/stream_registry.h"
13#include "content/browser/streams/stream_write_observer.h"
14#include "net/base/io_buffer.h"
15
16namespace {
17// Start throttling the connection at about 1MB.
18const size_t kDeferSizeThreshold = 40 * 32768;
19}
20
21namespace content {
22
23Stream::Stream(StreamRegistry* registry,
24               StreamWriteObserver* write_observer,
25               const GURL& url)
26    : data_bytes_read_(0),
27      can_add_data_(true),
28      url_(url),
29      data_length_(0),
30      registry_(registry),
31      read_observer_(NULL),
32      write_observer_(write_observer),
33      stream_handle_(NULL),
34      weak_ptr_factory_(this) {
35  CreateByteStream(base::MessageLoopProxy::current(),
36                   base::MessageLoopProxy::current(),
37                   kDeferSizeThreshold,
38                   &writer_,
39                   &reader_);
40
41  // Setup callback for writing.
42  writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
43                                       weak_ptr_factory_.GetWeakPtr()));
44  reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
45                                       weak_ptr_factory_.GetWeakPtr()));
46
47  registry_->RegisterStream(this);
48}
49
50Stream::~Stream() {
51}
52
53bool Stream::SetReadObserver(StreamReadObserver* observer) {
54  if (read_observer_)
55    return false;
56  read_observer_ = observer;
57  return true;
58}
59
60void Stream::RemoveReadObserver(StreamReadObserver* observer) {
61  DCHECK(observer == read_observer_);
62  read_observer_ = NULL;
63}
64
65void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
66  DCHECK(observer == write_observer_);
67  write_observer_ = NULL;
68}
69
70void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
71  can_add_data_ = writer_->Write(buffer, size);
72}
73
74void Stream::AddData(const char* data, size_t size) {
75  scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
76  memcpy(io_buffer->data(), data, size);
77  can_add_data_ = writer_->Write(io_buffer, size);
78}
79
80void Stream::Finalize() {
81  writer_->Close(0);
82  writer_.reset(NULL);
83
84  // Continue asynchronously.
85  base::MessageLoopProxy::current()->PostTask(
86      FROM_HERE,
87      base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
88}
89
90Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
91                                        int buf_size,
92                                        int* bytes_read) {
93  *bytes_read = 0;
94  if (!data_.get()) {
95    data_length_ = 0;
96    data_bytes_read_ = 0;
97    ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
98    switch (state) {
99      case ByteStreamReader::STREAM_HAS_DATA:
100        break;
101      case ByteStreamReader::STREAM_COMPLETE:
102        registry_->UnregisterStream(url());
103        return STREAM_COMPLETE;
104      case ByteStreamReader::STREAM_EMPTY:
105        return STREAM_EMPTY;
106    }
107  }
108
109  const size_t remaining_bytes = data_length_ - data_bytes_read_;
110  size_t to_read =
111      static_cast<size_t>(buf_size) < remaining_bytes ?
112      buf_size : remaining_bytes;
113  memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
114  data_bytes_read_ += to_read;
115  if (data_bytes_read_ >= data_length_)
116    data_ = NULL;
117
118  *bytes_read = to_read;
119  return STREAM_HAS_DATA;
120}
121
122scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url,
123                                              const std::string& mime_type) {
124  CHECK(!stream_handle_);
125  stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
126                                        original_url,
127                                        mime_type);
128  return scoped_ptr<StreamHandle>(stream_handle_).Pass();
129}
130
131void Stream::CloseHandle() {
132  // Prevent deletion until this function ends.
133  scoped_refptr<Stream> ref(this);
134
135  CHECK(stream_handle_);
136  stream_handle_ = NULL;
137  registry_->UnregisterStream(url());
138  if (write_observer_)
139    write_observer_->OnClose(this);
140}
141
142void Stream::OnSpaceAvailable() {
143  can_add_data_ = true;
144  if (write_observer_)
145    write_observer_->OnSpaceAvailable(this);
146}
147
148void Stream::OnDataAvailable() {
149  if (read_observer_)
150    read_observer_->OnDataAvailable(this);
151}
152
153}  // namespace content
154