gcs_file_system.cc revision 5c1821be018d4a626efd0a9cee7844aaa8c69366
1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include "tensorflow/core/platform/cloud/gcs_file_system.h"
17#include <stdio.h>
18#include <unistd.h>
19#include <algorithm>
20#include <cstdio>
21#include <cstdlib>
22#include <cstring>
23#include <fstream>
24#include <vector>
25#include "include/json/json.h"
26#include "tensorflow/core/lib/core/errors.h"
27#include "tensorflow/core/lib/gtl/map_util.h"
28#include "tensorflow/core/lib/gtl/stl_util.h"
29#include "tensorflow/core/lib/io/path.h"
30#include "tensorflow/core/lib/strings/numbers.h"
31#include "tensorflow/core/lib/strings/str_util.h"
32#include "tensorflow/core/platform/cloud/google_auth_provider.h"
33#include "tensorflow/core/platform/cloud/time_util.h"
34#include "tensorflow/core/platform/env.h"
35#include "tensorflow/core/platform/mutex.h"
36#include "tensorflow/core/platform/protobuf.h"
37#include "tensorflow/core/platform/thread_annotations.h"
38
39namespace tensorflow {
40
41namespace {
42
43constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
44constexpr char kGcsUploadUriBase[] =
45    "https://www.googleapis.com/upload/storage/v1/";
46constexpr char kStorageHost[] = "storage.googleapis.com";
47constexpr size_t kBufferSize = 1024 * 1024;  // In bytes.
48constexpr int kGetChildrenDefaultPageSize = 1000;
49// Initial delay before retrying a GCS upload.
50// Subsequent delays can be larger due to exponential back-off.
51constexpr uint64 kUploadRetryDelayMicros = 1000000L;
52// The HTTP response code "308 Resume Incomplete".
53constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
54
55Status GetTmpFilename(string* filename) {
56  if (!filename) {
57    return errors::Internal("'filename' cannot be nullptr.");
58  }
59  char buffer[] = "/tmp/gcs_filesystem_XXXXXX";
60  int fd = mkstemp(buffer);
61  if (fd < 0) {
62    return errors::Internal("Failed to create a temporary file.");
63  }
64  close(fd);
65  *filename = buffer;
66  return Status::OK();
67}
68
69/// \brief Splits a GCS path to a bucket and an object.
70///
71/// For example, "gs://bucket-name/path/to/file.txt" gets split into
72/// "bucket-name" and "path/to/file.txt".
73/// If fname only contains the bucket and empty_object_ok = true, the returned
74/// object is empty.
75Status ParseGcsPath(StringPiece fname, bool empty_object_ok, string* bucket,
76                    string* object) {
77  if (!bucket || !object) {
78    return errors::Internal("bucket and object cannot be null.");
79  }
80  StringPiece scheme, bucketp, objectp;
81  ParseURI(fname, &scheme, &bucketp, &objectp);
82  if (scheme != "gs") {
83    return errors::InvalidArgument(
84        strings::StrCat("GCS path doesn't start with 'gs://': ", fname));
85  }
86  *bucket = bucketp.ToString();
87  if (bucket->empty() || *bucket == ".") {
88    return errors::InvalidArgument(
89        strings::StrCat("GCS path doesn't contain a bucket name: ", fname));
90  }
91  objectp.Consume("/");
92  *object = objectp.ToString();
93  if (!empty_object_ok && object->empty()) {
94    return errors::InvalidArgument(
95        strings::StrCat("GCS path doesn't contain an object name: ", fname));
96  }
97  return Status::OK();
98}
99
/// Returns `name` with a trailing '/' appended unless it already ends with
/// one. An empty name becomes "/".
string MaybeAppendSlash(const string& name) {
  const bool has_trailing_slash = !name.empty() && name.back() == '/';
  return has_trailing_slash ? name : name + "/";
}
110
111Status ParseJson(StringPiece json, Json::Value* result) {
112  Json::Reader reader;
113  if (!reader.parse(json.ToString(), *result)) {
114    return errors::Internal("Couldn't parse JSON response from GCS.");
115  }
116  return Status::OK();
117}
118
119/// Reads a JSON value with the given name from a parent JSON value.
120Status GetValue(const Json::Value& parent, const string& name,
121                Json::Value* result) {
122  *result = parent.get(name, Json::Value::null);
123  if (*result == Json::Value::null) {
124    return errors::Internal(strings::StrCat(
125        "The field '", name, "' was expected in the JSON response."));
126  }
127  return Status::OK();
128}
129
130/// Reads a string JSON value with the given name from a parent JSON value.
131Status GetStringValue(const Json::Value& parent, const string& name,
132                      string* result) {
133  Json::Value result_value;
134  TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
135  if (!result_value.isString()) {
136    return errors::Internal(
137        strings::StrCat("The field '", name,
138                        "' in the JSON response was expected to be a string."));
139  }
140  *result = result_value.asString();
141  return Status::OK();
142}
143
144/// Reads a long JSON value with the given name from a parent JSON value.
145Status GetInt64Value(const Json::Value& parent, const string& name,
146                     int64* result) {
147  Json::Value result_value;
148  TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
149  if (result_value.isNumeric()) {
150    *result = result_value.asInt64();
151    return Status::OK();
152  }
153  if (result_value.isString() &&
154      strings::safe_strto64(result_value.asString().c_str(), result)) {
155    return Status::OK();
156  }
157  return errors::Internal(
158      strings::StrCat("The field '", name,
159                      "' in the JSON response was expected to be a number."));
160}
161
162/// Reads a boolean JSON value with the given name from a parent JSON value.
163Status GetBoolValue(const Json::Value& parent, const string& name,
164                    bool* result) {
165  Json::Value result_value;
166  TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
167  if (!result_value.isBool()) {
168    return errors::Internal(strings::StrCat(
169        "The field '", name,
170        "' in the JSON response was expected to be a boolean."));
171  }
172  *result = result_value.asBool();
173  return Status::OK();
174}
175
176/// A GCS-based implementation of a random access file with a read-ahead buffer.
177class GcsRandomAccessFile : public RandomAccessFile {
178 public:
179  GcsRandomAccessFile(const string& bucket, const string& object,
180                      AuthProvider* auth_provider,
181                      HttpRequest::Factory* http_request_factory,
182                      size_t read_ahead_bytes)
183      : bucket_(bucket),
184        object_(object),
185        auth_provider_(auth_provider),
186        http_request_factory_(http_request_factory),
187        read_ahead_bytes_(read_ahead_bytes) {}
188
189  /// The implementation of reads with a read-ahead buffer. Thread-safe.
190  Status Read(uint64 offset, size_t n, StringPiece* result,
191              char* scratch) const override {
192    mutex_lock lock(mu_);
193    const bool range_start_included = offset >= buffer_start_offset_;
194    const bool range_end_included =
195        offset + n <= buffer_start_offset_ + buffer_content_size_;
196    if (range_start_included && (range_end_included || buffer_reached_eof_)) {
197      // The requested range can be filled from the buffer.
198      const size_t offset_in_buffer =
199          std::min<uint64>(offset - buffer_start_offset_, buffer_content_size_);
200      const auto copy_size =
201          std::min(n, buffer_content_size_ - offset_in_buffer);
202      std::memcpy(scratch, buffer_.get() + offset_in_buffer, copy_size);
203      *result = StringPiece(scratch, copy_size);
204    } else {
205      // Update the buffer content based on the new requested range.
206      const size_t desired_buffer_size = n + read_ahead_bytes_;
207      if (n > buffer_size_ || desired_buffer_size > 2 * buffer_size_) {
208        // Re-allocate only if buffer size increased significantly.
209        buffer_.reset(new char[desired_buffer_size]);
210        buffer_size_ = desired_buffer_size;
211      }
212
213      buffer_start_offset_ = offset;
214      buffer_content_size_ = 0;
215      StringPiece buffer_content;
216      TF_RETURN_IF_ERROR(
217          ReadFromGCS(offset, buffer_size_, &buffer_content, buffer_.get()));
218      buffer_content_size_ = buffer_content.size();
219      buffer_reached_eof_ = buffer_content_size_ < buffer_size_;
220
221      // Set the results.
222      *result = StringPiece(scratch, std::min(buffer_content_size_, n));
223      std::memcpy(scratch, buffer_.get(), result->size());
224    }
225
226    if (result->size() < n) {
227      // This is not an error per se. The RandomAccessFile interface expects
228      // that Read returns OutOfRange if fewer bytes were read than requested.
229      return errors::OutOfRange(strings::StrCat("EOF reached, ", result->size(),
230                                                " bytes were read out of ", n,
231                                                " bytes requested."));
232    }
233    return Status::OK();
234  }
235
236 private:
237  /// A helper function to actually read the data from GCS.
238  Status ReadFromGCS(uint64 offset, size_t n, StringPiece* result,
239                     char* scratch) const {
240    string auth_token;
241    TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token));
242
243    std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
244    TF_RETURN_IF_ERROR(request->Init());
245    TF_RETURN_IF_ERROR(
246        request->SetUri(strings::StrCat("https://", bucket_, ".", kStorageHost,
247                                        "/", request->EscapeString(object_))));
248    TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
249    TF_RETURN_IF_ERROR(request->SetRange(offset, offset + n - 1));
250    TF_RETURN_IF_ERROR(request->SetResultBuffer(scratch, n, result));
251    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
252                                    bucket_, "/", object_);
253    return Status::OK();
254  }
255
256  string bucket_;
257  string object_;
258  AuthProvider* auth_provider_;
259  HttpRequest::Factory* http_request_factory_;
260  const size_t read_ahead_bytes_;
261
262  // The buffer-related members need to be mutable, because they are modified
263  // by the const Read() method.
264  mutable mutex mu_;
265  mutable std::unique_ptr<char[]> buffer_ GUARDED_BY(mu_);
266  mutable size_t buffer_size_ GUARDED_BY(mu_) = 0;
267  // The original file offset of the first byte in the buffer.
268  mutable size_t buffer_start_offset_ GUARDED_BY(mu_) = 0;
269  mutable size_t buffer_content_size_ GUARDED_BY(mu_) = 0;
270  mutable bool buffer_reached_eof_ GUARDED_BY(mu_) = false;
271};
272
273/// \brief GCS-based implementation of a writeable file.
274///
275/// Since GCS objects are immutable, this implementation writes to a local
276/// tmp file and copies it to GCS on flush/close.
277class GcsWritableFile : public WritableFile {
278 public:
279  GcsWritableFile(const string& bucket, const string& object,
280                  AuthProvider* auth_provider,
281                  HttpRequest::Factory* http_request_factory,
282                  int32 max_upload_attempts)
283      : bucket_(bucket),
284        object_(object),
285        auth_provider_(auth_provider),
286        http_request_factory_(http_request_factory),
287        max_upload_attempts_(max_upload_attempts) {
288    if (GetTmpFilename(&tmp_content_filename_).ok()) {
289      outfile_.open(tmp_content_filename_,
290                    std::ofstream::binary | std::ofstream::app);
291    }
292  }
293
294  /// \brief Constructs the writable file in append mode.
295  ///
296  /// tmp_content_filename should contain a path of an existing temporary file
297  /// with the content to be appended. The class takes onwnership of the
298  /// specified tmp file and deletes it on close.
299  GcsWritableFile(const string& bucket, const string& object,
300                  AuthProvider* auth_provider,
301                  const string& tmp_content_filename,
302                  HttpRequest::Factory* http_request_factory,
303                  int32 max_upload_attempts)
304      : bucket_(bucket),
305        object_(object),
306        auth_provider_(auth_provider),
307        http_request_factory_(http_request_factory),
308        max_upload_attempts_(max_upload_attempts) {
309    tmp_content_filename_ = tmp_content_filename;
310    outfile_.open(tmp_content_filename_,
311                  std::ofstream::binary | std::ofstream::app);
312  }
313
314  ~GcsWritableFile() { Close(); }
315
316  Status Append(const StringPiece& data) override {
317    TF_RETURN_IF_ERROR(CheckWritable());
318    outfile_ << data;
319    if (!outfile_.good()) {
320      return errors::Internal(
321          "Could not append to the internal temporary file.");
322    }
323    return Status::OK();
324  }
325
326  Status Close() override {
327    if (outfile_.is_open()) {
328      TF_RETURN_IF_ERROR(Sync());
329      outfile_.close();
330      std::remove(tmp_content_filename_.c_str());
331    }
332    return Status::OK();
333  }
334
335  Status Flush() override { return Sync(); }
336
337  /// Copies the current version of the file to GCS.
338  ///
339  /// This Sync() uploads the object to GCS.
340  /// In case of a failure, it resumes failed uploads as recommended by the GCS
341  /// resumable API documentation. When the whole upload needs to be
342  /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
343  Status Sync() override {
344    TF_RETURN_IF_ERROR(CheckWritable());
345    outfile_.flush();
346    if (!outfile_.good()) {
347      return errors::Internal(
348          "Could not write to the internal temporary file.");
349    }
350    string session_uri;
351    TF_RETURN_IF_ERROR(CreateNewUploadSession(&session_uri));
352    uint64 already_uploaded = 0;
353    for (int attempt = 0; attempt < max_upload_attempts_; attempt++) {
354      if (attempt > 0) {
355        bool completed;
356        TF_RETURN_IF_ERROR(RequestUploadSessionStatus(session_uri, &completed,
357                                                      &already_uploaded));
358        if (completed) {
359          // It's unclear why UploadToSession didn't return OK in the previous
360          // attempt, but GCS reports that the file is fully uploaded,
361          // so succeed.
362          return Status::OK();
363        }
364      }
365      const Status upload_status =
366          UploadToSession(session_uri, already_uploaded);
367      if (upload_status.ok()) {
368        return Status::OK();
369      }
370      switch (upload_status.code()) {
371        case errors::Code::NOT_FOUND:
372          // GCS docs recommend retrying the whole upload. We're relying on the
373          // RetryingFileSystem to retry the Sync() call.
374          return errors::Unavailable(
375              strings::StrCat("Could not upload gs://", bucket_, "/", object_));
376        case errors::Code::UNAVAILABLE:
377          // The upload can be resumed, but GCS docs recommend an exponential
378          // back-off.
379          Env::Default()->SleepForMicroseconds(kUploadRetryDelayMicros
380                                               << attempt);
381          break;
382        default:
383          // Something unexpected happen, fail.
384          return upload_status;
385      }
386    }
387    return errors::Aborted(
388        strings::StrCat("Upload gs://", bucket_, "/", object_, " failed."));
389  }
390
391 private:
392  Status CheckWritable() const {
393    if (!outfile_.is_open()) {
394      return errors::FailedPrecondition(
395          "The internal temporary file is not writable.");
396    }
397    return Status::OK();
398  }
399
400  Status GetCurrentFileSize(uint64* size) {
401    if (size == nullptr) {
402      return errors::Internal("'size' cannot be nullptr");
403    }
404    const auto tellp = outfile_.tellp();
405    if (tellp == -1) {
406      return errors::Internal(
407          "Could not get the size of the internal temporary file.");
408    }
409    *size = tellp;
410    return Status::OK();
411  }
412
413  /// Initiates a new resumable upload session.
414  Status CreateNewUploadSession(string* session_uri) {
415    if (session_uri == nullptr) {
416      return errors::Internal("'session_uri' cannot be nullptr.");
417    }
418    uint64 file_size;
419    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
420
421    string auth_token;
422    TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token));
423
424    std::unique_ptr<char[]> scratch(new char[kBufferSize]);
425    std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
426    TF_RETURN_IF_ERROR(request->Init());
427    TF_RETURN_IF_ERROR(request->SetUri(strings::StrCat(
428        kGcsUploadUriBase, "b/", bucket_, "/o?uploadType=resumable&name=",
429        request->EscapeString(object_))));
430    TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
431    TF_RETURN_IF_ERROR(request->AddHeader("X-Upload-Content-Length",
432                                          std::to_string(file_size)));
433    TF_RETURN_IF_ERROR(request->SetPostEmptyBody());
434    StringPiece response_piece;
435    TF_RETURN_IF_ERROR(
436        request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece));
437    TF_RETURN_WITH_CONTEXT_IF_ERROR(
438        request->Send(), " when initiating an upload to ", GetGcsPath());
439    *session_uri = request->GetResponseHeader("Location");
440    if (session_uri->empty()) {
441      return errors::Internal(
442          strings::StrCat("Unexpected response from GCS when writing to ",
443                          GetGcsPath(), ": 'Location' header not returned."));
444    }
445    return Status::OK();
446  }
447
448  /// \brief Requests status of a previously initiated upload session.
449  ///
450  /// If the upload has already succeeded, sets 'completed' to true.
451  /// Otherwise sets 'completed' to false and 'uploaded' to the currently
452  /// uploaded size in bytes.
453  Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
454                                    uint64* uploaded) {
455    if (completed == nullptr || uploaded == nullptr) {
456      return errors::Internal("'completed' and 'uploaded' cannot be nullptr.");
457    }
458    uint64 file_size;
459    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
460
461    string auth_token;
462    TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token));
463
464    std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
465    TF_RETURN_IF_ERROR(request->Init());
466    TF_RETURN_IF_ERROR(request->SetUri(session_uri));
467    TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
468    TF_RETURN_IF_ERROR(request->AddHeader(
469        "Content-Range", strings::StrCat("bytes */", file_size)));
470    TF_RETURN_IF_ERROR(request->SetPutEmptyBody());
471    const Status& status = request->Send();
472    if (status.ok()) {
473      *completed = true;
474      return Status::OK();
475    }
476    *completed = false;
477    if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
478      TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ",
479                                      GetGcsPath());
480    }
481    const string& received_range = request->GetResponseHeader("Range");
482    if (received_range.empty()) {
483      // This means GCS doesn't have any bytes of the file yet.
484      *uploaded = 0;
485    } else {
486      StringPiece range_piece(received_range);
487      range_piece.Consume("bytes=");  // May or may not be present.
488      std::vector<int32> range_parts;
489      if (!str_util::SplitAndParseAsInts(range_piece, '-', &range_parts) ||
490          range_parts.size() != 2) {
491        return errors::Internal(strings::StrCat(
492            "Unexpected response from GCS when writing ", GetGcsPath(),
493            ": Range header '", received_range, "' could not be parsed."));
494      }
495      if (range_parts[0] != 0) {
496        return errors::Internal(
497            strings::StrCat("Unexpected response from GCS when writing to ",
498                            GetGcsPath(), ": the returned range '",
499                            received_range, "' does not start at zero."));
500      }
501      // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
502      *uploaded = range_parts[1] + 1;
503    }
504    return Status::OK();
505  }
506
507  Status UploadToSession(const string& session_uri, uint64 start_offset) {
508    uint64 file_size;
509    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
510
511    string auth_token;
512    TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token));
513
514    std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
515    TF_RETURN_IF_ERROR(request->Init());
516    TF_RETURN_IF_ERROR(request->SetUri(session_uri));
517    TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
518    if (file_size > 0) {
519      TF_RETURN_IF_ERROR(request->AddHeader(
520          "Content-Range", strings::StrCat("bytes ", start_offset, "-",
521                                           file_size - 1, "/", file_size)));
522    }
523    TF_RETURN_IF_ERROR(
524        request->SetPutFromFile(tmp_content_filename_, start_offset));
525    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
526                                    GetGcsPath());
527    return Status::OK();
528  }
529
530  string GetGcsPath() const {
531    return strings::StrCat("gs://", bucket_, "/", object_);
532  }
533
534  string bucket_;
535  string object_;
536  AuthProvider* auth_provider_;
537  string tmp_content_filename_;
538  std::ofstream outfile_;
539  HttpRequest::Factory* http_request_factory_;
540  int32 max_upload_attempts_;
541};
542
543class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
544 public:
545  GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
546      : data_(std::move(data)), length_(length) {}
547  const void* data() override { return reinterpret_cast<void*>(data_.get()); }
548  uint64 length() override { return length_; }
549
550 private:
551  std::unique_ptr<char[]> data_;
552  uint64 length_;
553};
554}  // namespace
555
556GcsFileSystem::GcsFileSystem()
557    : auth_provider_(new GoogleAuthProvider()),
558      http_request_factory_(new HttpRequest::Factory()) {}
559
560GcsFileSystem::GcsFileSystem(
561    std::unique_ptr<AuthProvider> auth_provider,
562    std::unique_ptr<HttpRequest::Factory> http_request_factory,
563    size_t read_ahead_bytes, int32 max_upload_attempts)
564    : auth_provider_(std::move(auth_provider)),
565      http_request_factory_(std::move(http_request_factory)),
566      read_ahead_bytes_(read_ahead_bytes),
567      max_upload_attempts_(max_upload_attempts) {}
568
569Status GcsFileSystem::NewRandomAccessFile(
570    const string& fname, std::unique_ptr<RandomAccessFile>* result) {
571  string bucket, object;
572  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
573  result->reset(new GcsRandomAccessFile(bucket, object, auth_provider_.get(),
574                                        http_request_factory_.get(),
575                                        read_ahead_bytes_));
576  return Status::OK();
577}
578
579Status GcsFileSystem::NewWritableFile(const string& fname,
580                                      std::unique_ptr<WritableFile>* result) {
581  string bucket, object;
582  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
583  result->reset(new GcsWritableFile(bucket, object, auth_provider_.get(),
584                                    http_request_factory_.get(),
585                                    max_upload_attempts_));
586  return Status::OK();
587}
588
589// Reads the file from GCS in chunks and stores it in a tmp file,
590// which is then passed to GcsWritableFile.
591Status GcsFileSystem::NewAppendableFile(const string& fname,
592                                        std::unique_ptr<WritableFile>* result) {
593  std::unique_ptr<RandomAccessFile> reader;
594  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader));
595  std::unique_ptr<char[]> buffer(new char[kBufferSize]);
596  Status status;
597  uint64 offset = 0;
598  StringPiece read_chunk;
599
600  // Read the file from GCS in chunks and save it to a tmp file.
601  string old_content_filename;
602  TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
603  std::ofstream old_content(old_content_filename, std::ofstream::binary);
604  while (true) {
605    status = reader->Read(offset, kBufferSize, &read_chunk, buffer.get());
606    if (status.ok()) {
607      old_content << read_chunk;
608      offset += kBufferSize;
609    } else if (status.code() == error::OUT_OF_RANGE) {
610      // Expected, this means we reached EOF.
611      old_content << read_chunk;
612      break;
613    } else {
614      return status;
615    }
616  }
617  old_content.close();
618
619  // Create a writable file and pass the old content to it.
620  string bucket, object;
621  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
622  result->reset(new GcsWritableFile(
623      bucket, object, auth_provider_.get(), old_content_filename,
624      http_request_factory_.get(), max_upload_attempts_));
625  return Status::OK();
626}
627
628Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
629    const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
630  uint64 size;
631  TF_RETURN_IF_ERROR(GetFileSize(fname, &size));
632  std::unique_ptr<char[]> data(new char[size]);
633
634  std::unique_ptr<RandomAccessFile> file;
635  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file));
636
637  StringPiece piece;
638  TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
639
640  result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
641  return Status::OK();
642}
643
644bool GcsFileSystem::FileExists(const string& fname) {
645  string bucket, object;
646  if (!ParseGcsPath(fname, true, &bucket, &object).ok()) {
647    LOG(ERROR) << "Could not parse GCS file name " << fname;
648    return false;
649  }
650  if (object.empty()) {
651    return BucketExists(bucket).ok();
652  }
653  return ObjectExists(bucket, object).ok() || FolderExists(fname).ok();
654}
655
656Status GcsFileSystem::ObjectExists(const string& bucket, const string& object) {
657  FileStatistics stat;
658  return StatForObject(bucket, object, &stat);
659}
660
661Status GcsFileSystem::StatForObject(const string& bucket, const string& object,
662                                    FileStatistics* stat) {
663  if (!stat) {
664    return errors::Internal("'stat' cannot be nullptr.");
665  }
666  if (object.empty()) {
667    return errors::InvalidArgument("'object' must be a non-empty string.");
668  }
669
670  string auth_token;
671  TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token));
672
673  std::unique_ptr<char[]> scratch(new char[kBufferSize]);
674  StringPiece response_piece;
675
676  std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
677  TF_RETURN_IF_ERROR(request->Init());
678  TF_RETURN_IF_ERROR(request->SetUri(strings::StrCat(
679      kGcsUriBase, "b/", bucket, "/o/", request->EscapeString(object),
680      "?fields=size%2Cupdated")));
681  TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
682  TF_RETURN_IF_ERROR(
683      request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece));
684  TF_RETURN_WITH_CONTEXT_IF_ERROR(
685      request->Send(), " when reading metadata of gs://", bucket, "/", object);
686
687  Json::Value root;
688  TF_RETURN_IF_ERROR(ParseJson(response_piece, &root));
689
690  // Parse file size.
691  TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &(stat->length)));
692
693  // Parse file modification time.
694  string updated;
695  TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
696  TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->mtime_nsec)));
697
698  stat->is_directory = false;
699
700  return Status::OK();
701}
702
703Status GcsFileSystem::BucketExists(const string& bucket) {
704  string auth_token;
705  TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token));
706
707  std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
708  TF_RETURN_IF_ERROR(request->Init());
709  request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
710  request->AddAuthBearerHeader(auth_token);
711  return request->Send();
712}
713
714Status GcsFileSystem::FolderExists(const string& dirname) {
715  std::vector<string> children;
716  TF_RETURN_IF_ERROR(GetChildrenBounded(dirname, 1, &children, true));
717  if (children.empty()) {
718    return errors::NotFound("Folder does not exist.");
719  }
720  return Status::OK();
721}
722
723Status GcsFileSystem::GetChildren(const string& dirname,
724                                  std::vector<string>* result) {
725  return GetChildrenBounded(dirname, UINT64_MAX, result, false);
726}
727
728Status GcsFileSystem::GetMatchingPaths(const string& pattern,
729                                       std::vector<string>* results) {
730  results->clear();
731  // Find the fixed prefix by looking for the first wildcard.
732  const string& fixed_prefix =
733      pattern.substr(0, pattern.find_first_of("*?[\\"));
734  const string& dir = io::Dirname(fixed_prefix).ToString();
735  if (dir.empty()) {
736    return errors::InvalidArgument(
737        strings::StrCat("A GCS pattern doesn't have a bucket name: ", pattern));
738  }
739  std::vector<string> all_files;
740  TF_RETURN_IF_ERROR(GetChildrenBounded(dir, UINT64_MAX, &all_files, true));
741
742  // Match all obtained files to the input pattern.
743  for (const auto& f : all_files) {
744    const string& full_path = io::JoinPath(dir, f);
745    if (Env::Default()->MatchPath(full_path, pattern)) {
746      results->push_back(full_path);
747    }
748  }
749  return Status::OK();
750}
751
752Status GcsFileSystem::GetChildrenBounded(const string& dirname,
753                                         uint64 max_results,
754                                         std::vector<string>* result,
755                                         bool recursive) {
756  if (!result) {
757    return errors::InvalidArgument("'result' cannot be null");
758  }
759  string bucket, object_prefix;
760  TF_RETURN_IF_ERROR(
761      ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));
762
763  string nextPageToken;
764  uint64 retrieved_results = 0;
765  while (true) {  // A loop over multiple result pages.
766    string auth_token;
767    TF_RETURN_IF_ERROR(
768        AuthProvider::GetToken(auth_provider_.get(), &auth_token));
769
770    std::unique_ptr<char[]> scratch(new char[kBufferSize]);
771    StringPiece response_piece;
772    std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
773    TF_RETURN_IF_ERROR(request->Init());
774    auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
775    if (recursive) {
776      uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
777    } else {
778      // Set "/" as a delimiter to ask GCS to treat subfolders as children
779      // and return them in "prefixes".
780      uri = strings::StrCat(uri,
781                            "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
782      uri = strings::StrCat(uri, "&delimiter=%2F");
783    }
784    if (!object_prefix.empty()) {
785      uri = strings::StrCat(uri, "&prefix=",
786                            request->EscapeString(object_prefix));
787    }
788    if (!nextPageToken.empty()) {
789      uri = strings::StrCat(uri, "&pageToken=",
790                            request->EscapeString(nextPageToken));
791    }
792    if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
793      uri =
794          strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
795    }
796    TF_RETURN_IF_ERROR(request->SetUri(uri));
797    TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
798    TF_RETURN_IF_ERROR(
799        request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece));
800    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
801    Json::Value root;
802    TF_RETURN_IF_ERROR(ParseJson(response_piece, &root));
803    const auto items = root.get("items", Json::Value::null);
804    if (items == Json::Value::null) {
805      // Empty results.
806      return Status::OK();
807    }
808    if (!items.isArray()) {
809      return errors::Internal("Expected an array 'items' in the GCS response.");
810    }
811    for (size_t i = 0; i < items.size(); i++) {
812      const auto item = items.get(i, Json::Value::null);
813      if (!item.isObject()) {
814        return errors::Internal(
815            "Unexpected JSON format: 'items' should be a list of objects.");
816      }
817      string name;
818      TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
819      // The names should be relative to the 'dirname'. That means the
820      // 'object_prefix', which is part of 'dirname', should be removed from the
821      // beginning of 'name'.
822      StringPiece relative_path(name);
823      if (!relative_path.Consume(object_prefix)) {
824        return errors::Internal(
825            strings::StrCat("Unexpected response: the returned file name ",
826                            name, " doesn't match the prefix ", object_prefix));
827      }
828      result->emplace_back(relative_path.ToString());
829      if (++retrieved_results >= max_results) {
830        return Status::OK();
831      }
832    }
833    const auto prefixes = root.get("prefixes", Json::Value::null);
834    if (prefixes != Json::Value::null) {
835      // Subfolders are returned for the non-recursive mode.
836      if (!prefixes.isArray()) {
837        return errors::Internal(
838            "'prefixes' was expected to be an array in the GCS response.");
839      }
840      for (size_t i = 0; i < prefixes.size(); i++) {
841        const auto prefix = prefixes.get(i, Json::Value::null);
842        if (prefix == Json::Value::null || !prefix.isString()) {
843          return errors::Internal(
844              "'prefixes' was expected to be an array of strings in the GCS "
845              "response.");
846        }
847        const string& prefix_str = prefix.asString();
848        StringPiece relative_path(prefix_str);
849        if (!relative_path.Consume(object_prefix)) {
850          return errors::Internal(strings::StrCat(
851              "Unexpected response: the returned folder name ", prefix_str,
852              " doesn't match the prefix ", object_prefix));
853        }
854        result->emplace_back(relative_path.ToString());
855        if (++retrieved_results >= max_results) {
856          return Status::OK();
857        }
858      }
859    }
860    const auto token = root.get("nextPageToken", Json::Value::null);
861    if (token == Json::Value::null) {
862      return Status::OK();
863    }
864    if (!token.isString()) {
865      return errors::Internal(
866          "Unexpected response: nextPageToken is not a string");
867    }
868    nextPageToken = token.asString();
869  }
870}
871
872Status GcsFileSystem::Stat(const string& fname, FileStatistics* stat) {
873  if (!stat) {
874    return errors::Internal("'stat' cannot be nullptr.");
875  }
876  string bucket, object;
877  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
878  if (StatForObject(bucket, object, stat).ok()) {
879    return Status::OK();
880  }
881  if ((object.empty() && BucketExists(bucket).ok()) ||
882      (!object.empty() && FolderExists(fname).ok())) {
883    stat->length = 0;
884    stat->mtime_nsec = 0;
885    stat->is_directory = true;
886    return Status::OK();
887  }
888  return errors::NotFound(
889      strings::StrCat("The specified path ", fname, " was not found."));
890}
891
892Status GcsFileSystem::DeleteFile(const string& fname) {
893  string bucket, object;
894  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
895
896  string auth_token;
897  TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token));
898
899  std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
900  TF_RETURN_IF_ERROR(request->Init());
901  TF_RETURN_IF_ERROR(request->SetUri(strings::StrCat(
902      kGcsUriBase, "b/", bucket, "/o/", request->EscapeString(object))));
903  TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
904  TF_RETURN_IF_ERROR(request->SetDeleteRequest());
905  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
906  return Status::OK();
907}
908
909Status GcsFileSystem::CreateDir(const string& dirname) {
910  string bucket, object;
911  TF_RETURN_IF_ERROR(ParseGcsPath(dirname, true, &bucket, &object));
912  if (object.empty()) {
913    if (BucketExists(bucket).ok()) {
914      return Status::OK();
915    }
916    return errors::NotFound(
917        strings::StrCat("The specified bucket ", dirname, " was not found."));
918  }
919  // Create a zero-length directory marker object.
920  std::unique_ptr<WritableFile> file;
921  TF_RETURN_IF_ERROR(NewWritableFile(MaybeAppendSlash(dirname), &file));
922  TF_RETURN_IF_ERROR(file->Close());
923  return Status::OK();
924}
925
926// Checks that the directory is empty (i.e no objects with this prefix exist).
927// If it is, does nothing, because directories are not entities in GCS.
928Status GcsFileSystem::DeleteDir(const string& dirname) {
929  std::vector<string> children;
930  // A directory is considered empty either if there are no matching objects
931  // with the corresponding name prefix or if there is exactly one matching
932  // object and it is the directory marker. Therefore we need to retrieve
933  // at most two children for the prefix to detect if a directory is empty.
934  TF_RETURN_IF_ERROR(GetChildrenBounded(dirname, 2, &children, true));
935
936  if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
937    return errors::FailedPrecondition("Cannot delete a non-empty directory.");
938  }
939  if (children.size() == 1 && children[0].empty()) {
940    // This is the directory marker object. Delete it.
941    return DeleteFile(MaybeAppendSlash(dirname));
942  }
943  return Status::OK();
944}
945
946Status GcsFileSystem::GetFileSize(const string& fname, uint64* file_size) {
947  if (!file_size) {
948    return errors::Internal("'file_size' cannot be nullptr.");
949  }
950
951  // Only validate the name.
952  string bucket, object;
953  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
954
955  FileStatistics stat;
956  TF_RETURN_IF_ERROR(Stat(fname, &stat));
957  *file_size = stat.length;
958  return Status::OK();
959}
960
961Status GcsFileSystem::RenameFile(const string& src, const string& target) {
962  if (!IsDirectory(src).ok()) {
963    return RenameObject(src, target);
964  }
965  // Rename all individual objects in the directory one by one.
966  std::vector<string> children;
967  TF_RETURN_IF_ERROR(GetChildrenBounded(src, UINT64_MAX, &children, true));
968  for (const string& subpath : children) {
969    // io::JoinPath() wouldn't work here, because we want an empty subpath
970    // to result in an appended slash in order for directory markers
971    // to be processed correctly: "gs://a/b" + "" should give "gs:/a/b/".
972    TF_RETURN_IF_ERROR(
973        RenameObject(strings::StrCat(MaybeAppendSlash(src), subpath),
974                     strings::StrCat(MaybeAppendSlash(target), subpath)));
975  }
976  return Status::OK();
977}
978
979// Uses a GCS API command to copy the object and then deletes the old one.
980Status GcsFileSystem::RenameObject(const string& src, const string& target) {
981  string src_bucket, src_object, target_bucket, target_object;
982  TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
983  TF_RETURN_IF_ERROR(
984      ParseGcsPath(target, false, &target_bucket, &target_object));
985
986  string auth_token;
987  TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token));
988
989  std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
990  TF_RETURN_IF_ERROR(request->Init());
991  TF_RETURN_IF_ERROR(request->SetUri(strings::StrCat(
992      kGcsUriBase, "b/", src_bucket, "/o/", request->EscapeString(src_object),
993      "/rewriteTo/b/", target_bucket, "/o/",
994      request->EscapeString(target_object))));
995  TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
996  TF_RETURN_IF_ERROR(request->SetPostEmptyBody());
997  std::unique_ptr<char[]> scratch(new char[kBufferSize]);
998  StringPiece response_piece;
999  TF_RETURN_IF_ERROR(
1000      request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece));
1001  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
1002                                  " to ", target);
1003
1004  Json::Value root;
1005  TF_RETURN_IF_ERROR(ParseJson(response_piece, &root));
1006  bool done;
1007  TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
1008  if (!done) {
1009    // If GCS didn't complete rewrite in one call, this means that a large file
1010    // is being copied to a bucket with a different storage class or location,
1011    // which requires multiple rewrite calls.
1012    // TODO(surkov): implement multi-step rewrites.
1013    return errors::Unimplemented(
1014        strings::StrCat("Couldn't rename ", src, " to ", target,
1015                        ": moving large files between buckets with different "
1016                        "locations or storage classes is not supported."));
1017  }
1018
1019  TF_RETURN_IF_ERROR(DeleteFile(src));
1020  return Status::OK();
1021}
1022
1023Status GcsFileSystem::IsDirectory(const string& fname) {
1024  string bucket, object;
1025  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1026  if (object.empty()) {
1027    if (BucketExists(bucket).ok()) {
1028      return Status::OK();
1029    }
1030    return errors::NotFound(strings::StrCat("The specified bucket gs://",
1031                                            bucket, " was not found."));
1032  }
1033  if (FolderExists(fname).ok()) {
1034    return Status::OK();
1035  }
1036  if (ObjectExists(bucket, object).ok()) {
1037    return errors::FailedPrecondition(
1038        strings::StrCat("The specified path ", fname, " is not a directory."));
1039  }
1040  return errors::NotFound(
1041      strings::StrCat("The specified path ", fname, " was not found."));
1042}
1043
// Registers RetryingGcsFileSystem as the file system for the "gs" URI scheme,
// i.e. for "gs://..." paths. RetryingGcsFileSystem is presumably a retrying
// wrapper around GcsFileSystem (defined elsewhere; confirm).
REGISTER_FILE_SYSTEM("gs", RetryingGcsFileSystem);
1045
1046}  // namespace tensorflow
1047