byte_stream_unittest.cc revision 2a99a7e74a7f215066514fe81d2bfa6639d9eddd
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/browser/byte_stream.h"
6
7#include <deque>
8
9#include "base/bind.h"
10#include "base/callback.h"
11#include "base/memory/ref_counted.h"
12#include "base/message_loop.h"
13#include "base/test/test_simple_task_runner.h"
14#include "net/base/io_buffer.h"
15#include "testing/gtest/include/gtest/gtest.h"
16
17namespace content {
18namespace {
19
// Callback body used by the tests: adds one to the integer |counter| points
// at, so a test can observe how many times a bound closure has been run.
void CountCallbacks(int* counter) {
  *counter += 1;
}
23
24}  // namespace
25
26class ByteStreamTest : public testing::Test {
27 public:
28  ByteStreamTest();
29
30  // Create a new IO buffer of the given |buffer_size|.  Details of the
31  // contents of the created buffer will be kept, and can be validated
32  // by ValidateIOBuffer.
33  scoped_refptr<net::IOBuffer> NewIOBuffer(size_t buffer_size) {
34    scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(buffer_size));
35    char *bufferp = buffer->data();
36    for (size_t i = 0; i < buffer_size; i++)
37      bufferp[i] = (i + producing_seed_key_) % (1 << sizeof(char));
38    pointer_queue_.push_back(bufferp);
39    length_queue_.push_back(buffer_size);
40    ++producing_seed_key_;
41    return buffer;
42  }
43
44  // Create an IOBuffer of the appropriate size and add it to the
45  // ByteStream, returning the result of the ByteStream::Write.
46  // Separate function to avoid duplication of buffer_size in test
47  // calls.
48  bool Write(ByteStreamWriter* byte_stream_input, size_t buffer_size) {
49    return byte_stream_input->Write(NewIOBuffer(buffer_size), buffer_size);
50  }
51
52  // Validate that we have the IOBuffer we expect.  This routine must be
53  // called on buffers that were allocated from NewIOBuffer, and in the
54  // order that they were allocated.  Calls to NewIOBuffer &&
55  // ValidateIOBuffer may be interleaved.
56  bool ValidateIOBuffer(
57      scoped_refptr<net::IOBuffer> buffer, size_t buffer_size) {
58    char *bufferp = buffer->data();
59
60    char *expected_ptr = pointer_queue_.front();
61    size_t expected_length = length_queue_.front();
62    pointer_queue_.pop_front();
63    length_queue_.pop_front();
64    ++consuming_seed_key_;
65
66    EXPECT_EQ(expected_ptr, bufferp);
67    if (expected_ptr != bufferp)
68      return false;
69
70    EXPECT_EQ(expected_length, buffer_size);
71    if (expected_length != buffer_size)
72      return false;
73
74    for (size_t i = 0; i < buffer_size; i++) {
75      // Already incremented, so subtract one from the key.
76      EXPECT_EQ(static_cast<int>((i + consuming_seed_key_ - 1)
77                                 % (1 << sizeof(char))),
78                bufferp[i]);
79      if (static_cast<int>((i + consuming_seed_key_ - 1) %
80                           (1 << sizeof(char))) != bufferp[i]) {
81        return false;
82      }
83    }
84    return true;
85  }
86
87 protected:
88  MessageLoop message_loop_;
89
90 private:
91  int producing_seed_key_;
92  int consuming_seed_key_;
93  std::deque<char*> pointer_queue_;
94  std::deque<size_t> length_queue_;
95};
96
97ByteStreamTest::ByteStreamTest()
98    : producing_seed_key_(0),
99      consuming_seed_key_(0) { }
100
101// Confirm that filling and emptying the stream works properly, and that
102// we get full triggers when we expect.
103TEST_F(ByteStreamTest, ByteStream_PushBack) {
104  scoped_ptr<ByteStreamWriter> byte_stream_input;
105  scoped_ptr<ByteStreamReader> byte_stream_output;
106  CreateByteStream(
107      message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
108      3 * 1024, &byte_stream_input, &byte_stream_output);
109
110  // Push a series of IO buffers on; test pushback happening and
111  // that it's advisory.
112  EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
113  EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
114  EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
115  EXPECT_FALSE(Write(byte_stream_input.get(), 1));
116  EXPECT_FALSE(Write(byte_stream_input.get(), 1024));
117  // Flush
118  byte_stream_input->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
119  message_loop_.RunUntilIdle();
120
121  // Pull the IO buffers out; do we get the same buffers and do they
122  // have the same contents?
123  scoped_refptr<net::IOBuffer> output_io_buffer;
124  size_t output_length;
125  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
126            byte_stream_output->Read(&output_io_buffer, &output_length));
127  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
128
129  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
130            byte_stream_output->Read(&output_io_buffer, &output_length));
131  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
132
133  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
134            byte_stream_output->Read(&output_io_buffer, &output_length));
135  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
136
137  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
138            byte_stream_output->Read(&output_io_buffer, &output_length));
139  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
140
141  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
142            byte_stream_output->Read(&output_io_buffer, &output_length));
143  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
144
145  EXPECT_EQ(ByteStreamReader::STREAM_COMPLETE,
146            byte_stream_output->Read(&output_io_buffer, &output_length));
147}
148
149// Same as above, only use knowledge of the internals to confirm
150// that we're getting pushback even when data's split across the two
151// objects
152TEST_F(ByteStreamTest, ByteStream_PushBackSplit) {
153  scoped_ptr<ByteStreamWriter> byte_stream_input;
154  scoped_ptr<ByteStreamReader> byte_stream_output;
155  CreateByteStream(
156      message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
157      9 * 1024, &byte_stream_input, &byte_stream_output);
158
159  // Push a series of IO buffers on; test pushback happening and
160  // that it's advisory.
161  EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
162  message_loop_.RunUntilIdle();
163  EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
164  message_loop_.RunUntilIdle();
165  EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
166  message_loop_.RunUntilIdle();
167  EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
168  message_loop_.RunUntilIdle();
169  EXPECT_FALSE(Write(byte_stream_input.get(), 6 * 1024));
170  message_loop_.RunUntilIdle();
171
172  // Pull the IO buffers out; do we get the same buffers and do they
173  // have the same contents?
174  scoped_refptr<net::IOBuffer> output_io_buffer;
175  size_t output_length;
176  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
177            byte_stream_output->Read(&output_io_buffer, &output_length));
178  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
179
180  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
181            byte_stream_output->Read(&output_io_buffer, &output_length));
182  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
183
184  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
185            byte_stream_output->Read(&output_io_buffer, &output_length));
186  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
187
188  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
189            byte_stream_output->Read(&output_io_buffer, &output_length));
190  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
191
192  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
193            byte_stream_output->Read(&output_io_buffer, &output_length));
194  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
195
196  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
197            byte_stream_output->Read(&output_io_buffer, &output_length));
198}
199
200// Confirm that a Close() notification transmits in-order
201// with data on the stream.
202TEST_F(ByteStreamTest, ByteStream_CompleteTransmits) {
203  scoped_ptr<ByteStreamWriter> byte_stream_input;
204  scoped_ptr<ByteStreamReader> byte_stream_output;
205
206  scoped_refptr<net::IOBuffer> output_io_buffer;
207  size_t output_length;
208
209  // Empty stream, non-error case.
210  CreateByteStream(
211      message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
212      3 * 1024, &byte_stream_input, &byte_stream_output);
213  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
214            byte_stream_output->Read(&output_io_buffer, &output_length));
215  byte_stream_input->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
216  message_loop_.RunUntilIdle();
217  ASSERT_EQ(ByteStreamReader::STREAM_COMPLETE,
218            byte_stream_output->Read(&output_io_buffer, &output_length));
219  EXPECT_EQ(DOWNLOAD_INTERRUPT_REASON_NONE,
220            byte_stream_output->GetStatus());
221
222  // Non-empty stream, non-error case.
223  CreateByteStream(
224      message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
225      3 * 1024, &byte_stream_input, &byte_stream_output);
226  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
227            byte_stream_output->Read(&output_io_buffer, &output_length));
228  EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
229  byte_stream_input->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
230  message_loop_.RunUntilIdle();
231  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
232            byte_stream_output->Read(&output_io_buffer, &output_length));
233  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
234  ASSERT_EQ(ByteStreamReader::STREAM_COMPLETE,
235            byte_stream_output->Read(&output_io_buffer, &output_length));
236  EXPECT_EQ(DOWNLOAD_INTERRUPT_REASON_NONE,
237            byte_stream_output->GetStatus());
238
239  // Empty stream, non-error case.
240  CreateByteStream(
241      message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
242      3 * 1024, &byte_stream_input, &byte_stream_output);
243  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
244            byte_stream_output->Read(&output_io_buffer, &output_length));
245  byte_stream_input->Close(DOWNLOAD_INTERRUPT_REASON_NETWORK_DISCONNECTED);
246  message_loop_.RunUntilIdle();
247  ASSERT_EQ(ByteStreamReader::STREAM_COMPLETE,
248            byte_stream_output->Read(&output_io_buffer, &output_length));
249  EXPECT_EQ(DOWNLOAD_INTERRUPT_REASON_NETWORK_DISCONNECTED,
250            byte_stream_output->GetStatus());
251
252  // Non-empty stream, non-error case.
253  CreateByteStream(
254      message_loop_.message_loop_proxy(), message_loop_.message_loop_proxy(),
255      3 * 1024, &byte_stream_input, &byte_stream_output);
256  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
257            byte_stream_output->Read(&output_io_buffer, &output_length));
258  EXPECT_TRUE(Write(byte_stream_input.get(), 1024));
259  byte_stream_input->Close(DOWNLOAD_INTERRUPT_REASON_NETWORK_DISCONNECTED);
260  message_loop_.RunUntilIdle();
261  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
262            byte_stream_output->Read(&output_io_buffer, &output_length));
263  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
264  ASSERT_EQ(ByteStreamReader::STREAM_COMPLETE,
265            byte_stream_output->Read(&output_io_buffer, &output_length));
266  EXPECT_EQ(DOWNLOAD_INTERRUPT_REASON_NETWORK_DISCONNECTED,
267            byte_stream_output->GetStatus());
268}
269
270// Confirm that callbacks on the sink side are triggered when they should be.
271TEST_F(ByteStreamTest, ByteStream_SinkCallback) {
272  scoped_refptr<base::TestSimpleTaskRunner> task_runner(
273      new base::TestSimpleTaskRunner());
274
275  scoped_ptr<ByteStreamWriter> byte_stream_input;
276  scoped_ptr<ByteStreamReader> byte_stream_output;
277  CreateByteStream(
278      message_loop_.message_loop_proxy(), task_runner,
279      10000, &byte_stream_input, &byte_stream_output);
280
281  scoped_refptr<net::IOBuffer> output_io_buffer;
282  size_t output_length;
283
284  // Note that the specifics of when the callbacks are called with regard
285  // to how much data is pushed onto the stream is not (currently) part
286  // of the interface contract.  If it becomes part of the contract, the
287  // tests below should get much more precise.
288
289  // Confirm callback called when you add more than 33% of the buffer.
290
291  // Setup callback
292  int num_callbacks = 0;
293  byte_stream_output->RegisterCallback(
294      base::Bind(CountCallbacks, &num_callbacks));
295
296  EXPECT_TRUE(Write(byte_stream_input.get(), 4000));
297  message_loop_.RunUntilIdle();
298
299  EXPECT_EQ(0, num_callbacks);
300  task_runner->RunUntilIdle();
301  EXPECT_EQ(1, num_callbacks);
302
303  // Check data and stream state.
304  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
305            byte_stream_output->Read(&output_io_buffer, &output_length));
306  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
307  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
308            byte_stream_output->Read(&output_io_buffer, &output_length));
309
310  // Confirm callback *isn't* called at less than 33% (by lack of
311  // unexpected call on task runner).
312  EXPECT_TRUE(Write(byte_stream_input.get(), 3000));
313  message_loop_.RunUntilIdle();
314
315  // This reflects an implementation artifact that data goes with callbacks,
316  // which should not be considered part of the interface guarantee.
317  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
318            byte_stream_output->Read(&output_io_buffer, &output_length));
319}
320
321// Confirm that callbacks on the source side are triggered when they should
322// be.
323TEST_F(ByteStreamTest, ByteStream_SourceCallback) {
324  scoped_refptr<base::TestSimpleTaskRunner> task_runner(
325      new base::TestSimpleTaskRunner());
326
327  scoped_ptr<ByteStreamWriter> byte_stream_input;
328  scoped_ptr<ByteStreamReader> byte_stream_output;
329  CreateByteStream(
330      task_runner, message_loop_.message_loop_proxy(),
331      10000, &byte_stream_input, &byte_stream_output);
332
333  scoped_refptr<net::IOBuffer> output_io_buffer;
334  size_t output_length;
335
336  // Note that the specifics of when the callbacks are called with regard
337  // to how much data is pulled from the stream is not (currently) part
338  // of the interface contract.  If it becomes part of the contract, the
339  // tests below should get much more precise.
340
341  // Confirm callback called when about 33% space available, and not
342  // at other transitions.
343
344  // Add data.
345  int num_callbacks = 0;
346  byte_stream_input->RegisterCallback(
347      base::Bind(CountCallbacks, &num_callbacks));
348  EXPECT_TRUE(Write(byte_stream_input.get(), 2000));
349  EXPECT_TRUE(Write(byte_stream_input.get(), 2001));
350  EXPECT_FALSE(Write(byte_stream_input.get(), 6000));
351
352  // Allow bytes to transition (needed for message passing implementation),
353  // and get and validate the data.
354  message_loop_.RunUntilIdle();
355  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
356            byte_stream_output->Read(&output_io_buffer, &output_length));
357  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
358
359  // Grab data, triggering callback.  Recorded on dispatch, but doesn't
360  // happen because it's caught by the mock.
361  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
362            byte_stream_output->Read(&output_io_buffer, &output_length));
363  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
364
365  // Confirm that the callback passed to the mock does what we expect.
366  EXPECT_EQ(0, num_callbacks);
367  task_runner->RunUntilIdle();
368  EXPECT_EQ(1, num_callbacks);
369
370  // Same drill with final buffer.
371  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
372            byte_stream_output->Read(&output_io_buffer, &output_length));
373  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
374  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
375            byte_stream_output->Read(&output_io_buffer, &output_length));
376  EXPECT_EQ(1, num_callbacks);
377  task_runner->RunUntilIdle();
378  // Should have updated the internal structures but not called the
379  // callback.
380  EXPECT_EQ(1, num_callbacks);
381}
382
383// Confirm that racing a change to a sink callback with a post results
384// in the new callback being called.
385TEST_F(ByteStreamTest, ByteStream_SinkInterrupt) {
386  scoped_refptr<base::TestSimpleTaskRunner> task_runner(
387      new base::TestSimpleTaskRunner());
388
389  scoped_ptr<ByteStreamWriter> byte_stream_input;
390  scoped_ptr<ByteStreamReader> byte_stream_output;
391  CreateByteStream(
392      message_loop_.message_loop_proxy(), task_runner,
393      10000, &byte_stream_input, &byte_stream_output);
394
395  scoped_refptr<net::IOBuffer> output_io_buffer;
396  size_t output_length;
397  base::Closure intermediate_callback;
398
399  // Record initial state.
400  int num_callbacks = 0;
401  byte_stream_output->RegisterCallback(
402      base::Bind(CountCallbacks, &num_callbacks));
403
404  // Add data, and pass it across.
405  EXPECT_TRUE(Write(byte_stream_input.get(), 4000));
406  message_loop_.RunUntilIdle();
407
408  // The task runner should have been hit, but the callback count
409  // isn't changed until we actually run the callback.
410  EXPECT_EQ(0, num_callbacks);
411
412  // If we change the callback now, the new one should be run
413  // (simulates race with post task).
414  int num_alt_callbacks = 0;
415  byte_stream_output->RegisterCallback(
416      base::Bind(CountCallbacks, &num_alt_callbacks));
417  task_runner->RunUntilIdle();
418  EXPECT_EQ(0, num_callbacks);
419  EXPECT_EQ(1, num_alt_callbacks);
420
421  // Final cleanup.
422  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
423            byte_stream_output->Read(&output_io_buffer, &output_length));
424  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
425  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
426            byte_stream_output->Read(&output_io_buffer, &output_length));
427
428}
429
430// Confirm that racing a change to a source callback with a post results
431// in the new callback being called.
432TEST_F(ByteStreamTest, ByteStream_SourceInterrupt) {
433  scoped_refptr<base::TestSimpleTaskRunner> task_runner(
434      new base::TestSimpleTaskRunner());
435
436  scoped_ptr<ByteStreamWriter> byte_stream_input;
437  scoped_ptr<ByteStreamReader> byte_stream_output;
438  CreateByteStream(
439      task_runner, message_loop_.message_loop_proxy(),
440      10000, &byte_stream_input, &byte_stream_output);
441
442  scoped_refptr<net::IOBuffer> output_io_buffer;
443  size_t output_length;
444  base::Closure intermediate_callback;
445
446  // Setup state for test.
447  int num_callbacks = 0;
448  byte_stream_input->RegisterCallback(
449      base::Bind(CountCallbacks, &num_callbacks));
450  EXPECT_TRUE(Write(byte_stream_input.get(), 2000));
451  EXPECT_TRUE(Write(byte_stream_input.get(), 2001));
452  EXPECT_FALSE(Write(byte_stream_input.get(), 6000));
453  message_loop_.RunUntilIdle();
454
455  // Initial get should not trigger callback.
456  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
457            byte_stream_output->Read(&output_io_buffer, &output_length));
458  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
459  message_loop_.RunUntilIdle();
460
461  // Second get *should* trigger callback.
462  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
463            byte_stream_output->Read(&output_io_buffer, &output_length));
464  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
465
466  // Which should do the right thing when it's run.
467  int num_alt_callbacks = 0;
468  byte_stream_input->RegisterCallback(
469      base::Bind(CountCallbacks, &num_alt_callbacks));
470  task_runner->RunUntilIdle();
471  EXPECT_EQ(0, num_callbacks);
472  EXPECT_EQ(1, num_alt_callbacks);
473
474  // Third get should also trigger callback.
475  EXPECT_EQ(ByteStreamReader::STREAM_HAS_DATA,
476            byte_stream_output->Read(&output_io_buffer, &output_length));
477  EXPECT_TRUE(ValidateIOBuffer(output_io_buffer, output_length));
478  EXPECT_EQ(ByteStreamReader::STREAM_EMPTY,
479            byte_stream_output->Read(&output_io_buffer, &output_length));
480}
481
482// Confirm that callback is called on zero data transfer but source
483// complete.
484TEST_F(ByteStreamTest, ByteStream_ZeroCallback) {
485  scoped_refptr<base::TestSimpleTaskRunner> task_runner(
486      new base::TestSimpleTaskRunner());
487
488  scoped_ptr<ByteStreamWriter> byte_stream_input;
489  scoped_ptr<ByteStreamReader> byte_stream_output;
490  CreateByteStream(
491      message_loop_.message_loop_proxy(), task_runner,
492      10000, &byte_stream_input, &byte_stream_output);
493
494  base::Closure intermediate_callback;
495
496  // Record initial state.
497  int num_callbacks = 0;
498  byte_stream_output->RegisterCallback(
499      base::Bind(CountCallbacks, &num_callbacks));
500
501  // Immediately close the stream.
502  byte_stream_input->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
503  task_runner->RunUntilIdle();
504  EXPECT_EQ(1, num_callbacks);
505}
506
507}  // namespace content
508