1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/browser/byte_stream.h"
6
7#include <deque>
8#include <set>
9#include <utility>
10
11#include "base/bind.h"
12#include "base/location.h"
13#include "base/memory/ref_counted.h"
14#include "base/sequenced_task_runner.h"
15
16namespace content {
17namespace {
18
19typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
20ContentVector;
21
22class ByteStreamReaderImpl;
23
24// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
25// cleared in an object destructor and accessed to check for object
26// existence.  We can't use weak pointers because they're tightly tied to
27// threads rather than task runners.
28// TODO(rdsmith): A better solution would be extending weak pointers
29// to support SequencedTaskRunners.
30struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
31 public:
32  LifetimeFlag() : is_alive(true) { }
33  bool is_alive;
34
35 protected:
36  friend class base::RefCountedThreadSafe<LifetimeFlag>;
37  virtual ~LifetimeFlag() { }
38
39 private:
40  DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
41};
42
43// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
44// SetPeer may happen anywhere; all other operations on each class must
45// happen in the context of their SequencedTaskRunner.
46class ByteStreamWriterImpl : public ByteStreamWriter {
47 public:
48  ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
49                       scoped_refptr<LifetimeFlag> lifetime_flag,
50                       size_t buffer_size);
51  virtual ~ByteStreamWriterImpl();
52
53  // Must be called before any operations are performed.
54  void SetPeer(ByteStreamReaderImpl* peer,
55               scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
56               scoped_refptr<LifetimeFlag> peer_lifetime_flag);
57
58  // Overridden from ByteStreamWriter.
59  virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
60                     size_t byte_count) OVERRIDE;
61  virtual void Flush() OVERRIDE;
62  virtual void Close(int status) OVERRIDE;
63  virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
64
65  // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
66  static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
67                           ByteStreamWriterImpl* target,
68                           size_t bytes_consumed);
69
70 private:
71  // Called from UpdateWindow when object existence has been validated.
72  void UpdateWindowInternal(size_t bytes_consumed);
73
74  void PostToPeer(bool complete, int status);
75
76  const size_t total_buffer_size_;
77
78  // All data objects in this class are only valid to access on
79  // this task runner except as otherwise noted.
80  scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
81
82  // True while this object is alive.
83  scoped_refptr<LifetimeFlag> my_lifetime_flag_;
84
85  base::Closure space_available_callback_;
86  ContentVector input_contents_;
87  size_t input_contents_size_;
88
89  // ** Peer information.
90
91  scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
92
93  // How much we've sent to the output that for flow control purposes we
94  // must assume hasn't been read yet.
95  size_t output_size_used_;
96
97  // Only valid to access on peer_task_runner_.
98  scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
99
100  // Only valid to access on peer_task_runner_ if
101  // |*peer_lifetime_flag_ == true|
102  ByteStreamReaderImpl* peer_;
103};
104
105class ByteStreamReaderImpl : public ByteStreamReader {
106 public:
107  ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
108                       scoped_refptr<LifetimeFlag> lifetime_flag,
109                       size_t buffer_size);
110  virtual ~ByteStreamReaderImpl();
111
112  // Must be called before any operations are performed.
113  void SetPeer(ByteStreamWriterImpl* peer,
114               scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
115               scoped_refptr<LifetimeFlag> peer_lifetime_flag);
116
117  // Overridden from ByteStreamReader.
118  virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
119                           size_t* length) OVERRIDE;
120  virtual int GetStatus() const OVERRIDE;
121  virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
122
123  // PostTask target from |ByteStreamWriterImpl::Write| and
124  // |ByteStreamWriterImpl::Close|.
125  // Receive data from our peer.
126  // static because it may be called after the object it is targeting
127  // has been destroyed.  It may not access |*target|
128  // if |*object_lifetime_flag| is false.
129  static void TransferData(
130      scoped_refptr<LifetimeFlag> object_lifetime_flag,
131      ByteStreamReaderImpl* target,
132      scoped_ptr<ContentVector> transfer_buffer,
133      size_t transfer_buffer_bytes,
134      bool source_complete,
135      int status);
136
137 private:
138  // Called from TransferData once object existence has been validated.
139  void TransferDataInternal(
140      scoped_ptr<ContentVector> transfer_buffer,
141      size_t transfer_buffer_bytes,
142      bool source_complete,
143      int status);
144
145  void MaybeUpdateInput();
146
147  const size_t total_buffer_size_;
148
149  scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
150
151  // True while this object is alive.
152  scoped_refptr<LifetimeFlag> my_lifetime_flag_;
153
154  ContentVector available_contents_;
155
156  bool received_status_;
157  int status_;
158
159  base::Closure data_available_callback_;
160
161  // Time of last point at which data in stream transitioned from full
162  // to non-full.  Nulled when a callback is sent.
163  base::Time last_non_full_time_;
164
165  // ** Peer information
166
167  scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
168
169  // How much has been removed from this class that we haven't told
170  // the input about yet.
171  size_t unreported_consumed_bytes_;
172
173  // Only valid to access on peer_task_runner_.
174  scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
175
176  // Only valid to access on peer_task_runner_ if
177  // |*peer_lifetime_flag_ == true|
178  ByteStreamWriterImpl* peer_;
179};
180
181ByteStreamWriterImpl::ByteStreamWriterImpl(
182    scoped_refptr<base::SequencedTaskRunner> task_runner,
183    scoped_refptr<LifetimeFlag> lifetime_flag,
184    size_t buffer_size)
185    : total_buffer_size_(buffer_size),
186      my_task_runner_(task_runner),
187      my_lifetime_flag_(lifetime_flag),
188      input_contents_size_(0),
189      output_size_used_(0),
190      peer_(NULL) {
191  DCHECK(my_lifetime_flag_.get());
192  my_lifetime_flag_->is_alive = true;
193}
194
195ByteStreamWriterImpl::~ByteStreamWriterImpl() {
196  my_lifetime_flag_->is_alive = false;
197}
198
199void ByteStreamWriterImpl::SetPeer(
200    ByteStreamReaderImpl* peer,
201    scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
202    scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
203  peer_ = peer;
204  peer_task_runner_ = peer_task_runner;
205  peer_lifetime_flag_ = peer_lifetime_flag;
206}
207
208bool ByteStreamWriterImpl::Write(
209    scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
210  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
211
212  input_contents_.push_back(std::make_pair(buffer, byte_count));
213  input_contents_size_ += byte_count;
214
215  // Arbitrarily, we buffer to a third of the total size before sending.
216  if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
217    PostToPeer(false, 0);
218
219  return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
220}
221
222void ByteStreamWriterImpl::Flush() {
223  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
224  if (input_contents_size_ > 0)
225    PostToPeer(false, 0);
226}
227
228void ByteStreamWriterImpl::Close(int status) {
229  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
230  PostToPeer(true, status);
231}
232
233void ByteStreamWriterImpl::RegisterCallback(
234    const base::Closure& source_callback) {
235  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
236  space_available_callback_ = source_callback;
237}
238
239// static
240void ByteStreamWriterImpl::UpdateWindow(
241    scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
242    size_t bytes_consumed) {
243  // If the target object isn't alive anymore, we do nothing.
244  if (!lifetime_flag->is_alive) return;
245
246  target->UpdateWindowInternal(bytes_consumed);
247}
248
249void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
250  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
251  DCHECK_GE(output_size_used_, bytes_consumed);
252  output_size_used_ -= bytes_consumed;
253
254  // Callback if we were above the limit and we're now <= to it.
255  size_t total_known_size_used =
256      input_contents_size_ + output_size_used_;
257
258  if (total_known_size_used <= total_buffer_size_ &&
259      (total_known_size_used + bytes_consumed > total_buffer_size_) &&
260      !space_available_callback_.is_null())
261    space_available_callback_.Run();
262}
263
264void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
265  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
266  // Valid contexts in which to call.
267  DCHECK(complete || 0 != input_contents_size_);
268
269  scoped_ptr<ContentVector> transfer_buffer;
270  size_t buffer_size = 0;
271  if (0 != input_contents_size_) {
272    transfer_buffer.reset(new ContentVector);
273    transfer_buffer->swap(input_contents_);
274    buffer_size = input_contents_size_;
275    output_size_used_ += input_contents_size_;
276    input_contents_size_ = 0;
277  }
278  peer_task_runner_->PostTask(
279      FROM_HERE, base::Bind(
280          &ByteStreamReaderImpl::TransferData,
281          peer_lifetime_flag_,
282          peer_,
283          base::Passed(&transfer_buffer),
284          buffer_size,
285          complete,
286          status));
287}
288
289ByteStreamReaderImpl::ByteStreamReaderImpl(
290    scoped_refptr<base::SequencedTaskRunner> task_runner,
291    scoped_refptr<LifetimeFlag> lifetime_flag,
292    size_t buffer_size)
293    : total_buffer_size_(buffer_size),
294      my_task_runner_(task_runner),
295      my_lifetime_flag_(lifetime_flag),
296      received_status_(false),
297      status_(0),
298      unreported_consumed_bytes_(0),
299      peer_(NULL) {
300  DCHECK(my_lifetime_flag_.get());
301  my_lifetime_flag_->is_alive = true;
302}
303
304ByteStreamReaderImpl::~ByteStreamReaderImpl() {
305  my_lifetime_flag_->is_alive = false;
306}
307
308void ByteStreamReaderImpl::SetPeer(
309    ByteStreamWriterImpl* peer,
310    scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
311    scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
312  peer_ = peer;
313  peer_task_runner_ = peer_task_runner;
314  peer_lifetime_flag_ = peer_lifetime_flag;
315}
316
317ByteStreamReaderImpl::StreamState
318ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
319                           size_t* length) {
320  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
321
322  if (available_contents_.size()) {
323    *data = available_contents_.front().first;
324    *length = available_contents_.front().second;
325    available_contents_.pop_front();
326    unreported_consumed_bytes_ += *length;
327
328    MaybeUpdateInput();
329    return STREAM_HAS_DATA;
330  }
331  if (received_status_) {
332    return STREAM_COMPLETE;
333  }
334  return STREAM_EMPTY;
335}
336
337int ByteStreamReaderImpl::GetStatus() const {
338  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
339  DCHECK(received_status_);
340  return status_;
341}
342
343void ByteStreamReaderImpl::RegisterCallback(
344    const base::Closure& sink_callback) {
345  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
346
347  data_available_callback_ = sink_callback;
348}
349
350// static
351void ByteStreamReaderImpl::TransferData(
352    scoped_refptr<LifetimeFlag> object_lifetime_flag,
353    ByteStreamReaderImpl* target,
354    scoped_ptr<ContentVector> transfer_buffer,
355    size_t buffer_size,
356    bool source_complete,
357    int status) {
358  // If our target is no longer alive, do nothing.
359  if (!object_lifetime_flag->is_alive) return;
360
361  target->TransferDataInternal(
362      transfer_buffer.Pass(), buffer_size, source_complete, status);
363}
364
365void ByteStreamReaderImpl::TransferDataInternal(
366    scoped_ptr<ContentVector> transfer_buffer,
367    size_t buffer_size,
368    bool source_complete,
369    int status) {
370  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
371
372  bool was_empty = available_contents_.empty();
373
374  if (transfer_buffer) {
375    available_contents_.insert(available_contents_.end(),
376                               transfer_buffer->begin(),
377                               transfer_buffer->end());
378  }
379
380  if (source_complete) {
381    received_status_ = true;
382    status_ = status;
383  }
384
385  // Callback on transition from empty to non-empty, or
386  // source complete.
387  if (((was_empty && !available_contents_.empty()) ||
388       source_complete) &&
389      !data_available_callback_.is_null())
390    data_available_callback_.Run();
391}
392
393// Decide whether or not to send the input a window update.
394// Currently we do that whenever we've got unreported consumption
395// greater than 1/3 of total size.
396void ByteStreamReaderImpl::MaybeUpdateInput() {
397  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
398
399  if (unreported_consumed_bytes_ <=
400      total_buffer_size_ / kFractionReadBeforeWindowUpdate)
401    return;
402
403  peer_task_runner_->PostTask(
404      FROM_HERE, base::Bind(
405          &ByteStreamWriterImpl::UpdateWindow,
406          peer_lifetime_flag_,
407          peer_,
408          unreported_consumed_bytes_));
409  unreported_consumed_bytes_ = 0;
410}
411
412}  // namespace
413
414const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
415const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
416
417ByteStreamReader::~ByteStreamReader() { }
418
419ByteStreamWriter::~ByteStreamWriter() { }
420
421void CreateByteStream(
422    scoped_refptr<base::SequencedTaskRunner> input_task_runner,
423    scoped_refptr<base::SequencedTaskRunner> output_task_runner,
424    size_t buffer_size,
425    scoped_ptr<ByteStreamWriter>* input,
426    scoped_ptr<ByteStreamReader>* output) {
427  scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
428  scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
429
430  ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
431      input_task_runner, input_flag, buffer_size);
432  ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
433      output_task_runner, output_flag, buffer_size);
434
435  in->SetPeer(out, output_task_runner, output_flag);
436  out->SetPeer(in, input_task_runner, input_flag);
437  input->reset(in);
438  output->reset(out);
439}
440
441}  // namespace content
442