1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/media_galleries/fileapi/readahead_file_stream_reader.h" 6 7#include <algorithm> 8 9#include "base/message_loop/message_loop.h" 10#include "base/numerics/safe_conversions.h" 11#include "net/base/io_buffer.h" 12#include "net/base/net_errors.h" 13 14using storage::FileStreamReader; 15 16namespace { 17 18const size_t kDesiredNumberOfBuffers = 2; // So we are always one buffer ahead. 19const int kBufferSize = 1024*1024; // 1MB to minimize transaction costs. 20 21} // namespace 22 23ReadaheadFileStreamReader::ReadaheadFileStreamReader(FileStreamReader* source) 24 : source_(source), 25 source_error_(0), 26 source_has_pending_read_(false), 27 weak_factory_(this) { 28} 29 30ReadaheadFileStreamReader::~ReadaheadFileStreamReader() {} 31 32int ReadaheadFileStreamReader::Read( 33 net::IOBuffer* buf, int buf_len, const net::CompletionCallback& callback) { 34 DCHECK(!pending_sink_buffer_.get()); 35 DCHECK(pending_read_callback_.is_null()); 36 37 ReadFromSourceIfNeeded(); 38 39 scoped_refptr<net::DrainableIOBuffer> sink = 40 new net::DrainableIOBuffer(buf, buf_len); 41 int result = FinishReadFromCacheOrStoredError(sink.get()); 42 43 // We are waiting for an source read to complete, so save the request. 44 if (result == net::ERR_IO_PENDING) { 45 DCHECK(!pending_sink_buffer_.get()); 46 DCHECK(pending_read_callback_.is_null()); 47 pending_sink_buffer_ = sink; 48 pending_read_callback_ = callback; 49 } 50 51 return result; 52} 53 54int64 ReadaheadFileStreamReader::GetLength( 55 const net::Int64CompletionCallback& callback) { 56 return source_->GetLength(callback); 57} 58 59int ReadaheadFileStreamReader::FinishReadFromCacheOrStoredError( 60 net::DrainableIOBuffer* sink) { 61 // If we don't have any ready cache, return the pending read code, or 62 // the stored error code. 63 if (buffers_.empty()) { 64 if (source_.get()) { 65 DCHECK(source_has_pending_read_); 66 return net::ERR_IO_PENDING; 67 } else { 68 return source_error_; 69 } 70 } 71 72 while (sink->BytesRemaining() > 0 && !buffers_.empty()) { 73 net::DrainableIOBuffer* source_buffer = buffers_.front().get(); 74 75 DCHECK(source_buffer->BytesRemaining() > 0); 76 77 int copy_len = std::min(source_buffer->BytesRemaining(), 78 sink->BytesRemaining()); 79 std::copy(source_buffer->data(), source_buffer->data() + copy_len, 80 sink->data()); 81 82 source_buffer->DidConsume(copy_len); 83 sink->DidConsume(copy_len); 84 85 if (source_buffer->BytesRemaining() == 0) { 86 buffers_.pop(); 87 88 // Get a new buffer to replace the one we just used up. 89 ReadFromSourceIfNeeded(); 90 } 91 } 92 93 return sink->BytesConsumed(); 94} 95 96void ReadaheadFileStreamReader::ReadFromSourceIfNeeded() { 97 if (!source_.get() || source_has_pending_read_ || 98 buffers_.size() >= kDesiredNumberOfBuffers) { 99 return; 100 } 101 102 source_has_pending_read_ = true; 103 104 scoped_refptr<net::IOBuffer> buf(new net::IOBuffer(kBufferSize)); 105 int result = source_->Read( 106 buf.get(), 107 kBufferSize, 108 base::Bind(&ReadaheadFileStreamReader::OnFinishReadFromSource, 109 weak_factory_.GetWeakPtr(), 110 buf)); 111 112 if (result != net::ERR_IO_PENDING) 113 OnFinishReadFromSource(buf.get(), result); 114} 115 116void ReadaheadFileStreamReader::OnFinishReadFromSource(net::IOBuffer* buf, 117 int result) { 118 DCHECK(result != net::ERR_IO_PENDING); 119 DCHECK(source_has_pending_read_); 120 source_has_pending_read_ = false; 121 122 // Either store the data read from |source_|, or store the error code. 123 if (result > 0) { 124 scoped_refptr<net::DrainableIOBuffer> drainable_buffer( 125 new net::DrainableIOBuffer(buf, result)); 126 buffers_.push(drainable_buffer); 127 ReadFromSourceIfNeeded(); 128 } else { 129 source_.reset(); 130 source_error_ = result; 131 } 132 133 // If there's a read request waiting for the source FileStreamReader to 134 // finish reading, fulfill that request now from the cache or stored error. 135 if (pending_sink_buffer_.get()) { 136 DCHECK(!pending_read_callback_.is_null()); 137 138 // Free the pending callback before running it, as the callback often 139 // dispatches another read. 140 scoped_refptr<net::DrainableIOBuffer> sink = pending_sink_buffer_; 141 pending_sink_buffer_ = NULL; 142 net::CompletionCallback completion_callback = pending_read_callback_; 143 pending_read_callback_.Reset(); 144 145 completion_callback.Run(FinishReadFromCacheOrStoredError(sink.get())); 146 } 147} 148