1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/chromeos/drive/drive_file_stream_reader.h"
6
7#include <algorithm>
8#include <cstring>
9
10#include "base/callback_helpers.h"
11#include "base/logging.h"
12#include "base/sequenced_task_runner.h"
13#include "chrome/browser/chromeos/drive/drive.pb.h"
14#include "chrome/browser/chromeos/drive/file_system_interface.h"
15#include "chrome/browser/chromeos/drive/local_file_reader.h"
16#include "content/public/browser/browser_thread.h"
17#include "google_apis/drive/task_util.h"
18#include "net/base/io_buffer.h"
19#include "net/base/net_errors.h"
20#include "net/http/http_byte_range.h"
21
22using content::BrowserThread;
23
24namespace drive {
25namespace {
26
27// Converts FileError code to net::Error code.
28int FileErrorToNetError(FileError error) {
29  return net::FileErrorToNetError(FileErrorToBaseFileError(error));
30}
31
32// Computes the concrete |start| offset and the |length| of |range| in a file
33// of |total| size.
34//
35// This is a thin wrapper of HttpByteRange::ComputeBounds, extended to allow
36// an empty range at the end of the file, like "Range: bytes 0-" on a zero byte
37// file. This is for convenience in unifying implementation with the seek
38// operation of stream reader. HTTP doesn't allow such ranges but we want to
39// treat such seeking as valid.
40bool ComputeConcretePosition(net::HttpByteRange range, int64 total,
41                             int64* start, int64* length) {
42  // The special case when empty range in the end of the file is selected.
43  if (range.HasFirstBytePosition() && range.first_byte_position() == total) {
44    *start = range.first_byte_position();
45    *length = 0;
46    return true;
47  }
48
49  // Otherwise forward to HttpByteRange::ComputeBounds.
50  if (!range.ComputeBounds(total))
51    return false;
52  *start = range.first_byte_position();
53  *length = range.last_byte_position() - range.first_byte_position() + 1;
54  return true;
55}
56
57}  // namespace
58
59namespace internal {
60namespace {
61
62// Copies the content in |pending_data| into |buffer| at most
63// |buffer_length| bytes, and erases the copied data from
64// |pending_data|. Returns the number of copied bytes.
65int ReadInternal(ScopedVector<std::string>* pending_data,
66                 net::IOBuffer* buffer, int buffer_length) {
67  size_t index = 0;
68  int offset = 0;
69  for (; index < pending_data->size() && offset < buffer_length; ++index) {
70    const std::string& chunk = *(*pending_data)[index];
71    DCHECK(!chunk.empty());
72
73    size_t bytes_to_read = std::min(
74        chunk.size(), static_cast<size_t>(buffer_length - offset));
75    std::memmove(buffer->data() + offset, chunk.data(), bytes_to_read);
76    offset += bytes_to_read;
77    if (bytes_to_read < chunk.size()) {
78      // The chunk still has some remaining data.
79      // So remove leading (copied) bytes, and quit the loop so that
80      // the remaining data won't be deleted in the following erase().
81      (*pending_data)[index]->erase(0, bytes_to_read);
82      break;
83    }
84  }
85
86  // Consume the copied data.
87  pending_data->erase(pending_data->begin(), pending_data->begin() + index);
88
89  return offset;
90}
91
92}  // namespace
93
94LocalReaderProxy::LocalReaderProxy(
95    scoped_ptr<util::LocalFileReader> file_reader, int64 length)
96    : file_reader_(file_reader.Pass()),
97      remaining_length_(length),
98      weak_ptr_factory_(this) {
99  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
100  DCHECK(file_reader_);
101}
102
103LocalReaderProxy::~LocalReaderProxy() {
104}
105
106int LocalReaderProxy::Read(net::IOBuffer* buffer, int buffer_length,
107                           const net::CompletionCallback& callback) {
108  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
109  DCHECK(file_reader_);
110
111  if (buffer_length > remaining_length_) {
112    // Here, narrowing is safe.
113    buffer_length = static_cast<int>(remaining_length_);
114  }
115
116  if (!buffer_length)
117    return 0;
118
119  file_reader_->Read(buffer, buffer_length,
120                     base::Bind(&LocalReaderProxy::OnReadCompleted,
121                                weak_ptr_factory_.GetWeakPtr(), callback));
122  return net::ERR_IO_PENDING;
123}
124
125void LocalReaderProxy::OnGetContent(scoped_ptr<std::string> data) {
126  // This method should never be called, because no data should be received
127  // from the network during the reading of local-cache file.
128  NOTREACHED();
129}
130
131void LocalReaderProxy::OnCompleted(FileError error) {
132  // If this method is called, no network error should be happened.
133  DCHECK_EQ(FILE_ERROR_OK, error);
134}
135
136void LocalReaderProxy::OnReadCompleted(const net::CompletionCallback& callback,
137                                       int read_result) {
138  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
139  DCHECK(file_reader_);
140
141  if (read_result >= 0) {
142    // |read_result| bytes data is read.
143    DCHECK_LE(read_result, remaining_length_);
144    remaining_length_ -= read_result;
145  } else {
146    // An error occurs. Close the |file_reader_|.
147    file_reader_.reset();
148  }
149  callback.Run(read_result);
150}
151
152NetworkReaderProxy::NetworkReaderProxy(
153    int64 offset,
154    int64 content_length,
155    int64 full_content_length,
156    const base::Closure& job_canceller)
157    : remaining_offset_(offset),
158      remaining_content_length_(content_length),
159      is_full_download_(offset + content_length == full_content_length),
160      error_code_(net::OK),
161      buffer_length_(0),
162      job_canceller_(job_canceller) {
163  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
164}
165
166NetworkReaderProxy::~NetworkReaderProxy() {
167  if (!job_canceller_.is_null()) {
168    job_canceller_.Run();
169  }
170}
171
172int NetworkReaderProxy::Read(net::IOBuffer* buffer, int buffer_length,
173                             const net::CompletionCallback& callback) {
174  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
175  // Check if there is no pending Read operation.
176  DCHECK(!buffer_.get());
177  DCHECK_EQ(buffer_length_, 0);
178  DCHECK(callback_.is_null());
179  // Validate the arguments.
180  DCHECK(buffer);
181  DCHECK_GT(buffer_length, 0);
182  DCHECK(!callback.is_null());
183
184  if (error_code_ != net::OK) {
185    // An error is already found. Return it immediately.
186    return error_code_;
187  }
188
189  if (remaining_content_length_ == 0) {
190    // If no more data, return immediately.
191    return 0;
192  }
193
194  if (buffer_length > remaining_content_length_) {
195    // Here, narrowing cast should be safe.
196    buffer_length = static_cast<int>(remaining_content_length_);
197  }
198
199  if (pending_data_.empty()) {
200    // No data is available. Keep the arguments, and return pending status.
201    buffer_ = buffer;
202    buffer_length_ = buffer_length;
203    callback_ = callback;
204    return net::ERR_IO_PENDING;
205  }
206
207  int result = ReadInternal(&pending_data_, buffer, buffer_length);
208  remaining_content_length_ -= result;
209  DCHECK_GE(remaining_content_length_, 0);
210
211  // Although OnCompleted() should reset |job_canceller_| when download is done,
212  // due to timing issues the ReaderProxy instance may be destructed before the
213  // notification. To fix the case we reset here earlier.
214  if (is_full_download_ && remaining_content_length_ == 0)
215    job_canceller_.Reset();
216
217  return result;
218}
219
220void NetworkReaderProxy::OnGetContent(scoped_ptr<std::string> data) {
221  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
222  DCHECK(data && !data->empty());
223
224  if (remaining_offset_ >= static_cast<int64>(data->length())) {
225    // Skip unneeded leading data.
226    remaining_offset_ -= data->length();
227    return;
228  }
229
230  if (remaining_offset_ > 0) {
231    // Erase unnecessary leading bytes.
232    data->erase(0, static_cast<size_t>(remaining_offset_));
233    remaining_offset_ = 0;
234  }
235
236  pending_data_.push_back(data.release());
237  if (!buffer_.get()) {
238    // No pending Read operation.
239    return;
240  }
241
242  int result = ReadInternal(&pending_data_, buffer_.get(), buffer_length_);
243  remaining_content_length_ -= result;
244  DCHECK_GE(remaining_content_length_, 0);
245
246  if (is_full_download_ && remaining_content_length_ == 0)
247    job_canceller_.Reset();
248
249  buffer_ = NULL;
250  buffer_length_ = 0;
251  DCHECK(!callback_.is_null());
252  base::ResetAndReturn(&callback_).Run(result);
253}
254
255void NetworkReaderProxy::OnCompleted(FileError error) {
256  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
257  // The downloading is completed, so we do not need to cancel the job
258  // in the destructor.
259  job_canceller_.Reset();
260
261  if (error == FILE_ERROR_OK) {
262    return;
263  }
264
265  error_code_ = FileErrorToNetError(error);
266  pending_data_.clear();
267
268  if (callback_.is_null()) {
269    // No pending Read operation.
270    return;
271  }
272
273  buffer_ = NULL;
274  buffer_length_ = 0;
275  base::ResetAndReturn(&callback_).Run(error_code_);
276}
277
278}  // namespace internal
279
280namespace {
281
282// Calls FileSystemInterface::GetFileContent if the file system
283// is available. If not, the |completion_callback| is invoked with
284// FILE_ERROR_FAILED.
285base::Closure GetFileContentOnUIThread(
286    const DriveFileStreamReader::FileSystemGetter& file_system_getter,
287    const base::FilePath& drive_file_path,
288    const GetFileContentInitializedCallback& initialized_callback,
289    const google_apis::GetContentCallback& get_content_callback,
290    const FileOperationCallback& completion_callback) {
291  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
292
293  FileSystemInterface* file_system = file_system_getter.Run();
294  if (!file_system) {
295    completion_callback.Run(FILE_ERROR_FAILED);
296    return base::Closure();
297  }
298
299  return google_apis::CreateRelayCallback(
300      file_system->GetFileContent(drive_file_path,
301                                  initialized_callback,
302                                  get_content_callback,
303                                  completion_callback));
304}
305
306// Helper to run FileSystemInterface::GetFileContent on UI thread.
307void GetFileContent(
308    const DriveFileStreamReader::FileSystemGetter& file_system_getter,
309    const base::FilePath& drive_file_path,
310    const GetFileContentInitializedCallback& initialized_callback,
311    const google_apis::GetContentCallback& get_content_callback,
312    const FileOperationCallback& completion_callback,
313    const base::Callback<void(const base::Closure&)>& reply_callback) {
314  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
315
316  BrowserThread::PostTaskAndReplyWithResult(
317      BrowserThread::UI,
318      FROM_HERE,
319      base::Bind(&GetFileContentOnUIThread,
320                 file_system_getter,
321                 drive_file_path,
322                 google_apis::CreateRelayCallback(initialized_callback),
323                 google_apis::CreateRelayCallback(get_content_callback),
324                 google_apis::CreateRelayCallback(completion_callback)),
325      reply_callback);
326}
327
328}  // namespace
329
330DriveFileStreamReader::DriveFileStreamReader(
331    const FileSystemGetter& file_system_getter,
332    base::SequencedTaskRunner* file_task_runner)
333    : file_system_getter_(file_system_getter),
334      file_task_runner_(file_task_runner),
335      weak_ptr_factory_(this) {
336  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
337}
338
339DriveFileStreamReader::~DriveFileStreamReader() {
340}
341
342bool DriveFileStreamReader::IsInitialized() const {
343  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
344  return reader_proxy_.get() != NULL;
345}
346
347void DriveFileStreamReader::Initialize(
348    const base::FilePath& drive_file_path,
349    const net::HttpByteRange& byte_range,
350    const InitializeCompletionCallback& callback) {
351  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
352  DCHECK(!callback.is_null());
353
354  GetFileContent(
355      file_system_getter_,
356      drive_file_path,
357      base::Bind(&DriveFileStreamReader
358                     ::InitializeAfterGetFileContentInitialized,
359                 weak_ptr_factory_.GetWeakPtr(),
360                 byte_range,
361                 callback),
362      base::Bind(&DriveFileStreamReader::OnGetContent,
363                 weak_ptr_factory_.GetWeakPtr()),
364      base::Bind(&DriveFileStreamReader::OnGetFileContentCompletion,
365                 weak_ptr_factory_.GetWeakPtr(),
366                 callback),
367      base::Bind(&DriveFileStreamReader::StoreCancelDownloadClosure,
368                 weak_ptr_factory_.GetWeakPtr()));
369}
370
371int DriveFileStreamReader::Read(net::IOBuffer* buffer, int buffer_length,
372                                const net::CompletionCallback& callback) {
373  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
374  DCHECK(reader_proxy_);
375  DCHECK(buffer);
376  DCHECK(!callback.is_null());
377  return reader_proxy_->Read(buffer, buffer_length, callback);
378}
379
380void DriveFileStreamReader::StoreCancelDownloadClosure(
381    const base::Closure& cancel_download_closure) {
382  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
383  cancel_download_closure_ = cancel_download_closure;
384}
385
386void DriveFileStreamReader::InitializeAfterGetFileContentInitialized(
387    const net::HttpByteRange& byte_range,
388    const InitializeCompletionCallback& callback,
389    FileError error,
390    const base::FilePath& local_cache_file_path,
391    scoped_ptr<ResourceEntry> entry) {
392  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
393  // StoreCancelDownloadClosure() should be called before this function.
394  DCHECK(!cancel_download_closure_.is_null());
395
396  if (error != FILE_ERROR_OK) {
397    callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>());
398    return;
399  }
400  DCHECK(entry);
401
402  int64 range_start = 0, range_length = 0;
403  if (!ComputeConcretePosition(byte_range, entry->file_info().size(),
404                               &range_start, &range_length)) {
405    // If |byte_range| is invalid (e.g. out of bounds), return with an error.
406    // At the same time, we cancel the in-flight downloading operation if
407    // needed and and invalidate weak pointers so that we won't
408    // receive unwanted callbacks.
409    cancel_download_closure_.Run();
410    weak_ptr_factory_.InvalidateWeakPtrs();
411    callback.Run(
412        net::ERR_REQUEST_RANGE_NOT_SATISFIABLE, scoped_ptr<ResourceEntry>());
413    return;
414  }
415
416  if (local_cache_file_path.empty()) {
417    // The file is not cached, and being downloaded.
418    reader_proxy_.reset(
419        new internal::NetworkReaderProxy(
420            range_start, range_length,
421            entry->file_info().size(), cancel_download_closure_));
422    callback.Run(net::OK, entry.Pass());
423    return;
424  }
425
426  // Otherwise, open the stream for file.
427  scoped_ptr<util::LocalFileReader> file_reader(
428      new util::LocalFileReader(file_task_runner_.get()));
429  util::LocalFileReader* file_reader_ptr = file_reader.get();
430  file_reader_ptr->Open(
431      local_cache_file_path,
432      range_start,
433      base::Bind(
434          &DriveFileStreamReader::InitializeAfterLocalFileOpen,
435          weak_ptr_factory_.GetWeakPtr(),
436          range_length,
437          callback,
438          base::Passed(&entry),
439          base::Passed(&file_reader)));
440}
441
442void DriveFileStreamReader::InitializeAfterLocalFileOpen(
443    int64 length,
444    const InitializeCompletionCallback& callback,
445    scoped_ptr<ResourceEntry> entry,
446    scoped_ptr<util::LocalFileReader> file_reader,
447    int open_result) {
448  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
449
450  if (open_result != net::OK) {
451    callback.Run(net::ERR_FAILED, scoped_ptr<ResourceEntry>());
452    return;
453  }
454
455  reader_proxy_.reset(
456      new internal::LocalReaderProxy(file_reader.Pass(), length));
457  callback.Run(net::OK, entry.Pass());
458}
459
460void DriveFileStreamReader::OnGetContent(google_apis::GDataErrorCode error_code,
461                                         scoped_ptr<std::string> data) {
462  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
463  DCHECK(reader_proxy_);
464  reader_proxy_->OnGetContent(data.Pass());
465}
466
467void DriveFileStreamReader::OnGetFileContentCompletion(
468    const InitializeCompletionCallback& callback,
469    FileError error) {
470  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
471
472  if (reader_proxy_) {
473    // If the proxy object available, send the error to it.
474    reader_proxy_->OnCompleted(error);
475  } else {
476    // Here the proxy object is not yet available.
477    // There are two cases. 1) Some error happens during the initialization.
478    // 2) the cache file is found, but the proxy object is not *yet*
479    // initialized because the file is being opened.
480    // We are interested in 1) only. The callback for 2) will be called
481    // after opening the file is completed.
482    // Note: due to the same reason, LocalReaderProxy::OnCompleted may
483    // or may not be called. This is timing issue, and it is difficult to avoid
484    // unfortunately.
485    if (error != FILE_ERROR_OK) {
486      callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>());
487    }
488  }
489}
490
491}  // namespace drive
492