1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/chromeos/drive/drive_file_stream_reader.h" 6 7#include <algorithm> 8#include <cstring> 9 10#include "base/callback_helpers.h" 11#include "base/logging.h" 12#include "base/sequenced_task_runner.h" 13#include "chrome/browser/chromeos/drive/drive.pb.h" 14#include "chrome/browser/chromeos/drive/file_system_interface.h" 15#include "chrome/browser/chromeos/drive/local_file_reader.h" 16#include "content/public/browser/browser_thread.h" 17#include "google_apis/drive/task_util.h" 18#include "net/base/io_buffer.h" 19#include "net/base/net_errors.h" 20#include "net/http/http_byte_range.h" 21 22using content::BrowserThread; 23 24namespace drive { 25namespace { 26 27// Converts FileError code to net::Error code. 28int FileErrorToNetError(FileError error) { 29 return net::FileErrorToNetError(FileErrorToBaseFileError(error)); 30} 31 32// Computes the concrete |start| offset and the |length| of |range| in a file 33// of |total| size. 34// 35// This is a thin wrapper of HttpByteRange::ComputeBounds, extended to allow 36// an empty range at the end of the file, like "Range: bytes 0-" on a zero byte 37// file. This is for convenience in unifying implementation with the seek 38// operation of stream reader. HTTP doesn't allow such ranges but we want to 39// treat such seeking as valid. 40bool ComputeConcretePosition(net::HttpByteRange range, int64 total, 41 int64* start, int64* length) { 42 // The special case when empty range in the end of the file is selected. 43 if (range.HasFirstBytePosition() && range.first_byte_position() == total) { 44 *start = range.first_byte_position(); 45 *length = 0; 46 return true; 47 } 48 49 // Otherwise forward to HttpByteRange::ComputeBounds. 50 if (!range.ComputeBounds(total)) 51 return false; 52 *start = range.first_byte_position(); 53 *length = range.last_byte_position() - range.first_byte_position() + 1; 54 return true; 55} 56 57} // namespace 58 59namespace internal { 60namespace { 61 62// Copies the content in |pending_data| into |buffer| at most 63// |buffer_length| bytes, and erases the copied data from 64// |pending_data|. Returns the number of copied bytes. 65int ReadInternal(ScopedVector<std::string>* pending_data, 66 net::IOBuffer* buffer, int buffer_length) { 67 size_t index = 0; 68 int offset = 0; 69 for (; index < pending_data->size() && offset < buffer_length; ++index) { 70 const std::string& chunk = *(*pending_data)[index]; 71 DCHECK(!chunk.empty()); 72 73 size_t bytes_to_read = std::min( 74 chunk.size(), static_cast<size_t>(buffer_length - offset)); 75 std::memmove(buffer->data() + offset, chunk.data(), bytes_to_read); 76 offset += bytes_to_read; 77 if (bytes_to_read < chunk.size()) { 78 // The chunk still has some remaining data. 79 // So remove leading (copied) bytes, and quit the loop so that 80 // the remaining data won't be deleted in the following erase(). 81 (*pending_data)[index]->erase(0, bytes_to_read); 82 break; 83 } 84 } 85 86 // Consume the copied data. 87 pending_data->erase(pending_data->begin(), pending_data->begin() + index); 88 89 return offset; 90} 91 92} // namespace 93 94LocalReaderProxy::LocalReaderProxy( 95 scoped_ptr<util::LocalFileReader> file_reader, int64 length) 96 : file_reader_(file_reader.Pass()), 97 remaining_length_(length), 98 weak_ptr_factory_(this) { 99 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 100 DCHECK(file_reader_); 101} 102 103LocalReaderProxy::~LocalReaderProxy() { 104} 105 106int LocalReaderProxy::Read(net::IOBuffer* buffer, int buffer_length, 107 const net::CompletionCallback& callback) { 108 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 109 DCHECK(file_reader_); 110 111 if (buffer_length > remaining_length_) { 112 // Here, narrowing is safe. 113 buffer_length = static_cast<int>(remaining_length_); 114 } 115 116 if (!buffer_length) 117 return 0; 118 119 file_reader_->Read(buffer, buffer_length, 120 base::Bind(&LocalReaderProxy::OnReadCompleted, 121 weak_ptr_factory_.GetWeakPtr(), callback)); 122 return net::ERR_IO_PENDING; 123} 124 125void LocalReaderProxy::OnGetContent(scoped_ptr<std::string> data) { 126 // This method should never be called, because no data should be received 127 // from the network during the reading of local-cache file. 128 NOTREACHED(); 129} 130 131void LocalReaderProxy::OnCompleted(FileError error) { 132 // If this method is called, no network error should be happened. 133 DCHECK_EQ(FILE_ERROR_OK, error); 134} 135 136void LocalReaderProxy::OnReadCompleted(const net::CompletionCallback& callback, 137 int read_result) { 138 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 139 DCHECK(file_reader_); 140 141 if (read_result >= 0) { 142 // |read_result| bytes data is read. 143 DCHECK_LE(read_result, remaining_length_); 144 remaining_length_ -= read_result; 145 } else { 146 // An error occurs. Close the |file_reader_|. 147 file_reader_.reset(); 148 } 149 callback.Run(read_result); 150} 151 152NetworkReaderProxy::NetworkReaderProxy( 153 int64 offset, 154 int64 content_length, 155 int64 full_content_length, 156 const base::Closure& job_canceller) 157 : remaining_offset_(offset), 158 remaining_content_length_(content_length), 159 is_full_download_(offset + content_length == full_content_length), 160 error_code_(net::OK), 161 buffer_length_(0), 162 job_canceller_(job_canceller) { 163 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 164} 165 166NetworkReaderProxy::~NetworkReaderProxy() { 167 if (!job_canceller_.is_null()) { 168 job_canceller_.Run(); 169 } 170} 171 172int NetworkReaderProxy::Read(net::IOBuffer* buffer, int buffer_length, 173 const net::CompletionCallback& callback) { 174 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 175 // Check if there is no pending Read operation. 176 DCHECK(!buffer_.get()); 177 DCHECK_EQ(buffer_length_, 0); 178 DCHECK(callback_.is_null()); 179 // Validate the arguments. 180 DCHECK(buffer); 181 DCHECK_GT(buffer_length, 0); 182 DCHECK(!callback.is_null()); 183 184 if (error_code_ != net::OK) { 185 // An error is already found. Return it immediately. 186 return error_code_; 187 } 188 189 if (remaining_content_length_ == 0) { 190 // If no more data, return immediately. 191 return 0; 192 } 193 194 if (buffer_length > remaining_content_length_) { 195 // Here, narrowing cast should be safe. 196 buffer_length = static_cast<int>(remaining_content_length_); 197 } 198 199 if (pending_data_.empty()) { 200 // No data is available. Keep the arguments, and return pending status. 201 buffer_ = buffer; 202 buffer_length_ = buffer_length; 203 callback_ = callback; 204 return net::ERR_IO_PENDING; 205 } 206 207 int result = ReadInternal(&pending_data_, buffer, buffer_length); 208 remaining_content_length_ -= result; 209 DCHECK_GE(remaining_content_length_, 0); 210 211 // Although OnCompleted() should reset |job_canceller_| when download is done, 212 // due to timing issues the ReaderProxy instance may be destructed before the 213 // notification. To fix the case we reset here earlier. 214 if (is_full_download_ && remaining_content_length_ == 0) 215 job_canceller_.Reset(); 216 217 return result; 218} 219 220void NetworkReaderProxy::OnGetContent(scoped_ptr<std::string> data) { 221 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 222 DCHECK(data && !data->empty()); 223 224 if (remaining_offset_ >= static_cast<int64>(data->length())) { 225 // Skip unneeded leading data. 226 remaining_offset_ -= data->length(); 227 return; 228 } 229 230 if (remaining_offset_ > 0) { 231 // Erase unnecessary leading bytes. 232 data->erase(0, static_cast<size_t>(remaining_offset_)); 233 remaining_offset_ = 0; 234 } 235 236 pending_data_.push_back(data.release()); 237 if (!buffer_.get()) { 238 // No pending Read operation. 239 return; 240 } 241 242 int result = ReadInternal(&pending_data_, buffer_.get(), buffer_length_); 243 remaining_content_length_ -= result; 244 DCHECK_GE(remaining_content_length_, 0); 245 246 if (is_full_download_ && remaining_content_length_ == 0) 247 job_canceller_.Reset(); 248 249 buffer_ = NULL; 250 buffer_length_ = 0; 251 DCHECK(!callback_.is_null()); 252 base::ResetAndReturn(&callback_).Run(result); 253} 254 255void NetworkReaderProxy::OnCompleted(FileError error) { 256 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 257 // The downloading is completed, so we do not need to cancel the job 258 // in the destructor. 259 job_canceller_.Reset(); 260 261 if (error == FILE_ERROR_OK) { 262 return; 263 } 264 265 error_code_ = FileErrorToNetError(error); 266 pending_data_.clear(); 267 268 if (callback_.is_null()) { 269 // No pending Read operation. 270 return; 271 } 272 273 buffer_ = NULL; 274 buffer_length_ = 0; 275 base::ResetAndReturn(&callback_).Run(error_code_); 276} 277 278} // namespace internal 279 280namespace { 281 282// Calls FileSystemInterface::GetFileContent if the file system 283// is available. If not, the |completion_callback| is invoked with 284// FILE_ERROR_FAILED. 285base::Closure GetFileContentOnUIThread( 286 const DriveFileStreamReader::FileSystemGetter& file_system_getter, 287 const base::FilePath& drive_file_path, 288 const GetFileContentInitializedCallback& initialized_callback, 289 const google_apis::GetContentCallback& get_content_callback, 290 const FileOperationCallback& completion_callback) { 291 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 292 293 FileSystemInterface* file_system = file_system_getter.Run(); 294 if (!file_system) { 295 completion_callback.Run(FILE_ERROR_FAILED); 296 return base::Closure(); 297 } 298 299 return google_apis::CreateRelayCallback( 300 file_system->GetFileContent(drive_file_path, 301 initialized_callback, 302 get_content_callback, 303 completion_callback)); 304} 305 306// Helper to run FileSystemInterface::GetFileContent on UI thread. 307void GetFileContent( 308 const DriveFileStreamReader::FileSystemGetter& file_system_getter, 309 const base::FilePath& drive_file_path, 310 const GetFileContentInitializedCallback& initialized_callback, 311 const google_apis::GetContentCallback& get_content_callback, 312 const FileOperationCallback& completion_callback, 313 const base::Callback<void(const base::Closure&)>& reply_callback) { 314 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 315 316 BrowserThread::PostTaskAndReplyWithResult( 317 BrowserThread::UI, 318 FROM_HERE, 319 base::Bind(&GetFileContentOnUIThread, 320 file_system_getter, 321 drive_file_path, 322 google_apis::CreateRelayCallback(initialized_callback), 323 google_apis::CreateRelayCallback(get_content_callback), 324 google_apis::CreateRelayCallback(completion_callback)), 325 reply_callback); 326} 327 328} // namespace 329 330DriveFileStreamReader::DriveFileStreamReader( 331 const FileSystemGetter& file_system_getter, 332 base::SequencedTaskRunner* file_task_runner) 333 : file_system_getter_(file_system_getter), 334 file_task_runner_(file_task_runner), 335 weak_ptr_factory_(this) { 336 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 337} 338 339DriveFileStreamReader::~DriveFileStreamReader() { 340} 341 342bool DriveFileStreamReader::IsInitialized() const { 343 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 344 return reader_proxy_.get() != NULL; 345} 346 347void DriveFileStreamReader::Initialize( 348 const base::FilePath& drive_file_path, 349 const net::HttpByteRange& byte_range, 350 const InitializeCompletionCallback& callback) { 351 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 352 DCHECK(!callback.is_null()); 353 354 GetFileContent( 355 file_system_getter_, 356 drive_file_path, 357 base::Bind(&DriveFileStreamReader 358 ::InitializeAfterGetFileContentInitialized, 359 weak_ptr_factory_.GetWeakPtr(), 360 byte_range, 361 callback), 362 base::Bind(&DriveFileStreamReader::OnGetContent, 363 weak_ptr_factory_.GetWeakPtr()), 364 base::Bind(&DriveFileStreamReader::OnGetFileContentCompletion, 365 weak_ptr_factory_.GetWeakPtr(), 366 callback), 367 base::Bind(&DriveFileStreamReader::StoreCancelDownloadClosure, 368 weak_ptr_factory_.GetWeakPtr())); 369} 370 371int DriveFileStreamReader::Read(net::IOBuffer* buffer, int buffer_length, 372 const net::CompletionCallback& callback) { 373 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 374 DCHECK(reader_proxy_); 375 DCHECK(buffer); 376 DCHECK(!callback.is_null()); 377 return reader_proxy_->Read(buffer, buffer_length, callback); 378} 379 380void DriveFileStreamReader::StoreCancelDownloadClosure( 381 const base::Closure& cancel_download_closure) { 382 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 383 cancel_download_closure_ = cancel_download_closure; 384} 385 386void DriveFileStreamReader::InitializeAfterGetFileContentInitialized( 387 const net::HttpByteRange& byte_range, 388 const InitializeCompletionCallback& callback, 389 FileError error, 390 const base::FilePath& local_cache_file_path, 391 scoped_ptr<ResourceEntry> entry) { 392 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 393 // StoreCancelDownloadClosure() should be called before this function. 394 DCHECK(!cancel_download_closure_.is_null()); 395 396 if (error != FILE_ERROR_OK) { 397 callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>()); 398 return; 399 } 400 DCHECK(entry); 401 402 int64 range_start = 0, range_length = 0; 403 if (!ComputeConcretePosition(byte_range, entry->file_info().size(), 404 &range_start, &range_length)) { 405 // If |byte_range| is invalid (e.g. out of bounds), return with an error. 406 // At the same time, we cancel the in-flight downloading operation if 407 // needed and and invalidate weak pointers so that we won't 408 // receive unwanted callbacks. 409 cancel_download_closure_.Run(); 410 weak_ptr_factory_.InvalidateWeakPtrs(); 411 callback.Run( 412 net::ERR_REQUEST_RANGE_NOT_SATISFIABLE, scoped_ptr<ResourceEntry>()); 413 return; 414 } 415 416 if (local_cache_file_path.empty()) { 417 // The file is not cached, and being downloaded. 418 reader_proxy_.reset( 419 new internal::NetworkReaderProxy( 420 range_start, range_length, 421 entry->file_info().size(), cancel_download_closure_)); 422 callback.Run(net::OK, entry.Pass()); 423 return; 424 } 425 426 // Otherwise, open the stream for file. 427 scoped_ptr<util::LocalFileReader> file_reader( 428 new util::LocalFileReader(file_task_runner_.get())); 429 util::LocalFileReader* file_reader_ptr = file_reader.get(); 430 file_reader_ptr->Open( 431 local_cache_file_path, 432 range_start, 433 base::Bind( 434 &DriveFileStreamReader::InitializeAfterLocalFileOpen, 435 weak_ptr_factory_.GetWeakPtr(), 436 range_length, 437 callback, 438 base::Passed(&entry), 439 base::Passed(&file_reader))); 440} 441 442void DriveFileStreamReader::InitializeAfterLocalFileOpen( 443 int64 length, 444 const InitializeCompletionCallback& callback, 445 scoped_ptr<ResourceEntry> entry, 446 scoped_ptr<util::LocalFileReader> file_reader, 447 int open_result) { 448 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 449 450 if (open_result != net::OK) { 451 callback.Run(net::ERR_FAILED, scoped_ptr<ResourceEntry>()); 452 return; 453 } 454 455 reader_proxy_.reset( 456 new internal::LocalReaderProxy(file_reader.Pass(), length)); 457 callback.Run(net::OK, entry.Pass()); 458} 459 460void DriveFileStreamReader::OnGetContent(google_apis::GDataErrorCode error_code, 461 scoped_ptr<std::string> data) { 462 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 463 DCHECK(reader_proxy_); 464 reader_proxy_->OnGetContent(data.Pass()); 465} 466 467void DriveFileStreamReader::OnGetFileContentCompletion( 468 const InitializeCompletionCallback& callback, 469 FileError error) { 470 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); 471 472 if (reader_proxy_) { 473 // If the proxy object available, send the error to it. 474 reader_proxy_->OnCompleted(error); 475 } else { 476 // Here the proxy object is not yet available. 477 // There are two cases. 1) Some error happens during the initialization. 478 // 2) the cache file is found, but the proxy object is not *yet* 479 // initialized because the file is being opened. 480 // We are interested in 1) only. The callback for 2) will be called 481 // after opening the file is completed. 482 // Note: due to the same reason, LocalReaderProxy::OnCompleted may 483 // or may not be called. This is timing issue, and it is difficult to avoid 484 // unfortunately. 485 if (error != FILE_ERROR_OK) { 486 callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>()); 487 } 488 } 489} 490 491} // namespace drive 492