1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#ifndef CONTENT_BROWSER_BYTE_STREAM_H_
6#define CONTENT_BROWSER_BYTE_STREAM_H_
7
8#include "base/callback.h"
9#include "base/memory/ref_counted.h"
10#include "base/memory/scoped_ptr.h"
11#include "content/common/content_export.h"
12#include "net/base/io_buffer.h"
13
14namespace base {
15class SequencedTaskRunner;
16}
17
18namespace content {
19
20// A byte stream is a pipe to transfer bytes between a source and a
21// sink, which may be on different threads.  It is intended to be the
22// only connection between source and sink; they need have no
23// direct awareness of each other aside from the byte stream.  The source and
24// the sink have different interfaces to a byte stream, |ByteStreamWriter|
25// and |ByteStreamReader|.  A pair of connected interfaces is generated by
26// calling |CreateByteStream|.
27//
28// The source adds bytes to the bytestream via |ByteStreamWriter::Write|
29// and the sink retrieves bytes already written via |ByteStreamReader::Read|.
30//
31// When the source has no more data to add, it will call
32// |ByteStreamWriter::Close| to indicate that.  Operation status at the source
33// is indicated to the sink via an int passed to the Close() method and returned
34// from the GetStatus() method. Source and sink must agree on the interpretation
35// of this int.
36//
37// Normally the source is not managed after the relationship is setup;
38// it is expected to provide data and then close itself.  If an error
39// occurs on the sink, it is not signalled to the source via this
40// mechanism; instead, the source will write data until it exausts the
41// available space.  If the source needs to be aware of errors occuring
42// on the sink, this must be signalled in some other fashion (usually
43// through whatever controller setup the relationship).
44//
45// Callback lifetime management: No lifetime management is done in this
46// class to prevent registered callbacks from being called after any
47// objects to which they may refer have been destroyed.  It is the
48// responsibility of the callers to avoid use-after-free references.
49// This may be done by any of several mechanisms, including weak
50// pointers, scoped_refptr references, or calling the registration
51// function with a null callback from a destructor.  To enable the null
52// callback strategy, callbacks will not be stored between retrieval and
53// evaluation, so setting a null callback will guarantee that the
54// previous callback will not be executed after setting.
55//
56// Class methods are virtual to allow mocking for tests; these classes
57// aren't intended to be base classes for other classes.
58//
59// Sample usage (note that this does not show callback usage):
60//
61//    void OriginatingClass::Initialize() {
62//      // Create a stream for sending bytes from IO->FILE threads.
63//      scoped_ptr<ByteStreamWriter> writer;
64//      scoped_ptr<ByteStreamReader> reader;
65//      CreateByteStream(
66//          BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO),
67//          BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE),
68//          kStreamBufferSize /* e.g. 10240.  */,
69//          &writer,
70//          &reader);         // Presumed passed to FILE thread for reading.
71//
72//      // Setup callback for writing.
73//      writer->RegisterCallback(base::Bind(&SpaceAvailable, this));
74//
75//      // Do initial round of writing.
76//      SpaceAvailable();
77//    }
78//
79//    // May only be run on first argument task runner, in this case the IO
80//    // thread.
81//    void OriginatingClass::SpaceAvailable() {
82//      while (<data available>) {
83//        scoped_ptr<net::IOBuffer> buffer;
84//        size_t buffer_length;
85//        // Create IOBuffer, fill in with data, and set buffer_length.
86//        if (!writer->Write(buffer, buffer_length)) {
87//          // No more space; return and we'll be called again
88//          // when there is space.
89//          return;
90//        }
91//      }
92//      writer->Close(<operation status>);
93//      writer.reset(NULL);
94//    }
95//
96//    // On File thread; containing class setup not shown.
97//
98//    void ReceivingClass::Initialize() {
99//      // Initialization
100//      reader->RegisterCallback(base::Bind(&DataAvailable, obj));
101//    }
102//
103//    // Called whenever there's something to read.
104//    void ReceivingClass::DataAvailable() {
105//      scoped_refptr<net::IOBuffer> data;
106//      size_t length = 0;
107//
108//      while (ByteStreamReader::STREAM_HAS_DATA ==
109//             (state = reader->Read(&data, &length))) {
110//        // Process |data|.
111//      }
112//
113//      if (ByteStreamReader::STREAM_COMPLETE == state) {
114//        int status = reader->GetStatus();
115//        // Process error or successful completion in |status|.
116//      }
117//
118//      // if |state| is STREAM_EMPTY, we're done for now; we'll be called
119//      // again when there's more data.
120//    }
121class CONTENT_EXPORT ByteStreamWriter {
122 public:
123  // Inverse of the fraction of the stream buffer that must be full before
124  // a notification is sent to paired Reader that there's more data.
125  static const int kFractionBufferBeforeSending;
126
127  virtual ~ByteStreamWriter() = 0;
128
129  // Always adds the data passed into the ByteStream.  Returns true
130  // if more data may be added without exceeding the class limit
131  // on data.  Takes ownership of |buffer|.
132  virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
133                     size_t byte_count) = 0;
134
135  // Flushes contents buffered in this writer to the corresponding reader
136  // regardless if buffer filling rate is greater than
137  // kFractionBufferBeforeSending or not. Does nothing if there's no contents
138  // buffered.
139  virtual void Flush() = 0;
140
141  // Signal that all data that is going to be sent, has been sent,
142  // and provide a status.
143  virtual void Close(int status) = 0;
144
145  // Register a callback to be called when the stream transitions from
146  // full to having space available.  The callback will always be
147  // called on the task runner associated with the ByteStreamWriter.
148  // This callback will only be called if a call to Write has previously
149  // returned false (i.e. the ByteStream has been filled).
150  // Multiple calls to this function are supported, though note that it
151  // is the callers responsibility to handle races with space becoming
152  // available (i.e. in the case of that race either of the before
153  // or after callbacks may be called).
154  // The callback will not be called after ByteStreamWriter destruction.
155  virtual void RegisterCallback(const base::Closure& source_callback) = 0;
156
157  // Returns the number of bytes sent to the reader but not yet reported by
158  // the reader as read.
159  virtual size_t GetTotalBufferedBytes() const = 0;
160};
161
162class CONTENT_EXPORT ByteStreamReader {
163 public:
164  // Inverse of the fraction of the stream buffer that must be empty before
165  // a notification is send to paired Writer that there's more room.
166  static const int kFractionReadBeforeWindowUpdate;
167
168  enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE };
169
170  virtual ~ByteStreamReader() = 0;
171
172  // Returns STREAM_EMPTY if there is no data on the ByteStream and
173  // Close() has not been called, and STREAM_COMPLETE if there
174  // is no data on the ByteStream and Close() has been called.
175  // If there is data on the ByteStream, returns STREAM_HAS_DATA
176  // and fills in |*data| with a pointer to the data, and |*length|
177  // with its length.
178  virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
179                           size_t* length) = 0;
180
181  // Only valid to call if Read() has returned STREAM_COMPLETE.
182  virtual int GetStatus() const = 0;
183
184  // Register a callback to be called when data is added or the source
185  // completes.  The callback will be always be called on the owning
186  // task runner.  Multiple calls to this function are supported,
187  // though note that it is the callers responsibility to handle races
188  // with data becoming available (i.e. in the case of that race
189  // either of the before or after callbacks may be called).
190  // The callback will not be called after ByteStreamReader destruction.
191  virtual void RegisterCallback(const base::Closure& sink_callback) = 0;
192};
193
194CONTENT_EXPORT void CreateByteStream(
195    scoped_refptr<base::SequencedTaskRunner> input_task_runner,
196    scoped_refptr<base::SequencedTaskRunner> output_task_runner,
197    size_t buffer_size,
198    scoped_ptr<ByteStreamWriter>* input,
199    scoped_ptr<ByteStreamReader>* output);
200
201}  // namespace content
202
203#endif  // CONTENT_BROWSER_BYTE_STREAM_H_
204