gcs_file_system.cc revision 5c1821be018d4a626efd0a9cee7844aaa8c69366
1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License. 14==============================================================================*/ 15 16#include "tensorflow/core/platform/cloud/gcs_file_system.h" 17#include <stdio.h> 18#include <unistd.h> 19#include <algorithm> 20#include <cstdio> 21#include <cstdlib> 22#include <cstring> 23#include <fstream> 24#include <vector> 25#include "include/json/json.h" 26#include "tensorflow/core/lib/core/errors.h" 27#include "tensorflow/core/lib/gtl/map_util.h" 28#include "tensorflow/core/lib/gtl/stl_util.h" 29#include "tensorflow/core/lib/io/path.h" 30#include "tensorflow/core/lib/strings/numbers.h" 31#include "tensorflow/core/lib/strings/str_util.h" 32#include "tensorflow/core/platform/cloud/google_auth_provider.h" 33#include "tensorflow/core/platform/cloud/time_util.h" 34#include "tensorflow/core/platform/env.h" 35#include "tensorflow/core/platform/mutex.h" 36#include "tensorflow/core/platform/protobuf.h" 37#include "tensorflow/core/platform/thread_annotations.h" 38 39namespace tensorflow { 40 41namespace { 42 43constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/"; 44constexpr char kGcsUploadUriBase[] = 45 "https://www.googleapis.com/upload/storage/v1/"; 46constexpr char kStorageHost[] = "storage.googleapis.com"; 47constexpr size_t kBufferSize = 1024 * 1024; // In bytes. 48constexpr int kGetChildrenDefaultPageSize = 1000; 49// Initial delay before retrying a GCS upload. 50// Subsequent delays can be larger due to exponential back-off. 51constexpr uint64 kUploadRetryDelayMicros = 1000000L; 52// The HTTP response code "308 Resume Incomplete". 53constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308; 54 55Status GetTmpFilename(string* filename) { 56 if (!filename) { 57 return errors::Internal("'filename' cannot be nullptr."); 58 } 59 char buffer[] = "/tmp/gcs_filesystem_XXXXXX"; 60 int fd = mkstemp(buffer); 61 if (fd < 0) { 62 return errors::Internal("Failed to create a temporary file."); 63 } 64 close(fd); 65 *filename = buffer; 66 return Status::OK(); 67} 68 69/// \brief Splits a GCS path to a bucket and an object. 70/// 71/// For example, "gs://bucket-name/path/to/file.txt" gets split into 72/// "bucket-name" and "path/to/file.txt". 73/// If fname only contains the bucket and empty_object_ok = true, the returned 74/// object is empty. 75Status ParseGcsPath(StringPiece fname, bool empty_object_ok, string* bucket, 76 string* object) { 77 if (!bucket || !object) { 78 return errors::Internal("bucket and object cannot be null."); 79 } 80 StringPiece scheme, bucketp, objectp; 81 ParseURI(fname, &scheme, &bucketp, &objectp); 82 if (scheme != "gs") { 83 return errors::InvalidArgument( 84 strings::StrCat("GCS path doesn't start with 'gs://': ", fname)); 85 } 86 *bucket = bucketp.ToString(); 87 if (bucket->empty() || *bucket == ".") { 88 return errors::InvalidArgument( 89 strings::StrCat("GCS path doesn't contain a bucket name: ", fname)); 90 } 91 objectp.Consume("/"); 92 *object = objectp.ToString(); 93 if (!empty_object_ok && object->empty()) { 94 return errors::InvalidArgument( 95 strings::StrCat("GCS path doesn't contain an object name: ", fname)); 96 } 97 return Status::OK(); 98} 99 100/// Appends a trailing slash if the name doesn't already have one. 101string MaybeAppendSlash(const string& name) { 102 if (name.empty()) { 103 return "/"; 104 } 105 if (name.back() != '/') { 106 return strings::StrCat(name, "/"); 107 } 108 return name; 109} 110 111Status ParseJson(StringPiece json, Json::Value* result) { 112 Json::Reader reader; 113 if (!reader.parse(json.ToString(), *result)) { 114 return errors::Internal("Couldn't parse JSON response from GCS."); 115 } 116 return Status::OK(); 117} 118 119/// Reads a JSON value with the given name from a parent JSON value. 120Status GetValue(const Json::Value& parent, const string& name, 121 Json::Value* result) { 122 *result = parent.get(name, Json::Value::null); 123 if (*result == Json::Value::null) { 124 return errors::Internal(strings::StrCat( 125 "The field '", name, "' was expected in the JSON response.")); 126 } 127 return Status::OK(); 128} 129 130/// Reads a string JSON value with the given name from a parent JSON value. 131Status GetStringValue(const Json::Value& parent, const string& name, 132 string* result) { 133 Json::Value result_value; 134 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value)); 135 if (!result_value.isString()) { 136 return errors::Internal( 137 strings::StrCat("The field '", name, 138 "' in the JSON response was expected to be a string.")); 139 } 140 *result = result_value.asString(); 141 return Status::OK(); 142} 143 144/// Reads a long JSON value with the given name from a parent JSON value. 145Status GetInt64Value(const Json::Value& parent, const string& name, 146 int64* result) { 147 Json::Value result_value; 148 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value)); 149 if (result_value.isNumeric()) { 150 *result = result_value.asInt64(); 151 return Status::OK(); 152 } 153 if (result_value.isString() && 154 strings::safe_strto64(result_value.asString().c_str(), result)) { 155 return Status::OK(); 156 } 157 return errors::Internal( 158 strings::StrCat("The field '", name, 159 "' in the JSON response was expected to be a number.")); 160} 161 162/// Reads a boolean JSON value with the given name from a parent JSON value. 163Status GetBoolValue(const Json::Value& parent, const string& name, 164 bool* result) { 165 Json::Value result_value; 166 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value)); 167 if (!result_value.isBool()) { 168 return errors::Internal(strings::StrCat( 169 "The field '", name, 170 "' in the JSON response was expected to be a boolean.")); 171 } 172 *result = result_value.asBool(); 173 return Status::OK(); 174} 175 176/// A GCS-based implementation of a random access file with a read-ahead buffer. 177class GcsRandomAccessFile : public RandomAccessFile { 178 public: 179 GcsRandomAccessFile(const string& bucket, const string& object, 180 AuthProvider* auth_provider, 181 HttpRequest::Factory* http_request_factory, 182 size_t read_ahead_bytes) 183 : bucket_(bucket), 184 object_(object), 185 auth_provider_(auth_provider), 186 http_request_factory_(http_request_factory), 187 read_ahead_bytes_(read_ahead_bytes) {} 188 189 /// The implementation of reads with a read-ahead buffer. Thread-safe. 190 Status Read(uint64 offset, size_t n, StringPiece* result, 191 char* scratch) const override { 192 mutex_lock lock(mu_); 193 const bool range_start_included = offset >= buffer_start_offset_; 194 const bool range_end_included = 195 offset + n <= buffer_start_offset_ + buffer_content_size_; 196 if (range_start_included && (range_end_included || buffer_reached_eof_)) { 197 // The requested range can be filled from the buffer. 198 const size_t offset_in_buffer = 199 std::min<uint64>(offset - buffer_start_offset_, buffer_content_size_); 200 const auto copy_size = 201 std::min(n, buffer_content_size_ - offset_in_buffer); 202 std::memcpy(scratch, buffer_.get() + offset_in_buffer, copy_size); 203 *result = StringPiece(scratch, copy_size); 204 } else { 205 // Update the buffer content based on the new requested range. 206 const size_t desired_buffer_size = n + read_ahead_bytes_; 207 if (n > buffer_size_ || desired_buffer_size > 2 * buffer_size_) { 208 // Re-allocate only if buffer size increased significantly. 209 buffer_.reset(new char[desired_buffer_size]); 210 buffer_size_ = desired_buffer_size; 211 } 212 213 buffer_start_offset_ = offset; 214 buffer_content_size_ = 0; 215 StringPiece buffer_content; 216 TF_RETURN_IF_ERROR( 217 ReadFromGCS(offset, buffer_size_, &buffer_content, buffer_.get())); 218 buffer_content_size_ = buffer_content.size(); 219 buffer_reached_eof_ = buffer_content_size_ < buffer_size_; 220 221 // Set the results. 222 *result = StringPiece(scratch, std::min(buffer_content_size_, n)); 223 std::memcpy(scratch, buffer_.get(), result->size()); 224 } 225 226 if (result->size() < n) { 227 // This is not an error per se. The RandomAccessFile interface expects 228 // that Read returns OutOfRange if fewer bytes were read than requested. 229 return errors::OutOfRange(strings::StrCat("EOF reached, ", result->size(), 230 " bytes were read out of ", n, 231 " bytes requested.")); 232 } 233 return Status::OK(); 234 } 235 236 private: 237 /// A helper function to actually read the data from GCS. 238 Status ReadFromGCS(uint64 offset, size_t n, StringPiece* result, 239 char* scratch) const { 240 string auth_token; 241 TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token)); 242 243 std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); 244 TF_RETURN_IF_ERROR(request->Init()); 245 TF_RETURN_IF_ERROR( 246 request->SetUri(strings::StrCat("https://", bucket_, ".", kStorageHost, 247 "/", request->EscapeString(object_)))); 248 TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); 249 TF_RETURN_IF_ERROR(request->SetRange(offset, offset + n - 1)); 250 TF_RETURN_IF_ERROR(request->SetResultBuffer(scratch, n, result)); 251 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://", 252 bucket_, "/", object_); 253 return Status::OK(); 254 } 255 256 string bucket_; 257 string object_; 258 AuthProvider* auth_provider_; 259 HttpRequest::Factory* http_request_factory_; 260 const size_t read_ahead_bytes_; 261 262 // The buffer-related members need to be mutable, because they are modified 263 // by the const Read() method. 264 mutable mutex mu_; 265 mutable std::unique_ptr<char[]> buffer_ GUARDED_BY(mu_); 266 mutable size_t buffer_size_ GUARDED_BY(mu_) = 0; 267 // The original file offset of the first byte in the buffer. 268 mutable size_t buffer_start_offset_ GUARDED_BY(mu_) = 0; 269 mutable size_t buffer_content_size_ GUARDED_BY(mu_) = 0; 270 mutable bool buffer_reached_eof_ GUARDED_BY(mu_) = false; 271}; 272 273/// \brief GCS-based implementation of a writeable file. 274/// 275/// Since GCS objects are immutable, this implementation writes to a local 276/// tmp file and copies it to GCS on flush/close. 277class GcsWritableFile : public WritableFile { 278 public: 279 GcsWritableFile(const string& bucket, const string& object, 280 AuthProvider* auth_provider, 281 HttpRequest::Factory* http_request_factory, 282 int32 max_upload_attempts) 283 : bucket_(bucket), 284 object_(object), 285 auth_provider_(auth_provider), 286 http_request_factory_(http_request_factory), 287 max_upload_attempts_(max_upload_attempts) { 288 if (GetTmpFilename(&tmp_content_filename_).ok()) { 289 outfile_.open(tmp_content_filename_, 290 std::ofstream::binary | std::ofstream::app); 291 } 292 } 293 294 /// \brief Constructs the writable file in append mode. 295 /// 296 /// tmp_content_filename should contain a path of an existing temporary file 297 /// with the content to be appended. The class takes onwnership of the 298 /// specified tmp file and deletes it on close. 299 GcsWritableFile(const string& bucket, const string& object, 300 AuthProvider* auth_provider, 301 const string& tmp_content_filename, 302 HttpRequest::Factory* http_request_factory, 303 int32 max_upload_attempts) 304 : bucket_(bucket), 305 object_(object), 306 auth_provider_(auth_provider), 307 http_request_factory_(http_request_factory), 308 max_upload_attempts_(max_upload_attempts) { 309 tmp_content_filename_ = tmp_content_filename; 310 outfile_.open(tmp_content_filename_, 311 std::ofstream::binary | std::ofstream::app); 312 } 313 314 ~GcsWritableFile() { Close(); } 315 316 Status Append(const StringPiece& data) override { 317 TF_RETURN_IF_ERROR(CheckWritable()); 318 outfile_ << data; 319 if (!outfile_.good()) { 320 return errors::Internal( 321 "Could not append to the internal temporary file."); 322 } 323 return Status::OK(); 324 } 325 326 Status Close() override { 327 if (outfile_.is_open()) { 328 TF_RETURN_IF_ERROR(Sync()); 329 outfile_.close(); 330 std::remove(tmp_content_filename_.c_str()); 331 } 332 return Status::OK(); 333 } 334 335 Status Flush() override { return Sync(); } 336 337 /// Copies the current version of the file to GCS. 338 /// 339 /// This Sync() uploads the object to GCS. 340 /// In case of a failure, it resumes failed uploads as recommended by the GCS 341 /// resumable API documentation. When the whole upload needs to be 342 /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem. 343 Status Sync() override { 344 TF_RETURN_IF_ERROR(CheckWritable()); 345 outfile_.flush(); 346 if (!outfile_.good()) { 347 return errors::Internal( 348 "Could not write to the internal temporary file."); 349 } 350 string session_uri; 351 TF_RETURN_IF_ERROR(CreateNewUploadSession(&session_uri)); 352 uint64 already_uploaded = 0; 353 for (int attempt = 0; attempt < max_upload_attempts_; attempt++) { 354 if (attempt > 0) { 355 bool completed; 356 TF_RETURN_IF_ERROR(RequestUploadSessionStatus(session_uri, &completed, 357 &already_uploaded)); 358 if (completed) { 359 // It's unclear why UploadToSession didn't return OK in the previous 360 // attempt, but GCS reports that the file is fully uploaded, 361 // so succeed. 362 return Status::OK(); 363 } 364 } 365 const Status upload_status = 366 UploadToSession(session_uri, already_uploaded); 367 if (upload_status.ok()) { 368 return Status::OK(); 369 } 370 switch (upload_status.code()) { 371 case errors::Code::NOT_FOUND: 372 // GCS docs recommend retrying the whole upload. We're relying on the 373 // RetryingFileSystem to retry the Sync() call. 374 return errors::Unavailable( 375 strings::StrCat("Could not upload gs://", bucket_, "/", object_)); 376 case errors::Code::UNAVAILABLE: 377 // The upload can be resumed, but GCS docs recommend an exponential 378 // back-off. 379 Env::Default()->SleepForMicroseconds(kUploadRetryDelayMicros 380 << attempt); 381 break; 382 default: 383 // Something unexpected happen, fail. 384 return upload_status; 385 } 386 } 387 return errors::Aborted( 388 strings::StrCat("Upload gs://", bucket_, "/", object_, " failed.")); 389 } 390 391 private: 392 Status CheckWritable() const { 393 if (!outfile_.is_open()) { 394 return errors::FailedPrecondition( 395 "The internal temporary file is not writable."); 396 } 397 return Status::OK(); 398 } 399 400 Status GetCurrentFileSize(uint64* size) { 401 if (size == nullptr) { 402 return errors::Internal("'size' cannot be nullptr"); 403 } 404 const auto tellp = outfile_.tellp(); 405 if (tellp == -1) { 406 return errors::Internal( 407 "Could not get the size of the internal temporary file."); 408 } 409 *size = tellp; 410 return Status::OK(); 411 } 412 413 /// Initiates a new resumable upload session. 414 Status CreateNewUploadSession(string* session_uri) { 415 if (session_uri == nullptr) { 416 return errors::Internal("'session_uri' cannot be nullptr."); 417 } 418 uint64 file_size; 419 TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size)); 420 421 string auth_token; 422 TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token)); 423 424 std::unique_ptr<char[]> scratch(new char[kBufferSize]); 425 std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); 426 TF_RETURN_IF_ERROR(request->Init()); 427 TF_RETURN_IF_ERROR(request->SetUri(strings::StrCat( 428 kGcsUploadUriBase, "b/", bucket_, "/o?uploadType=resumable&name=", 429 request->EscapeString(object_)))); 430 TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); 431 TF_RETURN_IF_ERROR(request->AddHeader("X-Upload-Content-Length", 432 std::to_string(file_size))); 433 TF_RETURN_IF_ERROR(request->SetPostEmptyBody()); 434 StringPiece response_piece; 435 TF_RETURN_IF_ERROR( 436 request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece)); 437 TF_RETURN_WITH_CONTEXT_IF_ERROR( 438 request->Send(), " when initiating an upload to ", GetGcsPath()); 439 *session_uri = request->GetResponseHeader("Location"); 440 if (session_uri->empty()) { 441 return errors::Internal( 442 strings::StrCat("Unexpected response from GCS when writing to ", 443 GetGcsPath(), ": 'Location' header not returned.")); 444 } 445 return Status::OK(); 446 } 447 448 /// \brief Requests status of a previously initiated upload session. 449 /// 450 /// If the upload has already succeeded, sets 'completed' to true. 451 /// Otherwise sets 'completed' to false and 'uploaded' to the currently 452 /// uploaded size in bytes. 453 Status RequestUploadSessionStatus(const string& session_uri, bool* completed, 454 uint64* uploaded) { 455 if (completed == nullptr || uploaded == nullptr) { 456 return errors::Internal("'completed' and 'uploaded' cannot be nullptr."); 457 } 458 uint64 file_size; 459 TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size)); 460 461 string auth_token; 462 TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token)); 463 464 std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); 465 TF_RETURN_IF_ERROR(request->Init()); 466 TF_RETURN_IF_ERROR(request->SetUri(session_uri)); 467 TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); 468 TF_RETURN_IF_ERROR(request->AddHeader( 469 "Content-Range", strings::StrCat("bytes */", file_size))); 470 TF_RETURN_IF_ERROR(request->SetPutEmptyBody()); 471 const Status& status = request->Send(); 472 if (status.ok()) { 473 *completed = true; 474 return Status::OK(); 475 } 476 *completed = false; 477 if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) { 478 TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ", 479 GetGcsPath()); 480 } 481 const string& received_range = request->GetResponseHeader("Range"); 482 if (received_range.empty()) { 483 // This means GCS doesn't have any bytes of the file yet. 484 *uploaded = 0; 485 } else { 486 StringPiece range_piece(received_range); 487 range_piece.Consume("bytes="); // May or may not be present. 488 std::vector<int32> range_parts; 489 if (!str_util::SplitAndParseAsInts(range_piece, '-', &range_parts) || 490 range_parts.size() != 2) { 491 return errors::Internal(strings::StrCat( 492 "Unexpected response from GCS when writing ", GetGcsPath(), 493 ": Range header '", received_range, "' could not be parsed.")); 494 } 495 if (range_parts[0] != 0) { 496 return errors::Internal( 497 strings::StrCat("Unexpected response from GCS when writing to ", 498 GetGcsPath(), ": the returned range '", 499 received_range, "' does not start at zero.")); 500 } 501 // If GCS returned "Range: 0-10", this means 11 bytes were uploaded. 502 *uploaded = range_parts[1] + 1; 503 } 504 return Status::OK(); 505 } 506 507 Status UploadToSession(const string& session_uri, uint64 start_offset) { 508 uint64 file_size; 509 TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size)); 510 511 string auth_token; 512 TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token)); 513 514 std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); 515 TF_RETURN_IF_ERROR(request->Init()); 516 TF_RETURN_IF_ERROR(request->SetUri(session_uri)); 517 TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); 518 if (file_size > 0) { 519 TF_RETURN_IF_ERROR(request->AddHeader( 520 "Content-Range", strings::StrCat("bytes ", start_offset, "-", 521 file_size - 1, "/", file_size))); 522 } 523 TF_RETURN_IF_ERROR( 524 request->SetPutFromFile(tmp_content_filename_, start_offset)); 525 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ", 526 GetGcsPath()); 527 return Status::OK(); 528 } 529 530 string GetGcsPath() const { 531 return strings::StrCat("gs://", bucket_, "/", object_); 532 } 533 534 string bucket_; 535 string object_; 536 AuthProvider* auth_provider_; 537 string tmp_content_filename_; 538 std::ofstream outfile_; 539 HttpRequest::Factory* http_request_factory_; 540 int32 max_upload_attempts_; 541}; 542 543class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion { 544 public: 545 GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length) 546 : data_(std::move(data)), length_(length) {} 547 const void* data() override { return reinterpret_cast<void*>(data_.get()); } 548 uint64 length() override { return length_; } 549 550 private: 551 std::unique_ptr<char[]> data_; 552 uint64 length_; 553}; 554} // namespace 555 556GcsFileSystem::GcsFileSystem() 557 : auth_provider_(new GoogleAuthProvider()), 558 http_request_factory_(new HttpRequest::Factory()) {} 559 560GcsFileSystem::GcsFileSystem( 561 std::unique_ptr<AuthProvider> auth_provider, 562 std::unique_ptr<HttpRequest::Factory> http_request_factory, 563 size_t read_ahead_bytes, int32 max_upload_attempts) 564 : auth_provider_(std::move(auth_provider)), 565 http_request_factory_(std::move(http_request_factory)), 566 read_ahead_bytes_(read_ahead_bytes), 567 max_upload_attempts_(max_upload_attempts) {} 568 569Status GcsFileSystem::NewRandomAccessFile( 570 const string& fname, std::unique_ptr<RandomAccessFile>* result) { 571 string bucket, object; 572 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); 573 result->reset(new GcsRandomAccessFile(bucket, object, auth_provider_.get(), 574 http_request_factory_.get(), 575 read_ahead_bytes_)); 576 return Status::OK(); 577} 578 579Status GcsFileSystem::NewWritableFile(const string& fname, 580 std::unique_ptr<WritableFile>* result) { 581 string bucket, object; 582 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); 583 result->reset(new GcsWritableFile(bucket, object, auth_provider_.get(), 584 http_request_factory_.get(), 585 max_upload_attempts_)); 586 return Status::OK(); 587} 588 589// Reads the file from GCS in chunks and stores it in a tmp file, 590// which is then passed to GcsWritableFile. 591Status GcsFileSystem::NewAppendableFile(const string& fname, 592 std::unique_ptr<WritableFile>* result) { 593 std::unique_ptr<RandomAccessFile> reader; 594 TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader)); 595 std::unique_ptr<char[]> buffer(new char[kBufferSize]); 596 Status status; 597 uint64 offset = 0; 598 StringPiece read_chunk; 599 600 // Read the file from GCS in chunks and save it to a tmp file. 601 string old_content_filename; 602 TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename)); 603 std::ofstream old_content(old_content_filename, std::ofstream::binary); 604 while (true) { 605 status = reader->Read(offset, kBufferSize, &read_chunk, buffer.get()); 606 if (status.ok()) { 607 old_content << read_chunk; 608 offset += kBufferSize; 609 } else if (status.code() == error::OUT_OF_RANGE) { 610 // Expected, this means we reached EOF. 611 old_content << read_chunk; 612 break; 613 } else { 614 return status; 615 } 616 } 617 old_content.close(); 618 619 // Create a writable file and pass the old content to it. 620 string bucket, object; 621 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); 622 result->reset(new GcsWritableFile( 623 bucket, object, auth_provider_.get(), old_content_filename, 624 http_request_factory_.get(), max_upload_attempts_)); 625 return Status::OK(); 626} 627 628Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile( 629 const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) { 630 uint64 size; 631 TF_RETURN_IF_ERROR(GetFileSize(fname, &size)); 632 std::unique_ptr<char[]> data(new char[size]); 633 634 std::unique_ptr<RandomAccessFile> file; 635 TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file)); 636 637 StringPiece piece; 638 TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get())); 639 640 result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size)); 641 return Status::OK(); 642} 643 644bool GcsFileSystem::FileExists(const string& fname) { 645 string bucket, object; 646 if (!ParseGcsPath(fname, true, &bucket, &object).ok()) { 647 LOG(ERROR) << "Could not parse GCS file name " << fname; 648 return false; 649 } 650 if (object.empty()) { 651 return BucketExists(bucket).ok(); 652 } 653 return ObjectExists(bucket, object).ok() || FolderExists(fname).ok(); 654} 655 656Status GcsFileSystem::ObjectExists(const string& bucket, const string& object) { 657 FileStatistics stat; 658 return StatForObject(bucket, object, &stat); 659} 660 661Status GcsFileSystem::StatForObject(const string& bucket, const string& object, 662 FileStatistics* stat) { 663 if (!stat) { 664 return errors::Internal("'stat' cannot be nullptr."); 665 } 666 if (object.empty()) { 667 return errors::InvalidArgument("'object' must be a non-empty string."); 668 } 669 670 string auth_token; 671 TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token)); 672 673 std::unique_ptr<char[]> scratch(new char[kBufferSize]); 674 StringPiece response_piece; 675 676 std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); 677 TF_RETURN_IF_ERROR(request->Init()); 678 TF_RETURN_IF_ERROR(request->SetUri(strings::StrCat( 679 kGcsUriBase, "b/", bucket, "/o/", request->EscapeString(object), 680 "?fields=size%2Cupdated"))); 681 TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); 682 TF_RETURN_IF_ERROR( 683 request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece)); 684 TF_RETURN_WITH_CONTEXT_IF_ERROR( 685 request->Send(), " when reading metadata of gs://", bucket, "/", object); 686 687 Json::Value root; 688 TF_RETURN_IF_ERROR(ParseJson(response_piece, &root)); 689 690 // Parse file size. 691 TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &(stat->length))); 692 693 // Parse file modification time. 694 string updated; 695 TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated)); 696 TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->mtime_nsec))); 697 698 stat->is_directory = false; 699 700 return Status::OK(); 701} 702 703Status GcsFileSystem::BucketExists(const string& bucket) { 704 string auth_token; 705 TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token)); 706 707 std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); 708 TF_RETURN_IF_ERROR(request->Init()); 709 request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket)); 710 request->AddAuthBearerHeader(auth_token); 711 return request->Send(); 712} 713 714Status GcsFileSystem::FolderExists(const string& dirname) { 715 std::vector<string> children; 716 TF_RETURN_IF_ERROR(GetChildrenBounded(dirname, 1, &children, true)); 717 if (children.empty()) { 718 return errors::NotFound("Folder does not exist."); 719 } 720 return Status::OK(); 721} 722 723Status GcsFileSystem::GetChildren(const string& dirname, 724 std::vector<string>* result) { 725 return GetChildrenBounded(dirname, UINT64_MAX, result, false); 726} 727 728Status GcsFileSystem::GetMatchingPaths(const string& pattern, 729 std::vector<string>* results) { 730 results->clear(); 731 // Find the fixed prefix by looking for the first wildcard. 732 const string& fixed_prefix = 733 pattern.substr(0, pattern.find_first_of("*?[\\")); 734 const string& dir = io::Dirname(fixed_prefix).ToString(); 735 if (dir.empty()) { 736 return errors::InvalidArgument( 737 strings::StrCat("A GCS pattern doesn't have a bucket name: ", pattern)); 738 } 739 std::vector<string> all_files; 740 TF_RETURN_IF_ERROR(GetChildrenBounded(dir, UINT64_MAX, &all_files, true)); 741 742 // Match all obtained files to the input pattern. 743 for (const auto& f : all_files) { 744 const string& full_path = io::JoinPath(dir, f); 745 if (Env::Default()->MatchPath(full_path, pattern)) { 746 results->push_back(full_path); 747 } 748 } 749 return Status::OK(); 750} 751 752Status GcsFileSystem::GetChildrenBounded(const string& dirname, 753 uint64 max_results, 754 std::vector<string>* result, 755 bool recursive) { 756 if (!result) { 757 return errors::InvalidArgument("'result' cannot be null"); 758 } 759 string bucket, object_prefix; 760 TF_RETURN_IF_ERROR( 761 ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix)); 762 763 string nextPageToken; 764 uint64 retrieved_results = 0; 765 while (true) { // A loop over multiple result pages. 766 string auth_token; 767 TF_RETURN_IF_ERROR( 768 AuthProvider::GetToken(auth_provider_.get(), &auth_token)); 769 770 std::unique_ptr<char[]> scratch(new char[kBufferSize]); 771 StringPiece response_piece; 772 std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); 773 TF_RETURN_IF_ERROR(request->Init()); 774 auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o"); 775 if (recursive) { 776 uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken"); 777 } else { 778 // Set "/" as a delimiter to ask GCS to treat subfolders as children 779 // and return them in "prefixes". 780 uri = strings::StrCat(uri, 781 "?fields=items%2Fname%2Cprefixes%2CnextPageToken"); 782 uri = strings::StrCat(uri, "&delimiter=%2F"); 783 } 784 if (!object_prefix.empty()) { 785 uri = strings::StrCat(uri, "&prefix=", 786 request->EscapeString(object_prefix)); 787 } 788 if (!nextPageToken.empty()) { 789 uri = strings::StrCat(uri, "&pageToken=", 790 request->EscapeString(nextPageToken)); 791 } 792 if (max_results - retrieved_results < kGetChildrenDefaultPageSize) { 793 uri = 794 strings::StrCat(uri, "&maxResults=", max_results - retrieved_results); 795 } 796 TF_RETURN_IF_ERROR(request->SetUri(uri)); 797 TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); 798 TF_RETURN_IF_ERROR( 799 request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece)); 800 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname); 801 Json::Value root; 802 TF_RETURN_IF_ERROR(ParseJson(response_piece, &root)); 803 const auto items = root.get("items", Json::Value::null); 804 if (items == Json::Value::null) { 805 // Empty results. 806 return Status::OK(); 807 } 808 if (!items.isArray()) { 809 return errors::Internal("Expected an array 'items' in the GCS response."); 810 } 811 for (size_t i = 0; i < items.size(); i++) { 812 const auto item = items.get(i, Json::Value::null); 813 if (!item.isObject()) { 814 return errors::Internal( 815 "Unexpected JSON format: 'items' should be a list of objects."); 816 } 817 string name; 818 TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name)); 819 // The names should be relative to the 'dirname'. That means the 820 // 'object_prefix', which is part of 'dirname', should be removed from the 821 // beginning of 'name'. 822 StringPiece relative_path(name); 823 if (!relative_path.Consume(object_prefix)) { 824 return errors::Internal( 825 strings::StrCat("Unexpected response: the returned file name ", 826 name, " doesn't match the prefix ", object_prefix)); 827 } 828 result->emplace_back(relative_path.ToString()); 829 if (++retrieved_results >= max_results) { 830 return Status::OK(); 831 } 832 } 833 const auto prefixes = root.get("prefixes", Json::Value::null); 834 if (prefixes != Json::Value::null) { 835 // Subfolders are returned for the non-recursive mode. 836 if (!prefixes.isArray()) { 837 return errors::Internal( 838 "'prefixes' was expected to be an array in the GCS response."); 839 } 840 for (size_t i = 0; i < prefixes.size(); i++) { 841 const auto prefix = prefixes.get(i, Json::Value::null); 842 if (prefix == Json::Value::null || !prefix.isString()) { 843 return errors::Internal( 844 "'prefixes' was expected to be an array of strings in the GCS " 845 "response."); 846 } 847 const string& prefix_str = prefix.asString(); 848 StringPiece relative_path(prefix_str); 849 if (!relative_path.Consume(object_prefix)) { 850 return errors::Internal(strings::StrCat( 851 "Unexpected response: the returned folder name ", prefix_str, 852 " doesn't match the prefix ", object_prefix)); 853 } 854 result->emplace_back(relative_path.ToString()); 855 if (++retrieved_results >= max_results) { 856 return Status::OK(); 857 } 858 } 859 } 860 const auto token = root.get("nextPageToken", Json::Value::null); 861 if (token == Json::Value::null) { 862 return Status::OK(); 863 } 864 if (!token.isString()) { 865 return errors::Internal( 866 "Unexpected response: nextPageToken is not a string"); 867 } 868 nextPageToken = token.asString(); 869 } 870} 871 872Status GcsFileSystem::Stat(const string& fname, FileStatistics* stat) { 873 if (!stat) { 874 return errors::Internal("'stat' cannot be nullptr."); 875 } 876 string bucket, object; 877 TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object)); 878 if (StatForObject(bucket, object, stat).ok()) { 879 return Status::OK(); 880 } 881 if ((object.empty() && BucketExists(bucket).ok()) || 882 (!object.empty() && FolderExists(fname).ok())) { 883 stat->length = 0; 884 stat->mtime_nsec = 0; 885 stat->is_directory = true; 886 return Status::OK(); 887 } 888 return errors::NotFound( 889 strings::StrCat("The specified path ", fname, " was not found.")); 890} 891 892Status GcsFileSystem::DeleteFile(const string& fname) { 893 string bucket, object; 894 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); 895 896 string auth_token; 897 TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token)); 898 899 std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); 900 TF_RETURN_IF_ERROR(request->Init()); 901 TF_RETURN_IF_ERROR(request->SetUri(strings::StrCat( 902 kGcsUriBase, "b/", bucket, "/o/", request->EscapeString(object)))); 903 TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); 904 TF_RETURN_IF_ERROR(request->SetDeleteRequest()); 905 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname); 906 return Status::OK(); 907} 908 909Status GcsFileSystem::CreateDir(const string& dirname) { 910 string bucket, object; 911 TF_RETURN_IF_ERROR(ParseGcsPath(dirname, true, &bucket, &object)); 912 if (object.empty()) { 913 if (BucketExists(bucket).ok()) { 914 return Status::OK(); 915 } 916 return errors::NotFound( 917 strings::StrCat("The specified bucket ", dirname, " was not found.")); 918 } 919 // Create a zero-length directory marker object. 920 std::unique_ptr<WritableFile> file; 921 TF_RETURN_IF_ERROR(NewWritableFile(MaybeAppendSlash(dirname), &file)); 922 TF_RETURN_IF_ERROR(file->Close()); 923 return Status::OK(); 924} 925 926// Checks that the directory is empty (i.e no objects with this prefix exist). 927// If it is, does nothing, because directories are not entities in GCS. 928Status GcsFileSystem::DeleteDir(const string& dirname) { 929 std::vector<string> children; 930 // A directory is considered empty either if there are no matching objects 931 // with the corresponding name prefix or if there is exactly one matching 932 // object and it is the directory marker. Therefore we need to retrieve 933 // at most two children for the prefix to detect if a directory is empty. 934 TF_RETURN_IF_ERROR(GetChildrenBounded(dirname, 2, &children, true)); 935 936 if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) { 937 return errors::FailedPrecondition("Cannot delete a non-empty directory."); 938 } 939 if (children.size() == 1 && children[0].empty()) { 940 // This is the directory marker object. Delete it. 941 return DeleteFile(MaybeAppendSlash(dirname)); 942 } 943 return Status::OK(); 944} 945 946Status GcsFileSystem::GetFileSize(const string& fname, uint64* file_size) { 947 if (!file_size) { 948 return errors::Internal("'file_size' cannot be nullptr."); 949 } 950 951 // Only validate the name. 952 string bucket, object; 953 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); 954 955 FileStatistics stat; 956 TF_RETURN_IF_ERROR(Stat(fname, &stat)); 957 *file_size = stat.length; 958 return Status::OK(); 959} 960 961Status GcsFileSystem::RenameFile(const string& src, const string& target) { 962 if (!IsDirectory(src).ok()) { 963 return RenameObject(src, target); 964 } 965 // Rename all individual objects in the directory one by one. 966 std::vector<string> children; 967 TF_RETURN_IF_ERROR(GetChildrenBounded(src, UINT64_MAX, &children, true)); 968 for (const string& subpath : children) { 969 // io::JoinPath() wouldn't work here, because we want an empty subpath 970 // to result in an appended slash in order for directory markers 971 // to be processed correctly: "gs://a/b" + "" should give "gs:/a/b/". 972 TF_RETURN_IF_ERROR( 973 RenameObject(strings::StrCat(MaybeAppendSlash(src), subpath), 974 strings::StrCat(MaybeAppendSlash(target), subpath))); 975 } 976 return Status::OK(); 977} 978 979// Uses a GCS API command to copy the object and then deletes the old one. 980Status GcsFileSystem::RenameObject(const string& src, const string& target) { 981 string src_bucket, src_object, target_bucket, target_object; 982 TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object)); 983 TF_RETURN_IF_ERROR( 984 ParseGcsPath(target, false, &target_bucket, &target_object)); 985 986 string auth_token; 987 TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token)); 988 989 std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); 990 TF_RETURN_IF_ERROR(request->Init()); 991 TF_RETURN_IF_ERROR(request->SetUri(strings::StrCat( 992 kGcsUriBase, "b/", src_bucket, "/o/", request->EscapeString(src_object), 993 "/rewriteTo/b/", target_bucket, "/o/", 994 request->EscapeString(target_object)))); 995 TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); 996 TF_RETURN_IF_ERROR(request->SetPostEmptyBody()); 997 std::unique_ptr<char[]> scratch(new char[kBufferSize]); 998 StringPiece response_piece; 999 TF_RETURN_IF_ERROR( 1000 request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece)); 1001 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src, 1002 " to ", target); 1003 1004 Json::Value root; 1005 TF_RETURN_IF_ERROR(ParseJson(response_piece, &root)); 1006 bool done; 1007 TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done)); 1008 if (!done) { 1009 // If GCS didn't complete rewrite in one call, this means that a large file 1010 // is being copied to a bucket with a different storage class or location, 1011 // which requires multiple rewrite calls. 1012 // TODO(surkov): implement multi-step rewrites. 1013 return errors::Unimplemented( 1014 strings::StrCat("Couldn't rename ", src, " to ", target, 1015 ": moving large files between buckets with different " 1016 "locations or storage classes is not supported.")); 1017 } 1018 1019 TF_RETURN_IF_ERROR(DeleteFile(src)); 1020 return Status::OK(); 1021} 1022 1023Status GcsFileSystem::IsDirectory(const string& fname) { 1024 string bucket, object; 1025 TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object)); 1026 if (object.empty()) { 1027 if (BucketExists(bucket).ok()) { 1028 return Status::OK(); 1029 } 1030 return errors::NotFound(strings::StrCat("The specified bucket gs://", 1031 bucket, " was not found.")); 1032 } 1033 if (FolderExists(fname).ok()) { 1034 return Status::OK(); 1035 } 1036 if (ObjectExists(bucket, object).ok()) { 1037 return errors::FailedPrecondition( 1038 strings::StrCat("The specified path ", fname, " is not a directory.")); 1039 } 1040 return errors::NotFound( 1041 strings::StrCat("The specified path ", fname, " was not found.")); 1042} 1043 1044REGISTER_FILE_SYSTEM("gs", RetryingGcsFileSystem); 1045 1046} // namespace tensorflow 1047