1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/browser/byte_stream.h"
6
7#include <deque>
8#include <set>
9#include <utility>
10
11#include "base/bind.h"
12#include "base/location.h"
13#include "base/memory/ref_counted.h"
14#include "base/sequenced_task_runner.h"
15
16namespace content {
17namespace {
18
19typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
20ContentVector;
21
22class ByteStreamReaderImpl;
23
24// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
25// cleared in an object destructor and accessed to check for object
26// existence.  We can't use weak pointers because they're tightly tied to
27// threads rather than task runners.
28// TODO(rdsmith): A better solution would be extending weak pointers
29// to support SequencedTaskRunners.
30struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
31 public:
32  LifetimeFlag() : is_alive(true) { }
33  bool is_alive;
34
35 protected:
36  friend class base::RefCountedThreadSafe<LifetimeFlag>;
37  virtual ~LifetimeFlag() { }
38
39 private:
40  DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
41};
42
43// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
44// SetPeer may happen anywhere; all other operations on each class must
45// happen in the context of their SequencedTaskRunner.
46class ByteStreamWriterImpl : public ByteStreamWriter {
47 public:
48  ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
49                       scoped_refptr<LifetimeFlag> lifetime_flag,
50                       size_t buffer_size);
51  virtual ~ByteStreamWriterImpl();
52
53  // Must be called before any operations are performed.
54  void SetPeer(ByteStreamReaderImpl* peer,
55               scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
56               scoped_refptr<LifetimeFlag> peer_lifetime_flag);
57
58  // Overridden from ByteStreamWriter.
59  virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
60                     size_t byte_count) OVERRIDE;
61  virtual void Flush() OVERRIDE;
62  virtual void Close(int status) OVERRIDE;
63  virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
64  virtual size_t GetTotalBufferedBytes() const OVERRIDE;
65
66  // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
67  static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
68                           ByteStreamWriterImpl* target,
69                           size_t bytes_consumed);
70
71 private:
72  // Called from UpdateWindow when object existence has been validated.
73  void UpdateWindowInternal(size_t bytes_consumed);
74
75  void PostToPeer(bool complete, int status);
76
77  const size_t total_buffer_size_;
78
79  // All data objects in this class are only valid to access on
80  // this task runner except as otherwise noted.
81  scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
82
83  // True while this object is alive.
84  scoped_refptr<LifetimeFlag> my_lifetime_flag_;
85
86  base::Closure space_available_callback_;
87  ContentVector input_contents_;
88  size_t input_contents_size_;
89
90  // ** Peer information.
91
92  scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
93
94  // How much we've sent to the output that for flow control purposes we
95  // must assume hasn't been read yet.
96  size_t output_size_used_;
97
98  // Only valid to access on peer_task_runner_.
99  scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
100
101  // Only valid to access on peer_task_runner_ if
102  // |*peer_lifetime_flag_ == true|
103  ByteStreamReaderImpl* peer_;
104};
105
106class ByteStreamReaderImpl : public ByteStreamReader {
107 public:
108  ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
109                       scoped_refptr<LifetimeFlag> lifetime_flag,
110                       size_t buffer_size);
111  virtual ~ByteStreamReaderImpl();
112
113  // Must be called before any operations are performed.
114  void SetPeer(ByteStreamWriterImpl* peer,
115               scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
116               scoped_refptr<LifetimeFlag> peer_lifetime_flag);
117
118  // Overridden from ByteStreamReader.
119  virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
120                           size_t* length) OVERRIDE;
121  virtual int GetStatus() const OVERRIDE;
122  virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
123
124  // PostTask target from |ByteStreamWriterImpl::Write| and
125  // |ByteStreamWriterImpl::Close|.
126  // Receive data from our peer.
127  // static because it may be called after the object it is targeting
128  // has been destroyed.  It may not access |*target|
129  // if |*object_lifetime_flag| is false.
130  static void TransferData(
131      scoped_refptr<LifetimeFlag> object_lifetime_flag,
132      ByteStreamReaderImpl* target,
133      scoped_ptr<ContentVector> transfer_buffer,
134      size_t transfer_buffer_bytes,
135      bool source_complete,
136      int status);
137
138 private:
139  // Called from TransferData once object existence has been validated.
140  void TransferDataInternal(
141      scoped_ptr<ContentVector> transfer_buffer,
142      size_t transfer_buffer_bytes,
143      bool source_complete,
144      int status);
145
146  void MaybeUpdateInput();
147
148  const size_t total_buffer_size_;
149
150  scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
151
152  // True while this object is alive.
153  scoped_refptr<LifetimeFlag> my_lifetime_flag_;
154
155  ContentVector available_contents_;
156
157  bool received_status_;
158  int status_;
159
160  base::Closure data_available_callback_;
161
162  // Time of last point at which data in stream transitioned from full
163  // to non-full.  Nulled when a callback is sent.
164  base::Time last_non_full_time_;
165
166  // ** Peer information
167
168  scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
169
170  // How much has been removed from this class that we haven't told
171  // the input about yet.
172  size_t unreported_consumed_bytes_;
173
174  // Only valid to access on peer_task_runner_.
175  scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
176
177  // Only valid to access on peer_task_runner_ if
178  // |*peer_lifetime_flag_ == true|
179  ByteStreamWriterImpl* peer_;
180};
181
182ByteStreamWriterImpl::ByteStreamWriterImpl(
183    scoped_refptr<base::SequencedTaskRunner> task_runner,
184    scoped_refptr<LifetimeFlag> lifetime_flag,
185    size_t buffer_size)
186    : total_buffer_size_(buffer_size),
187      my_task_runner_(task_runner),
188      my_lifetime_flag_(lifetime_flag),
189      input_contents_size_(0),
190      output_size_used_(0),
191      peer_(NULL) {
192  DCHECK(my_lifetime_flag_.get());
193  my_lifetime_flag_->is_alive = true;
194}
195
196ByteStreamWriterImpl::~ByteStreamWriterImpl() {
197  // No RunsTasksOnCurrentThread() check to allow deleting a created writer
198  // before we start using it. Once started, should be deleted on the specified
199  // task runner.
200  my_lifetime_flag_->is_alive = false;
201}
202
203void ByteStreamWriterImpl::SetPeer(
204    ByteStreamReaderImpl* peer,
205    scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
206    scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
207  peer_ = peer;
208  peer_task_runner_ = peer_task_runner;
209  peer_lifetime_flag_ = peer_lifetime_flag;
210}
211
212bool ByteStreamWriterImpl::Write(
213    scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
214  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
215
216  // Check overflow.
217  //
218  // TODO(tyoshino): Discuss with content/browser/download developer and if
219  // they're fine with, set smaller limit and make it configurable.
220  size_t space_limit = std::numeric_limits<size_t>::max() -
221      GetTotalBufferedBytes();
222  if (byte_count > space_limit) {
223    // TODO(tyoshino): Tell the user that Write() failed.
224    // Ignore input.
225    return false;
226  }
227
228  input_contents_.push_back(std::make_pair(buffer, byte_count));
229  input_contents_size_ += byte_count;
230
231  // Arbitrarily, we buffer to a third of the total size before sending.
232  if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
233    PostToPeer(false, 0);
234
235  return GetTotalBufferedBytes() <= total_buffer_size_;
236}
237
238void ByteStreamWriterImpl::Flush() {
239  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
240  if (input_contents_size_ > 0)
241    PostToPeer(false, 0);
242}
243
244void ByteStreamWriterImpl::Close(int status) {
245  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
246  PostToPeer(true, status);
247}
248
249void ByteStreamWriterImpl::RegisterCallback(
250    const base::Closure& source_callback) {
251  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
252  space_available_callback_ = source_callback;
253}
254
255size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
256  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
257  // This sum doesn't overflow since Write() fails if this sum is going to
258  // overflow.
259  return input_contents_size_ + output_size_used_;
260}
261
262// static
263void ByteStreamWriterImpl::UpdateWindow(
264    scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
265    size_t bytes_consumed) {
266  // If the target object isn't alive anymore, we do nothing.
267  if (!lifetime_flag->is_alive) return;
268
269  target->UpdateWindowInternal(bytes_consumed);
270}
271
272void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
273  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
274
275  bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
276
277  DCHECK_GE(output_size_used_, bytes_consumed);
278  output_size_used_ -= bytes_consumed;
279
280  // Callback if we were above the limit and we're now <= to it.
281  bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
282
283  if (no_longer_above_limit && was_above_limit &&
284      !space_available_callback_.is_null())
285    space_available_callback_.Run();
286}
287
288void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
289  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
290  // Valid contexts in which to call.
291  DCHECK(complete || 0 != input_contents_size_);
292
293  scoped_ptr<ContentVector> transfer_buffer;
294  size_t buffer_size = 0;
295  if (0 != input_contents_size_) {
296    transfer_buffer.reset(new ContentVector);
297    transfer_buffer->swap(input_contents_);
298    buffer_size = input_contents_size_;
299    output_size_used_ += input_contents_size_;
300    input_contents_size_ = 0;
301  }
302  peer_task_runner_->PostTask(
303      FROM_HERE, base::Bind(
304          &ByteStreamReaderImpl::TransferData,
305          peer_lifetime_flag_,
306          peer_,
307          base::Passed(&transfer_buffer),
308          buffer_size,
309          complete,
310          status));
311}
312
313ByteStreamReaderImpl::ByteStreamReaderImpl(
314    scoped_refptr<base::SequencedTaskRunner> task_runner,
315    scoped_refptr<LifetimeFlag> lifetime_flag,
316    size_t buffer_size)
317    : total_buffer_size_(buffer_size),
318      my_task_runner_(task_runner),
319      my_lifetime_flag_(lifetime_flag),
320      received_status_(false),
321      status_(0),
322      unreported_consumed_bytes_(0),
323      peer_(NULL) {
324  DCHECK(my_lifetime_flag_.get());
325  my_lifetime_flag_->is_alive = true;
326}
327
328ByteStreamReaderImpl::~ByteStreamReaderImpl() {
329  // No RunsTasksOnCurrentThread() check to allow deleting a created writer
330  // before we start using it. Once started, should be deleted on the specified
331  // task runner.
332  my_lifetime_flag_->is_alive = false;
333}
334
335void ByteStreamReaderImpl::SetPeer(
336    ByteStreamWriterImpl* peer,
337    scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
338    scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
339  peer_ = peer;
340  peer_task_runner_ = peer_task_runner;
341  peer_lifetime_flag_ = peer_lifetime_flag;
342}
343
344ByteStreamReaderImpl::StreamState
345ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
346                           size_t* length) {
347  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
348
349  if (available_contents_.size()) {
350    *data = available_contents_.front().first;
351    *length = available_contents_.front().second;
352    available_contents_.pop_front();
353    unreported_consumed_bytes_ += *length;
354
355    MaybeUpdateInput();
356    return STREAM_HAS_DATA;
357  }
358  if (received_status_) {
359    return STREAM_COMPLETE;
360  }
361  return STREAM_EMPTY;
362}
363
364int ByteStreamReaderImpl::GetStatus() const {
365  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
366  DCHECK(received_status_);
367  return status_;
368}
369
370void ByteStreamReaderImpl::RegisterCallback(
371    const base::Closure& sink_callback) {
372  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
373
374  data_available_callback_ = sink_callback;
375}
376
377// static
378void ByteStreamReaderImpl::TransferData(
379    scoped_refptr<LifetimeFlag> object_lifetime_flag,
380    ByteStreamReaderImpl* target,
381    scoped_ptr<ContentVector> transfer_buffer,
382    size_t buffer_size,
383    bool source_complete,
384    int status) {
385  // If our target is no longer alive, do nothing.
386  if (!object_lifetime_flag->is_alive) return;
387
388  target->TransferDataInternal(
389      transfer_buffer.Pass(), buffer_size, source_complete, status);
390}
391
392void ByteStreamReaderImpl::TransferDataInternal(
393    scoped_ptr<ContentVector> transfer_buffer,
394    size_t buffer_size,
395    bool source_complete,
396    int status) {
397  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
398
399  bool was_empty = available_contents_.empty();
400
401  if (transfer_buffer) {
402    available_contents_.insert(available_contents_.end(),
403                               transfer_buffer->begin(),
404                               transfer_buffer->end());
405  }
406
407  if (source_complete) {
408    received_status_ = true;
409    status_ = status;
410  }
411
412  // Callback on transition from empty to non-empty, or
413  // source complete.
414  if (((was_empty && !available_contents_.empty()) ||
415       source_complete) &&
416      !data_available_callback_.is_null())
417    data_available_callback_.Run();
418}
419
420// Decide whether or not to send the input a window update.
421// Currently we do that whenever we've got unreported consumption
422// greater than 1/3 of total size.
423void ByteStreamReaderImpl::MaybeUpdateInput() {
424  DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
425
426  if (unreported_consumed_bytes_ <=
427      total_buffer_size_ / kFractionReadBeforeWindowUpdate)
428    return;
429
430  peer_task_runner_->PostTask(
431      FROM_HERE, base::Bind(
432          &ByteStreamWriterImpl::UpdateWindow,
433          peer_lifetime_flag_,
434          peer_,
435          unreported_consumed_bytes_));
436  unreported_consumed_bytes_ = 0;
437}
438
439}  // namespace
440
441const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
442const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
443
444ByteStreamReader::~ByteStreamReader() { }
445
446ByteStreamWriter::~ByteStreamWriter() { }
447
448void CreateByteStream(
449    scoped_refptr<base::SequencedTaskRunner> input_task_runner,
450    scoped_refptr<base::SequencedTaskRunner> output_task_runner,
451    size_t buffer_size,
452    scoped_ptr<ByteStreamWriter>* input,
453    scoped_ptr<ByteStreamReader>* output) {
454  scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
455  scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
456
457  ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
458      input_task_runner, input_flag, buffer_size);
459  ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
460      output_task_runner, output_flag, buffer_size);
461
462  in->SetPeer(out, output_task_runner, output_flag);
463  out->SetPeer(in, input_task_runner, input_flag);
464  input->reset(in);
465  output->reset(out);
466}
467
468}  // namespace content
469