1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/message_loop/message_loop.h"
6#include "base/test/test_simple_task_runner.h"
7#include "content/browser/streams/stream.h"
8#include "content/browser/streams/stream_read_observer.h"
9#include "content/browser/streams/stream_registry.h"
10#include "content/browser/streams/stream_write_observer.h"
11#include "testing/gtest/include/gtest/gtest.h"
12
13namespace content {
14
15class StreamTest : public testing::Test {
16 public:
17  StreamTest() : producing_seed_key_(0) {}
18
19  virtual void SetUp() OVERRIDE {
20    registry_.reset(new StreamRegistry());
21  }
22
23  // Create a new IO buffer of the given |buffer_size| and fill it with random
24  // data.
25  scoped_refptr<net::IOBuffer> NewIOBuffer(size_t buffer_size) {
26    scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(buffer_size));
27    char *bufferp = buffer->data();
28    for (size_t i = 0; i < buffer_size; i++)
29      bufferp[i] = (i + producing_seed_key_) % (1 << sizeof(char));
30    ++producing_seed_key_;
31    return buffer;
32  }
33
34 protected:
35  base::MessageLoop message_loop_;
36  scoped_ptr<StreamRegistry> registry_;
37
38 private:
39  int producing_seed_key_;
40};
41
42class TestStreamReader : public StreamReadObserver {
43 public:
44  TestStreamReader() : buffer_(new net::GrowableIOBuffer()), completed_(false) {
45  }
46  virtual ~TestStreamReader() {}
47
48  void Read(Stream* stream) {
49    const size_t kBufferSize = 32768;
50    scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kBufferSize));
51
52    int bytes_read = 0;
53    while (true) {
54      Stream::StreamState state =
55          stream->ReadRawData(buffer.get(), kBufferSize, &bytes_read);
56      switch (state) {
57        case Stream::STREAM_HAS_DATA:
58          // TODO(tyoshino): Move these expectations to the beginning of Read()
59          // method once Stream::Finalize() is fixed.
60          EXPECT_FALSE(completed_);
61          break;
62        case Stream::STREAM_COMPLETE:
63          completed_ = true;
64          return;
65        case Stream::STREAM_EMPTY:
66          EXPECT_FALSE(completed_);
67          return;
68        case Stream::STREAM_ABORTED:
69          EXPECT_FALSE(completed_);
70          return;
71      }
72      size_t old_capacity = buffer_->capacity();
73      buffer_->SetCapacity(old_capacity + bytes_read);
74      memcpy(buffer_->StartOfBuffer() + old_capacity,
75             buffer->data(), bytes_read);
76    }
77  }
78
79  virtual void OnDataAvailable(Stream* stream) OVERRIDE {
80    Read(stream);
81  }
82
83  scoped_refptr<net::GrowableIOBuffer> buffer() { return buffer_; }
84
85  bool completed() const {
86    return completed_;
87  }
88
89 private:
90  scoped_refptr<net::GrowableIOBuffer> buffer_;
91  bool completed_;
92};
93
94class TestStreamWriter : public StreamWriteObserver {
95 public:
96  TestStreamWriter() {}
97  virtual ~TestStreamWriter() {}
98
99  void Write(Stream* stream,
100             scoped_refptr<net::IOBuffer> buffer,
101             size_t buffer_size) {
102    stream->AddData(buffer, buffer_size);
103  }
104
105  virtual void OnSpaceAvailable(Stream* stream) OVERRIDE {
106  }
107
108  virtual void OnClose(Stream* stream) OVERRIDE {
109  }
110};
111
112TEST_F(StreamTest, SetReadObserver) {
113  TestStreamReader reader;
114  TestStreamWriter writer;
115
116  GURL url("blob://stream");
117  scoped_refptr<Stream> stream(
118      new Stream(registry_.get(), &writer, url));
119  EXPECT_TRUE(stream->SetReadObserver(&reader));
120}
121
122TEST_F(StreamTest, SetReadObserver_SecondFails) {
123  TestStreamReader reader1;
124  TestStreamReader reader2;
125  TestStreamWriter writer;
126
127  GURL url("blob://stream");
128  scoped_refptr<Stream> stream(
129      new Stream(registry_.get(), &writer, url));
130  EXPECT_TRUE(stream->SetReadObserver(&reader1));
131  EXPECT_FALSE(stream->SetReadObserver(&reader2));
132}
133
134TEST_F(StreamTest, SetReadObserver_TwoReaders) {
135  TestStreamReader reader1;
136  TestStreamReader reader2;
137  TestStreamWriter writer;
138
139  GURL url("blob://stream");
140  scoped_refptr<Stream> stream(
141      new Stream(registry_.get(), &writer, url));
142  EXPECT_TRUE(stream->SetReadObserver(&reader1));
143
144  // Once the first read observer is removed, a new one can be added.
145  stream->RemoveReadObserver(&reader1);
146  EXPECT_TRUE(stream->SetReadObserver(&reader2));
147}
148
149TEST_F(StreamTest, Stream) {
150  TestStreamReader reader;
151  TestStreamWriter writer;
152
153  GURL url("blob://stream");
154  scoped_refptr<Stream> stream(
155      new Stream(registry_.get(), &writer, url));
156  EXPECT_TRUE(stream->SetReadObserver(&reader));
157
158  const int kBufferSize = 1000000;
159  scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
160  writer.Write(stream.get(), buffer, kBufferSize);
161  stream->Finalize();
162  base::MessageLoop::current()->RunUntilIdle();
163  EXPECT_TRUE(reader.completed());
164
165  ASSERT_EQ(reader.buffer()->capacity(), kBufferSize);
166  for (int i = 0; i < kBufferSize; i++)
167    EXPECT_EQ(buffer->data()[i], reader.buffer()->data()[i]);
168}
169
170// Test that even if a reader receives an empty buffer, once TransferData()
171// method is called on it with |source_complete| = true, following Read() calls
172// on it never returns STREAM_EMPTY. Together with StreamTest.Stream above, this
173// guarantees that Reader::Read() call returns only STREAM_HAS_DATA
174// or STREAM_COMPLETE in |data_available_callback_| call corresponding to
175// Writer::Close().
176TEST_F(StreamTest, ClosedReaderDoesNotReturnStreamEmpty) {
177  TestStreamReader reader;
178  TestStreamWriter writer;
179
180  GURL url("blob://stream");
181  scoped_refptr<Stream> stream(
182      new Stream(registry_.get(), &writer, url));
183  EXPECT_TRUE(stream->SetReadObserver(&reader));
184
185  const int kBufferSize = 0;
186  scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
187  stream->AddData(buffer, kBufferSize);
188  stream->Finalize();
189  base::MessageLoop::current()->RunUntilIdle();
190  EXPECT_TRUE(reader.completed());
191  EXPECT_EQ(0, reader.buffer()->capacity());
192}
193
194TEST_F(StreamTest, GetStream) {
195  TestStreamWriter writer;
196
197  GURL url("blob://stream");
198  scoped_refptr<Stream> stream1(
199      new Stream(registry_.get(), &writer, url));
200
201  scoped_refptr<Stream> stream2 = registry_->GetStream(url);
202  ASSERT_EQ(stream1, stream2);
203}
204
205TEST_F(StreamTest, GetStream_Missing) {
206  TestStreamWriter writer;
207
208  GURL url1("blob://stream");
209  scoped_refptr<Stream> stream1(
210      new Stream(registry_.get(), &writer, url1));
211
212  GURL url2("blob://stream2");
213  scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
214  ASSERT_FALSE(stream2.get());
215}
216
217TEST_F(StreamTest, CloneStream) {
218  TestStreamWriter writer;
219
220  GURL url1("blob://stream");
221  scoped_refptr<Stream> stream1(
222      new Stream(registry_.get(), &writer, url1));
223
224  GURL url2("blob://stream2");
225  ASSERT_TRUE(registry_->CloneStream(url2, url1));
226  scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
227  ASSERT_EQ(stream1, stream2);
228}
229
230TEST_F(StreamTest, CloneStream_Missing) {
231  TestStreamWriter writer;
232
233  GURL url1("blob://stream");
234  scoped_refptr<Stream> stream1(
235      new Stream(registry_.get(), &writer, url1));
236
237  GURL url2("blob://stream2");
238  GURL url3("blob://stream3");
239  ASSERT_FALSE(registry_->CloneStream(url2, url3));
240  scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
241  ASSERT_FALSE(stream2.get());
242}
243
244TEST_F(StreamTest, UnregisterStream) {
245  TestStreamWriter writer;
246
247  GURL url("blob://stream");
248  scoped_refptr<Stream> stream1(
249      new Stream(registry_.get(), &writer, url));
250
251  registry_->UnregisterStream(url);
252  scoped_refptr<Stream> stream2 = registry_->GetStream(url);
253  ASSERT_FALSE(stream2.get());
254}
255
256TEST_F(StreamTest, MemoryExceedMemoryUsageLimit) {
257  TestStreamWriter writer1;
258  TestStreamWriter writer2;
259
260  GURL url1("blob://stream");
261  scoped_refptr<Stream> stream1(
262      new Stream(registry_.get(), &writer1, url1));
263
264  GURL url2("blob://stream2");
265  scoped_refptr<Stream> stream2(
266      new Stream(registry_.get(), &writer2, url2));
267
268  const int kMaxMemoryUsage = 1500000;
269  registry_->set_max_memory_usage_for_testing(kMaxMemoryUsage);
270
271  const int kBufferSize = 1000000;
272  scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
273  writer1.Write(stream1.get(), buffer, kBufferSize);
274  // Make transfer happen.
275  base::MessageLoop::current()->RunUntilIdle();
276
277  writer2.Write(stream2.get(), buffer, kBufferSize);
278
279  // Written data (1000000 * 2) exceeded limit (1500000). |stream2| should be
280  // unregistered with |registry_|.
281  EXPECT_EQ(NULL, registry_->GetStream(url2).get());
282
283  writer1.Write(stream1.get(), buffer, kMaxMemoryUsage - kBufferSize);
284  // Should be accepted since stream2 is unregistered and the new data is not
285  // so big to exceed the limit.
286  EXPECT_FALSE(registry_->GetStream(url1).get() == NULL);
287}
288
289TEST_F(StreamTest, UnderMemoryUsageLimit) {
290  TestStreamWriter writer;
291  TestStreamReader reader;
292
293  GURL url("blob://stream");
294  scoped_refptr<Stream> stream(new Stream(registry_.get(), &writer, url));
295  EXPECT_TRUE(stream->SetReadObserver(&reader));
296
297  registry_->set_max_memory_usage_for_testing(1500000);
298
299  const int kBufferSize = 1000000;
300  scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
301  writer.Write(stream.get(), buffer, kBufferSize);
302
303  // Run loop to make |reader| consume the data.
304  base::MessageLoop::current()->RunUntilIdle();
305
306  writer.Write(stream.get(), buffer, kBufferSize);
307
308  EXPECT_EQ(stream.get(), registry_->GetStream(url).get());
309}
310
311}  // namespace content
312