1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/media_galleries/fileapi/readahead_file_stream_reader.h"
6
7#include <algorithm>
8
9#include "base/message_loop/message_loop.h"
10#include "base/numerics/safe_conversions.h"
11#include "net/base/io_buffer.h"
12#include "net/base/net_errors.h"
13
14using storage::FileStreamReader;
15
namespace {

// Size, in bytes, of each read issued against the source reader. Kept large
// (1MB) to minimize per-transaction costs.
const int kBufferSize = 1024 * 1024;

// How many filled buffers we try to keep queued, so that we are always one
// buffer ahead of the consumer.
const size_t kDesiredNumberOfBuffers = 2;

}  // namespace
22
// Wraps |source| in a read-ahead reader. The reader starts with no stored
// error (0) and no read outstanding against the source.
// NOTE(review): |source_| is later released via reset(), so this object
// presumably takes ownership of |source| -- confirm against the header.
ReadaheadFileStreamReader::ReadaheadFileStreamReader(FileStreamReader* source)
    : source_(source),
      source_error_(0),
      source_has_pending_read_(false),
      weak_factory_(this) {
}
29
// No explicit teardown is performed here; cleanup is left to the members.
ReadaheadFileStreamReader::~ReadaheadFileStreamReader() {}
31
32int ReadaheadFileStreamReader::Read(
33    net::IOBuffer* buf, int buf_len, const net::CompletionCallback& callback) {
34  DCHECK(!pending_sink_buffer_.get());
35  DCHECK(pending_read_callback_.is_null());
36
37  ReadFromSourceIfNeeded();
38
39  scoped_refptr<net::DrainableIOBuffer> sink =
40      new net::DrainableIOBuffer(buf, buf_len);
41  int result = FinishReadFromCacheOrStoredError(sink.get());
42
43  // We are waiting for an source read to complete, so save the request.
44  if (result == net::ERR_IO_PENDING) {
45    DCHECK(!pending_sink_buffer_.get());
46    DCHECK(pending_read_callback_.is_null());
47    pending_sink_buffer_ = sink;
48    pending_read_callback_ = callback;
49  }
50
51  return result;
52}
53
54int64 ReadaheadFileStreamReader::GetLength(
55    const net::Int64CompletionCallback& callback) {
56  return source_->GetLength(callback);
57}
58
59int ReadaheadFileStreamReader::FinishReadFromCacheOrStoredError(
60    net::DrainableIOBuffer* sink) {
61  // If we don't have any ready cache, return the pending read code, or
62  // the stored error code.
63  if (buffers_.empty()) {
64    if (source_.get()) {
65      DCHECK(source_has_pending_read_);
66      return net::ERR_IO_PENDING;
67    } else {
68      return source_error_;
69    }
70  }
71
72  while (sink->BytesRemaining() > 0 && !buffers_.empty()) {
73    net::DrainableIOBuffer* source_buffer = buffers_.front().get();
74
75    DCHECK(source_buffer->BytesRemaining() > 0);
76
77    int copy_len = std::min(source_buffer->BytesRemaining(),
78                            sink->BytesRemaining());
79    std::copy(source_buffer->data(), source_buffer->data() + copy_len,
80              sink->data());
81
82    source_buffer->DidConsume(copy_len);
83    sink->DidConsume(copy_len);
84
85    if (source_buffer->BytesRemaining() == 0) {
86      buffers_.pop();
87
88      // Get a new buffer to replace the one we just used up.
89      ReadFromSourceIfNeeded();
90    }
91  }
92
93  return sink->BytesConsumed();
94}
95
96void ReadaheadFileStreamReader::ReadFromSourceIfNeeded() {
97  if (!source_.get() || source_has_pending_read_ ||
98      buffers_.size() >= kDesiredNumberOfBuffers) {
99    return;
100  }
101
102  source_has_pending_read_ = true;
103
104  scoped_refptr<net::IOBuffer> buf(new net::IOBuffer(kBufferSize));
105  int result = source_->Read(
106      buf.get(),
107      kBufferSize,
108      base::Bind(&ReadaheadFileStreamReader::OnFinishReadFromSource,
109                 weak_factory_.GetWeakPtr(),
110                 buf));
111
112  if (result != net::ERR_IO_PENDING)
113    OnFinishReadFromSource(buf.get(), result);
114}
115
116void ReadaheadFileStreamReader::OnFinishReadFromSource(net::IOBuffer* buf,
117                                                       int result) {
118  DCHECK(result != net::ERR_IO_PENDING);
119  DCHECK(source_has_pending_read_);
120  source_has_pending_read_ = false;
121
122  // Either store the data read from |source_|, or store the error code.
123  if (result > 0) {
124    scoped_refptr<net::DrainableIOBuffer> drainable_buffer(
125        new net::DrainableIOBuffer(buf, result));
126    buffers_.push(drainable_buffer);
127    ReadFromSourceIfNeeded();
128  } else {
129    source_.reset();
130    source_error_ = result;
131  }
132
133  // If there's a read request waiting for the source FileStreamReader to
134  // finish reading, fulfill that request now from the cache or stored error.
135  if (pending_sink_buffer_.get()) {
136    DCHECK(!pending_read_callback_.is_null());
137
138    // Free the pending callback before running it, as the callback often
139    // dispatches another read.
140    scoped_refptr<net::DrainableIOBuffer> sink = pending_sink_buffer_;
141    pending_sink_buffer_ = NULL;
142    net::CompletionCallback completion_callback = pending_read_callback_;
143    pending_read_callback_.Reset();
144
145    completion_callback.Run(FinishReadFromCacheOrStoredError(sink.get()));
146  }
147}
148