stream.cc revision 2a99a7e74a7f215066514fe81d2bfa6639d9eddd
1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/browser/streams/stream.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/message_loop_proxy.h"
10#include "content/browser/streams/stream_handle_impl.h"
11#include "content/browser/streams/stream_read_observer.h"
12#include "content/browser/streams/stream_registry.h"
13#include "content/browser/streams/stream_write_observer.h"
14#include "net/base/io_buffer.h"
15
namespace {

// Buffer size handed to CreateByteStream(): 40 chunks of 32 KiB, which is
// 1,310,720 bytes (a little over 1MB). The connection starts being throttled
// at about this much buffered data.
const size_t kDeferSizeThreshold = 40 * 32 * 1024;

}  // namespace
20
21namespace content {
22
23Stream::Stream(StreamRegistry* registry,
24               StreamWriteObserver* write_observer,
25               const GURL& security_origin,
26               const GURL& url)
27    : data_bytes_read_(0),
28      can_add_data_(true),
29      security_origin_(security_origin),
30      url_(url),
31      data_length_(0),
32      registry_(registry),
33      read_observer_(NULL),
34      write_observer_(write_observer),
35      stream_handle_(NULL),
36      weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) {
37  CreateByteStream(base::MessageLoopProxy::current(),
38                   base::MessageLoopProxy::current(),
39                   kDeferSizeThreshold,
40                   &writer_,
41                   &reader_);
42
43  // Setup callback for writing.
44  writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
45                                       weak_ptr_factory_.GetWeakPtr()));
46  reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
47                                       weak_ptr_factory_.GetWeakPtr()));
48
49  registry_->RegisterStream(this);
50}
51
52Stream::~Stream() {
53}
54
55bool Stream::SetReadObserver(StreamReadObserver* observer) {
56  if (read_observer_)
57    return false;
58  read_observer_ = observer;
59  return true;
60}
61
62void Stream::RemoveReadObserver(StreamReadObserver* observer) {
63  DCHECK(observer == read_observer_);
64  read_observer_ = NULL;
65}
66
67void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
68  DCHECK(observer == write_observer_);
69  write_observer_ = NULL;
70}
71
72void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
73  can_add_data_ = writer_->Write(buffer, size);
74}
75
76void Stream::Finalize() {
77  writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
78  writer_.reset(NULL);
79
80  // Continue asynchronously.
81  base::MessageLoopProxy::current()->PostTask(
82      FROM_HERE,
83      base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
84}
85
86Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
87                                        int buf_size,
88                                        int* bytes_read) {
89  *bytes_read = 0;
90  if (!data_) {
91    data_length_ = 0;
92    data_bytes_read_ = 0;
93    ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
94    switch (state) {
95      case ByteStreamReader::STREAM_HAS_DATA:
96        break;
97      case ByteStreamReader::STREAM_COMPLETE:
98        registry_->UnregisterStream(url());
99        return STREAM_COMPLETE;
100      case ByteStreamReader::STREAM_EMPTY:
101        return STREAM_EMPTY;
102    }
103  }
104
105  const size_t remaining_bytes = data_length_ - data_bytes_read_;
106  size_t to_read =
107      static_cast<size_t>(buf_size) < remaining_bytes ?
108      buf_size : remaining_bytes;
109  memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
110  data_bytes_read_ += to_read;
111  if (data_bytes_read_ >= data_length_)
112    data_ = NULL;
113
114  *bytes_read = to_read;
115  return STREAM_HAS_DATA;
116}
117
118scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url,
119                                              const std::string& mime_type) {
120  CHECK(!stream_handle_);
121  stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
122                                        original_url,
123                                        mime_type);
124  return scoped_ptr<StreamHandle>(stream_handle_).Pass();
125}
126
127void Stream::CloseHandle() {
128  CHECK(stream_handle_);
129  stream_handle_ = NULL;
130  registry_->UnregisterStream(url());
131  if (write_observer_)
132    write_observer_->OnClose(this);
133}
134
135void Stream::OnSpaceAvailable() {
136  can_add_data_ = true;
137  if (write_observer_)
138    write_observer_->OnSpaceAvailable(this);
139}
140
141void Stream::OnDataAvailable() {
142  if (read_observer_)
143    read_observer_->OnDataAvailable(this);
144}
145
146}  // namespace content
147
148