1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/base/file_stream.h"
6
7#include "base/bind.h"
8#include "base/callback.h"
9#include "base/file_util.h"
10#include "base/files/file.h"
11#include "base/message_loop/message_loop.h"
12#include "base/message_loop/message_loop_proxy.h"
13#include "base/path_service.h"
14#include "base/run_loop.h"
15#include "base/synchronization/waitable_event.h"
16#include "base/test/test_timeouts.h"
17#include "base/threading/sequenced_worker_pool.h"
18#include "base/threading/thread_restrictions.h"
19#include "net/base/capturing_net_log.h"
20#include "net/base/io_buffer.h"
21#include "net/base/net_errors.h"
22#include "net/base/test_completion_callback.h"
23#include "testing/gtest/include/gtest/gtest.h"
24#include "testing/platform_test.h"
25
26#if defined(OS_ANDROID)
27#include "base/test/test_file_util.h"
28#endif
29
30namespace net {
31
32namespace {
33
// Payload that the FileStreamTest fixture writes into its temporary file and
// that the tests compare against when reading or writing.
const char kTestData[] = "0123456789";
// Number of payload bytes in kTestData, excluding the terminating NUL.
const int kTestDataSize = arraysize(kTestData) - 1;
36
37// Creates an IOBufferWithSize that contains the kTestDataSize.
38IOBufferWithSize* CreateTestDataBuffer() {
39  IOBufferWithSize* buf = new IOBufferWithSize(kTestDataSize);
40  memcpy(buf->data(), kTestData, kTestDataSize);
41  return buf;
42}
43
44}  // namespace
45
46class FileStreamTest : public PlatformTest {
47 public:
48  virtual void SetUp() {
49    PlatformTest::SetUp();
50
51    base::CreateTemporaryFile(&temp_file_path_);
52    base::WriteFile(temp_file_path_, kTestData, kTestDataSize);
53  }
54  virtual void TearDown() {
55    // FileStreamContexts must be asynchronously closed on the file task runner
56    // before they can be deleted. Pump the RunLoop to avoid leaks.
57    base::RunLoop().RunUntilIdle();
58    EXPECT_TRUE(base::DeleteFile(temp_file_path_, false));
59
60    PlatformTest::TearDown();
61  }
62
63  const base::FilePath temp_file_path() const { return temp_file_path_; }
64
65 private:
66  base::FilePath temp_file_path_;
67};
68
69namespace {
70
71TEST_F(FileStreamTest, AsyncOpenExplicitClose) {
72  TestCompletionCallback callback;
73  FileStream stream(base::MessageLoopProxy::current());
74  int flags = base::File::FLAG_OPEN |
75              base::File::FLAG_READ |
76              base::File::FLAG_ASYNC;
77  int rv = stream.Open(temp_file_path(), flags, callback.callback());
78  EXPECT_EQ(ERR_IO_PENDING, rv);
79  EXPECT_EQ(OK, callback.WaitForResult());
80  EXPECT_TRUE(stream.IsOpen());
81  EXPECT_TRUE(stream.GetFileForTesting().IsValid());
82  EXPECT_EQ(ERR_IO_PENDING, stream.Close(callback.callback()));
83  EXPECT_EQ(OK, callback.WaitForResult());
84  EXPECT_FALSE(stream.IsOpen());
85  EXPECT_FALSE(stream.GetFileForTesting().IsValid());
86}
87
88TEST_F(FileStreamTest, AsyncOpenExplicitCloseOrphaned) {
89  TestCompletionCallback callback;
90  scoped_ptr<FileStream> stream(new FileStream(
91      base::MessageLoopProxy::current()));
92  int flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
93              base::File::FLAG_ASYNC;
94  int rv = stream->Open(temp_file_path(), flags, callback.callback());
95  EXPECT_EQ(ERR_IO_PENDING, rv);
96  EXPECT_EQ(OK, callback.WaitForResult());
97  EXPECT_TRUE(stream->IsOpen());
98  EXPECT_TRUE(stream->GetFileForTesting().IsValid());
99  EXPECT_EQ(ERR_IO_PENDING, stream->Close(callback.callback()));
100  stream.reset();
101  // File isn't actually closed yet.
102  base::RunLoop runloop;
103  runloop.RunUntilIdle();
104  // The file should now be closed, though the callback has not been called.
105}
106
107// Test the use of FileStream with a file handle provided at construction.
108TEST_F(FileStreamTest, UseFileHandle) {
109  int rv = 0;
110  TestCompletionCallback callback;
111  TestInt64CompletionCallback callback64;
112  // 1. Test reading with a file handle.
113  ASSERT_EQ(kTestDataSize,
114            base::WriteFile(temp_file_path(), kTestData, kTestDataSize));
115  int flags = base::File::FLAG_OPEN_ALWAYS | base::File::FLAG_READ |
116              base::File::FLAG_ASYNC;
117  base::File file(temp_file_path(), flags);
118
119  // Seek to the beginning of the file and read.
120  scoped_ptr<FileStream> read_stream(
121      new FileStream(file.Pass(), base::MessageLoopProxy::current()));
122  ASSERT_EQ(ERR_IO_PENDING,
123            read_stream->Seek(FROM_BEGIN, 0, callback64.callback()));
124  ASSERT_EQ(0, callback64.WaitForResult());
125  // Read into buffer and compare.
126  scoped_refptr<IOBufferWithSize> read_buffer =
127      new IOBufferWithSize(kTestDataSize);
128  rv = read_stream->Read(read_buffer.get(), kTestDataSize, callback.callback());
129  ASSERT_EQ(kTestDataSize, callback.GetResult(rv));
130  ASSERT_EQ(0, memcmp(kTestData, read_buffer->data(), kTestDataSize));
131  read_stream.reset();
132
133  // 2. Test writing with a file handle.
134  base::DeleteFile(temp_file_path(), false);
135  flags = base::File::FLAG_OPEN_ALWAYS | base::File::FLAG_WRITE |
136          base::File::FLAG_ASYNC;
137  file.Initialize(temp_file_path(), flags);
138
139  scoped_ptr<FileStream> write_stream(
140      new FileStream(file.Pass(), base::MessageLoopProxy::current()));
141  ASSERT_EQ(ERR_IO_PENDING,
142            write_stream->Seek(FROM_BEGIN, 0, callback64.callback()));
143  ASSERT_EQ(0, callback64.WaitForResult());
144  scoped_refptr<IOBufferWithSize> write_buffer = CreateTestDataBuffer();
145  rv = write_stream->Write(write_buffer.get(), kTestDataSize,
146                           callback.callback());
147  ASSERT_EQ(kTestDataSize, callback.GetResult(rv));
148  write_stream.reset();
149
150  // Read into buffer and compare to make sure the handle worked fine.
151  ASSERT_EQ(kTestDataSize,
152            base::ReadFile(temp_file_path(), read_buffer->data(),
153                           kTestDataSize));
154  ASSERT_EQ(0, memcmp(kTestData, read_buffer->data(), kTestDataSize));
155}
156
157TEST_F(FileStreamTest, UseClosedStream) {
158  int rv = 0;
159  TestCompletionCallback callback;
160  TestInt64CompletionCallback callback64;
161
162  FileStream stream(base::MessageLoopProxy::current());
163
164  EXPECT_FALSE(stream.IsOpen());
165
166  // Try seeking...
167  rv = stream.Seek(FROM_BEGIN, 5, callback64.callback());
168  EXPECT_EQ(ERR_UNEXPECTED, callback64.GetResult(rv));
169
170  // Try reading...
171  scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(10);
172  rv = stream.Read(buf, buf->size(), callback.callback());
173  EXPECT_EQ(ERR_UNEXPECTED, callback.GetResult(rv));
174}
175
176TEST_F(FileStreamTest, AsyncRead) {
177  int64 file_size;
178  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
179
180  FileStream stream(base::MessageLoopProxy::current());
181  int flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
182              base::File::FLAG_ASYNC;
183  TestCompletionCallback callback;
184  int rv = stream.Open(temp_file_path(), flags, callback.callback());
185  EXPECT_EQ(ERR_IO_PENDING, rv);
186  EXPECT_EQ(OK, callback.WaitForResult());
187
188  int total_bytes_read = 0;
189
190  std::string data_read;
191  for (;;) {
192    scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(4);
193    rv = stream.Read(buf.get(), buf->size(), callback.callback());
194    if (rv == ERR_IO_PENDING)
195      rv = callback.WaitForResult();
196    EXPECT_LE(0, rv);
197    if (rv <= 0)
198      break;
199    total_bytes_read += rv;
200    data_read.append(buf->data(), rv);
201  }
202  EXPECT_EQ(file_size, total_bytes_read);
203  EXPECT_EQ(kTestData, data_read);
204}
205
206TEST_F(FileStreamTest, AsyncRead_EarlyDelete) {
207  int64 file_size;
208  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
209
210  scoped_ptr<FileStream> stream(
211      new FileStream(base::MessageLoopProxy::current()));
212  int flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
213              base::File::FLAG_ASYNC;
214  TestCompletionCallback callback;
215  int rv = stream->Open(temp_file_path(), flags, callback.callback());
216  EXPECT_EQ(ERR_IO_PENDING, rv);
217  EXPECT_EQ(OK, callback.WaitForResult());
218
219  scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(4);
220  rv = stream->Read(buf.get(), buf->size(), callback.callback());
221  stream.reset();  // Delete instead of closing it.
222  if (rv < 0) {
223    EXPECT_EQ(ERR_IO_PENDING, rv);
224    // The callback should not be called if the request is cancelled.
225    base::RunLoop().RunUntilIdle();
226    EXPECT_FALSE(callback.have_result());
227  } else {
228    EXPECT_EQ(std::string(kTestData, rv), std::string(buf->data(), rv));
229  }
230}
231
232TEST_F(FileStreamTest, AsyncRead_FromOffset) {
233  int64 file_size;
234  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
235
236  FileStream stream(base::MessageLoopProxy::current());
237  int flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
238              base::File::FLAG_ASYNC;
239  TestCompletionCallback callback;
240  int rv = stream.Open(temp_file_path(), flags, callback.callback());
241  EXPECT_EQ(ERR_IO_PENDING, rv);
242  EXPECT_EQ(OK, callback.WaitForResult());
243
244  TestInt64CompletionCallback callback64;
245  const int64 kOffset = 3;
246  rv = stream.Seek(FROM_BEGIN, kOffset, callback64.callback());
247  ASSERT_EQ(ERR_IO_PENDING, rv);
248  int64 new_offset = callback64.WaitForResult();
249  EXPECT_EQ(kOffset, new_offset);
250
251  int total_bytes_read = 0;
252
253  std::string data_read;
254  for (;;) {
255    scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(4);
256    rv = stream.Read(buf.get(), buf->size(), callback.callback());
257    if (rv == ERR_IO_PENDING)
258      rv = callback.WaitForResult();
259    EXPECT_LE(0, rv);
260    if (rv <= 0)
261      break;
262    total_bytes_read += rv;
263    data_read.append(buf->data(), rv);
264  }
265  EXPECT_EQ(file_size - kOffset, total_bytes_read);
266  EXPECT_EQ(kTestData + kOffset, data_read);
267}
268
269TEST_F(FileStreamTest, AsyncSeekAround) {
270  FileStream stream(base::MessageLoopProxy::current());
271  int flags = base::File::FLAG_OPEN | base::File::FLAG_ASYNC |
272              base::File::FLAG_READ;
273  TestCompletionCallback callback;
274  int rv = stream.Open(temp_file_path(), flags, callback.callback());
275  EXPECT_EQ(ERR_IO_PENDING, rv);
276  EXPECT_EQ(OK, callback.WaitForResult());
277
278  TestInt64CompletionCallback callback64;
279
280  const int64 kOffset = 3;
281  rv = stream.Seek(FROM_BEGIN, kOffset, callback64.callback());
282  ASSERT_EQ(ERR_IO_PENDING, rv);
283  int64 new_offset = callback64.WaitForResult();
284  EXPECT_EQ(kOffset, new_offset);
285
286  rv = stream.Seek(FROM_CURRENT, kOffset, callback64.callback());
287  ASSERT_EQ(ERR_IO_PENDING, rv);
288  new_offset = callback64.WaitForResult();
289  EXPECT_EQ(2 * kOffset, new_offset);
290
291  rv = stream.Seek(FROM_CURRENT, -kOffset, callback64.callback());
292  ASSERT_EQ(ERR_IO_PENDING, rv);
293  new_offset = callback64.WaitForResult();
294  EXPECT_EQ(kOffset, new_offset);
295
296  const int kTestDataLen = arraysize(kTestData) - 1;
297
298  rv = stream.Seek(FROM_END, -kTestDataLen, callback64.callback());
299  ASSERT_EQ(ERR_IO_PENDING, rv);
300  new_offset = callback64.WaitForResult();
301  EXPECT_EQ(0, new_offset);
302}
303
304TEST_F(FileStreamTest, AsyncWrite) {
305  FileStream stream(base::MessageLoopProxy::current());
306  int flags = base::File::FLAG_CREATE_ALWAYS | base::File::FLAG_WRITE |
307              base::File::FLAG_ASYNC;
308  TestCompletionCallback callback;
309  int rv = stream.Open(temp_file_path(), flags, callback.callback());
310  EXPECT_EQ(ERR_IO_PENDING, rv);
311  EXPECT_EQ(OK, callback.WaitForResult());
312
313  int64 file_size;
314  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
315  EXPECT_EQ(0, file_size);
316
317  int total_bytes_written = 0;
318
319  scoped_refptr<IOBufferWithSize> buf = CreateTestDataBuffer();
320  scoped_refptr<DrainableIOBuffer> drainable =
321      new DrainableIOBuffer(buf.get(), buf->size());
322  while (total_bytes_written != kTestDataSize) {
323    rv = stream.Write(drainable.get(), drainable->BytesRemaining(),
324                      callback.callback());
325    if (rv == ERR_IO_PENDING)
326      rv = callback.WaitForResult();
327    EXPECT_LT(0, rv);
328    if (rv <= 0)
329      break;
330    drainable->DidConsume(rv);
331    total_bytes_written += rv;
332  }
333  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
334  EXPECT_EQ(file_size, total_bytes_written);
335}
336
337TEST_F(FileStreamTest, AsyncWrite_EarlyDelete) {
338  scoped_ptr<FileStream> stream(
339      new FileStream(base::MessageLoopProxy::current()));
340  int flags = base::File::FLAG_CREATE_ALWAYS | base::File::FLAG_WRITE |
341              base::File::FLAG_ASYNC;
342  TestCompletionCallback callback;
343  int rv = stream->Open(temp_file_path(), flags, callback.callback());
344  EXPECT_EQ(ERR_IO_PENDING, rv);
345  EXPECT_EQ(OK, callback.WaitForResult());
346
347  int64 file_size;
348  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
349  EXPECT_EQ(0, file_size);
350
351  scoped_refptr<IOBufferWithSize> buf = CreateTestDataBuffer();
352  rv = stream->Write(buf.get(), buf->size(), callback.callback());
353  stream.reset();
354  if (rv < 0) {
355    EXPECT_EQ(ERR_IO_PENDING, rv);
356    // The callback should not be called if the request is cancelled.
357    base::RunLoop().RunUntilIdle();
358    EXPECT_FALSE(callback.have_result());
359  } else {
360    EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
361    EXPECT_EQ(file_size, rv);
362  }
363}
364
365TEST_F(FileStreamTest, AsyncWrite_FromOffset) {
366  int64 file_size;
367  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
368
369  FileStream stream(base::MessageLoopProxy::current());
370  int flags = base::File::FLAG_OPEN | base::File::FLAG_WRITE |
371              base::File::FLAG_ASYNC;
372  TestCompletionCallback callback;
373  int rv = stream.Open(temp_file_path(), flags, callback.callback());
374  EXPECT_EQ(ERR_IO_PENDING, rv);
375  EXPECT_EQ(OK, callback.WaitForResult());
376
377  TestInt64CompletionCallback callback64;
378  const int64 kOffset = 0;
379  rv = stream.Seek(FROM_END, kOffset, callback64.callback());
380  ASSERT_EQ(ERR_IO_PENDING, rv);
381  int64 new_offset = callback64.WaitForResult();
382  EXPECT_EQ(kTestDataSize, new_offset);
383
384  int total_bytes_written = 0;
385
386  scoped_refptr<IOBufferWithSize> buf = CreateTestDataBuffer();
387  scoped_refptr<DrainableIOBuffer> drainable =
388      new DrainableIOBuffer(buf.get(), buf->size());
389  while (total_bytes_written != kTestDataSize) {
390    rv = stream.Write(drainable.get(), drainable->BytesRemaining(),
391                      callback.callback());
392    if (rv == ERR_IO_PENDING)
393      rv = callback.WaitForResult();
394    EXPECT_LT(0, rv);
395    if (rv <= 0)
396      break;
397    drainable->DidConsume(rv);
398    total_bytes_written += rv;
399  }
400  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
401  EXPECT_EQ(file_size, kTestDataSize * 2);
402}
403
404TEST_F(FileStreamTest, BasicAsyncReadWrite) {
405  int64 file_size;
406  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
407
408  scoped_ptr<FileStream> stream(
409      new FileStream(base::MessageLoopProxy::current()));
410  int flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
411              base::File::FLAG_WRITE | base::File::FLAG_ASYNC;
412  TestCompletionCallback callback;
413  int rv = stream->Open(temp_file_path(), flags, callback.callback());
414  EXPECT_EQ(ERR_IO_PENDING, rv);
415  EXPECT_EQ(OK, callback.WaitForResult());
416
417  int64 total_bytes_read = 0;
418
419  std::string data_read;
420  for (;;) {
421    scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(4);
422    rv = stream->Read(buf.get(), buf->size(), callback.callback());
423    if (rv == ERR_IO_PENDING)
424      rv = callback.WaitForResult();
425    EXPECT_LE(0, rv);
426    if (rv <= 0)
427      break;
428    total_bytes_read += rv;
429    data_read.append(buf->data(), rv);
430  }
431  EXPECT_EQ(file_size, total_bytes_read);
432  EXPECT_TRUE(data_read == kTestData);
433
434  int total_bytes_written = 0;
435
436  scoped_refptr<IOBufferWithSize> buf = CreateTestDataBuffer();
437  scoped_refptr<DrainableIOBuffer> drainable =
438      new DrainableIOBuffer(buf.get(), buf->size());
439  while (total_bytes_written != kTestDataSize) {
440    rv = stream->Write(drainable.get(), drainable->BytesRemaining(),
441                       callback.callback());
442    if (rv == ERR_IO_PENDING)
443      rv = callback.WaitForResult();
444    EXPECT_LT(0, rv);
445    if (rv <= 0)
446      break;
447    drainable->DidConsume(rv);
448    total_bytes_written += rv;
449  }
450
451  stream.reset();
452
453  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
454  EXPECT_EQ(kTestDataSize * 2, file_size);
455}
456
457TEST_F(FileStreamTest, BasicAsyncWriteRead) {
458  int64 file_size;
459  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
460
461  scoped_ptr<FileStream> stream(
462      new FileStream(base::MessageLoopProxy::current()));
463  int flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
464              base::File::FLAG_WRITE | base::File::FLAG_ASYNC;
465  TestCompletionCallback callback;
466  int rv = stream->Open(temp_file_path(), flags, callback.callback());
467  EXPECT_EQ(ERR_IO_PENDING, rv);
468  EXPECT_EQ(OK, callback.WaitForResult());
469
470  TestInt64CompletionCallback callback64;
471  rv = stream->Seek(FROM_END, 0, callback64.callback());
472  ASSERT_EQ(ERR_IO_PENDING, rv);
473  int64 offset = callback64.WaitForResult();
474  EXPECT_EQ(offset, file_size);
475
476  int total_bytes_written = 0;
477
478  scoped_refptr<IOBufferWithSize> buf = CreateTestDataBuffer();
479  scoped_refptr<DrainableIOBuffer> drainable =
480      new DrainableIOBuffer(buf.get(), buf->size());
481  while (total_bytes_written != kTestDataSize) {
482    rv = stream->Write(drainable.get(), drainable->BytesRemaining(),
483                       callback.callback());
484    if (rv == ERR_IO_PENDING)
485      rv = callback.WaitForResult();
486    EXPECT_LT(0, rv);
487    if (rv <= 0)
488      break;
489    drainable->DidConsume(rv);
490    total_bytes_written += rv;
491  }
492
493  EXPECT_EQ(kTestDataSize, total_bytes_written);
494
495  rv = stream->Seek(FROM_BEGIN, 0, callback64.callback());
496  ASSERT_EQ(ERR_IO_PENDING, rv);
497  offset = callback64.WaitForResult();
498  EXPECT_EQ(0, offset);
499
500  int total_bytes_read = 0;
501
502  std::string data_read;
503  for (;;) {
504    scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(4);
505    rv = stream->Read(buf.get(), buf->size(), callback.callback());
506    if (rv == ERR_IO_PENDING)
507      rv = callback.WaitForResult();
508    EXPECT_LE(0, rv);
509    if (rv <= 0)
510      break;
511    total_bytes_read += rv;
512    data_read.append(buf->data(), rv);
513  }
514  stream.reset();
515
516  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
517  EXPECT_EQ(kTestDataSize * 2, file_size);
518
519  EXPECT_EQ(kTestDataSize * 2, total_bytes_read);
520  const std::string kExpectedFileData =
521      std::string(kTestData) + std::string(kTestData);
522  EXPECT_EQ(kExpectedFileData, data_read);
523}
524
525class TestWriteReadCompletionCallback {
526 public:
527  TestWriteReadCompletionCallback(FileStream* stream,
528                                  int* total_bytes_written,
529                                  int* total_bytes_read,
530                                  std::string* data_read)
531      : result_(0),
532        have_result_(false),
533        waiting_for_result_(false),
534        stream_(stream),
535        total_bytes_written_(total_bytes_written),
536        total_bytes_read_(total_bytes_read),
537        data_read_(data_read),
538        callback_(base::Bind(&TestWriteReadCompletionCallback::OnComplete,
539                             base::Unretained(this))),
540        test_data_(CreateTestDataBuffer()),
541        drainable_(new DrainableIOBuffer(test_data_.get(), kTestDataSize)) {}
542
543  int WaitForResult() {
544    DCHECK(!waiting_for_result_);
545    while (!have_result_) {
546      waiting_for_result_ = true;
547      base::RunLoop().Run();
548      waiting_for_result_ = false;
549    }
550    have_result_ = false;  // auto-reset for next callback
551    return result_;
552  }
553
554  const CompletionCallback& callback() const { return callback_; }
555
556 private:
557  void OnComplete(int result) {
558    DCHECK_LT(0, result);
559    *total_bytes_written_ += result;
560
561    int rv;
562
563    if (*total_bytes_written_ != kTestDataSize) {
564      // Recurse to finish writing all data.
565      int total_bytes_written = 0, total_bytes_read = 0;
566      std::string data_read;
567      TestWriteReadCompletionCallback callback(
568          stream_, &total_bytes_written, &total_bytes_read, &data_read);
569      rv = stream_->Write(
570          drainable_.get(), drainable_->BytesRemaining(), callback.callback());
571      DCHECK_EQ(ERR_IO_PENDING, rv);
572      rv = callback.WaitForResult();
573      drainable_->DidConsume(total_bytes_written);
574      *total_bytes_written_ += total_bytes_written;
575      *total_bytes_read_ += total_bytes_read;
576      *data_read_ += data_read;
577    } else {  // We're done writing all data.  Start reading the data.
578      TestInt64CompletionCallback callback64;
579      EXPECT_EQ(ERR_IO_PENDING,
580                stream_->Seek(FROM_BEGIN, 0, callback64.callback()));
581      {
582        base::MessageLoop::ScopedNestableTaskAllower allow(
583            base::MessageLoop::current());
584        EXPECT_LE(0, callback64.WaitForResult());
585      }
586
587      TestCompletionCallback callback;
588      for (;;) {
589        scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(4);
590        rv = stream_->Read(buf.get(), buf->size(), callback.callback());
591        if (rv == ERR_IO_PENDING) {
592          base::MessageLoop::ScopedNestableTaskAllower allow(
593              base::MessageLoop::current());
594          rv = callback.WaitForResult();
595        }
596        EXPECT_LE(0, rv);
597        if (rv <= 0)
598          break;
599        *total_bytes_read_ += rv;
600        data_read_->append(buf->data(), rv);
601      }
602    }
603
604    result_ = *total_bytes_written_;
605    have_result_ = true;
606    if (waiting_for_result_)
607      base::MessageLoop::current()->Quit();
608  }
609
610  int result_;
611  bool have_result_;
612  bool waiting_for_result_;
613  FileStream* stream_;
614  int* total_bytes_written_;
615  int* total_bytes_read_;
616  std::string* data_read_;
617  const CompletionCallback callback_;
618  scoped_refptr<IOBufferWithSize> test_data_;
619  scoped_refptr<DrainableIOBuffer> drainable_;
620
621  DISALLOW_COPY_AND_ASSIGN(TestWriteReadCompletionCallback);
622};
623
624TEST_F(FileStreamTest, AsyncWriteRead) {
625  int64 file_size;
626  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
627
628  scoped_ptr<FileStream> stream(
629      new FileStream(base::MessageLoopProxy::current()));
630  int flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
631              base::File::FLAG_WRITE | base::File::FLAG_ASYNC;
632  TestCompletionCallback open_callback;
633  int rv = stream->Open(temp_file_path(), flags, open_callback.callback());
634  EXPECT_EQ(ERR_IO_PENDING, rv);
635  EXPECT_EQ(OK, open_callback.WaitForResult());
636
637  TestInt64CompletionCallback callback64;
638  EXPECT_EQ(ERR_IO_PENDING, stream->Seek(FROM_END, 0, callback64.callback()));
639  EXPECT_EQ(file_size, callback64.WaitForResult());
640
641  int total_bytes_written = 0;
642  int total_bytes_read = 0;
643  std::string data_read;
644  TestWriteReadCompletionCallback callback(stream.get(), &total_bytes_written,
645                                           &total_bytes_read, &data_read);
646
647  scoped_refptr<IOBufferWithSize> buf = CreateTestDataBuffer();
648  rv = stream->Write(buf.get(), buf->size(), callback.callback());
649  if (rv == ERR_IO_PENDING)
650    rv = callback.WaitForResult();
651  EXPECT_LT(0, rv);
652  EXPECT_EQ(kTestDataSize, total_bytes_written);
653
654  stream.reset();
655
656  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
657  EXPECT_EQ(kTestDataSize * 2, file_size);
658
659  EXPECT_EQ(kTestDataSize * 2, total_bytes_read);
660  const std::string kExpectedFileData =
661      std::string(kTestData) + std::string(kTestData);
662  EXPECT_EQ(kExpectedFileData, data_read);
663}
664
665class TestWriteCloseCompletionCallback {
666 public:
667  TestWriteCloseCompletionCallback(FileStream* stream, int* total_bytes_written)
668      : result_(0),
669        have_result_(false),
670        waiting_for_result_(false),
671        stream_(stream),
672        total_bytes_written_(total_bytes_written),
673        callback_(base::Bind(&TestWriteCloseCompletionCallback::OnComplete,
674                             base::Unretained(this))),
675        test_data_(CreateTestDataBuffer()),
676        drainable_(new DrainableIOBuffer(test_data_.get(), kTestDataSize)) {}
677
678  int WaitForResult() {
679    DCHECK(!waiting_for_result_);
680    while (!have_result_) {
681      waiting_for_result_ = true;
682      base::RunLoop().Run();
683      waiting_for_result_ = false;
684    }
685    have_result_ = false;  // auto-reset for next callback
686    return result_;
687  }
688
689  const CompletionCallback& callback() const { return callback_; }
690
691 private:
692  void OnComplete(int result) {
693    DCHECK_LT(0, result);
694    *total_bytes_written_ += result;
695
696    int rv;
697
698    if (*total_bytes_written_ != kTestDataSize) {
699      // Recurse to finish writing all data.
700      int total_bytes_written = 0;
701      TestWriteCloseCompletionCallback callback(stream_, &total_bytes_written);
702      rv = stream_->Write(
703          drainable_.get(), drainable_->BytesRemaining(), callback.callback());
704      DCHECK_EQ(ERR_IO_PENDING, rv);
705      rv = callback.WaitForResult();
706      drainable_->DidConsume(total_bytes_written);
707      *total_bytes_written_ += total_bytes_written;
708    }
709
710    result_ = *total_bytes_written_;
711    have_result_ = true;
712    if (waiting_for_result_)
713      base::MessageLoop::current()->Quit();
714  }
715
716  int result_;
717  bool have_result_;
718  bool waiting_for_result_;
719  FileStream* stream_;
720  int* total_bytes_written_;
721  const CompletionCallback callback_;
722  scoped_refptr<IOBufferWithSize> test_data_;
723  scoped_refptr<DrainableIOBuffer> drainable_;
724
725  DISALLOW_COPY_AND_ASSIGN(TestWriteCloseCompletionCallback);
726};
727
728TEST_F(FileStreamTest, AsyncWriteClose) {
729  int64 file_size;
730  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
731
732  scoped_ptr<FileStream> stream(
733      new FileStream(base::MessageLoopProxy::current()));
734  int flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
735              base::File::FLAG_WRITE | base::File::FLAG_ASYNC;
736  TestCompletionCallback open_callback;
737  int rv = stream->Open(temp_file_path(), flags, open_callback.callback());
738  EXPECT_EQ(ERR_IO_PENDING, rv);
739  EXPECT_EQ(OK, open_callback.WaitForResult());
740
741  TestInt64CompletionCallback callback64;
742  EXPECT_EQ(ERR_IO_PENDING, stream->Seek(FROM_END, 0, callback64.callback()));
743  EXPECT_EQ(file_size, callback64.WaitForResult());
744
745  int total_bytes_written = 0;
746  TestWriteCloseCompletionCallback callback(stream.get(), &total_bytes_written);
747
748  scoped_refptr<IOBufferWithSize> buf = CreateTestDataBuffer();
749  rv = stream->Write(buf.get(), buf->size(), callback.callback());
750  if (rv == ERR_IO_PENDING)
751    total_bytes_written = callback.WaitForResult();
752  EXPECT_LT(0, total_bytes_written);
753  EXPECT_EQ(kTestDataSize, total_bytes_written);
754
755  stream.reset();
756
757  EXPECT_TRUE(base::GetFileSize(temp_file_path(), &file_size));
758  EXPECT_EQ(kTestDataSize * 2, file_size);
759}
760
761TEST_F(FileStreamTest, AsyncOpenAndDelete) {
762  scoped_refptr<base::SequencedWorkerPool> pool(
763      new base::SequencedWorkerPool(1, "StreamTest"));
764
765  bool prev = base::ThreadRestrictions::SetIOAllowed(false);
766  scoped_ptr<FileStream> stream(new FileStream(pool.get()));
767  int flags = base::File::FLAG_OPEN | base::File::FLAG_WRITE |
768              base::File::FLAG_ASYNC;
769  TestCompletionCallback open_callback;
770  int rv = stream->Open(temp_file_path(), flags, open_callback.callback());
771  EXPECT_EQ(ERR_IO_PENDING, rv);
772
773  // Delete the stream without waiting for the open operation to be
774  // complete. Should be safe.
775  stream.reset();
776
777  // Force an operation through the pool.
778  scoped_ptr<FileStream> stream2(new FileStream(pool.get()));
779  TestCompletionCallback open_callback2;
780  rv = stream2->Open(temp_file_path(), flags, open_callback2.callback());
781  EXPECT_EQ(OK, open_callback2.GetResult(rv));
782  stream2.reset();
783
784  pool->Shutdown();
785
786  // open_callback won't be called.
787  base::RunLoop().RunUntilIdle();
788  EXPECT_FALSE(open_callback.have_result());
789  base::ThreadRestrictions::SetIOAllowed(prev);
790}
791
792// Verify that async Write() errors are mapped correctly.
793TEST_F(FileStreamTest, AsyncWriteError) {
794  // Try opening file as read-only and then writing to it using FileStream.
795  uint32 flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
796                 base::File::FLAG_ASYNC;
797
798  base::File file(temp_file_path(), flags);
799  ASSERT_TRUE(file.IsValid());
800
801  scoped_ptr<FileStream> stream(
802  new FileStream(file.Pass(), base::MessageLoopProxy::current()));
803
804  scoped_refptr<IOBuffer> buf = new IOBuffer(1);
805  buf->data()[0] = 0;
806
807  TestCompletionCallback callback;
808  int rv = stream->Write(buf.get(), 1, callback.callback());
809  if (rv == ERR_IO_PENDING)
810    rv = callback.WaitForResult();
811  EXPECT_LT(rv, 0);
812
813  stream.reset();
814  base::RunLoop().RunUntilIdle();
815}
816
817// Verify that async Read() errors are mapped correctly.
818TEST_F(FileStreamTest, AsyncReadError) {
819  // Try opening file for write and then reading from it using FileStream.
820  uint32 flags = base::File::FLAG_OPEN | base::File::FLAG_WRITE |
821                 base::File::FLAG_ASYNC;
822
823  base::File file(temp_file_path(), flags);
824  ASSERT_TRUE(file.IsValid());
825
826  scoped_ptr<FileStream> stream(
827  new FileStream(file.Pass(), base::MessageLoopProxy::current()));
828
829  scoped_refptr<IOBuffer> buf = new IOBuffer(1);
830  TestCompletionCallback callback;
831  int rv = stream->Read(buf.get(), 1, callback.callback());
832  if (rv == ERR_IO_PENDING)
833    rv = callback.WaitForResult();
834  EXPECT_LT(rv, 0);
835
836  stream.reset();
837  base::RunLoop().RunUntilIdle();
838}
839
840#if defined(OS_ANDROID)
841TEST_F(FileStreamTest, ContentUriAsyncRead) {
842  base::FilePath test_dir;
843  PathService::Get(base::DIR_SOURCE_ROOT, &test_dir);
844  test_dir = test_dir.AppendASCII("net");
845  test_dir = test_dir.AppendASCII("data");
846  test_dir = test_dir.AppendASCII("file_stream_unittest");
847  ASSERT_TRUE(base::PathExists(test_dir));
848  base::FilePath image_file = test_dir.Append(FILE_PATH_LITERAL("red.png"));
849
850  // Insert the image into MediaStore. MediaStore will do some conversions, and
851  // return the content URI.
852  base::FilePath path = file_util::InsertImageIntoMediaStore(image_file);
853  EXPECT_TRUE(path.IsContentUri());
854  EXPECT_TRUE(base::PathExists(path));
855  int64 file_size;
856  EXPECT_TRUE(base::GetFileSize(path, &file_size));
857  EXPECT_LT(0, file_size);
858
859  FileStream stream(base::MessageLoopProxy::current());
860  int flags = base::File::FLAG_OPEN | base::File::FLAG_READ |
861              base::File::FLAG_ASYNC;
862  TestCompletionCallback callback;
863  int rv = stream.Open(path, flags, callback.callback());
864  EXPECT_EQ(ERR_IO_PENDING, rv);
865  EXPECT_EQ(OK, callback.WaitForResult());
866
867  int total_bytes_read = 0;
868
869  std::string data_read;
870  for (;;) {
871    scoped_refptr<IOBufferWithSize> buf = new IOBufferWithSize(4);
872    rv = stream.Read(buf.get(), buf->size(), callback.callback());
873    if (rv == ERR_IO_PENDING)
874      rv = callback.WaitForResult();
875    EXPECT_LE(0, rv);
876    if (rv <= 0)
877      break;
878    total_bytes_read += rv;
879    data_read.append(buf->data(), rv);
880  }
881  EXPECT_EQ(file_size, total_bytes_read);
882}
883#endif
884
885}  // namespace
886
887}  // namespace net
888