1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include "tensorflow/core/platform/cloud/gcs_file_system.h"
17#include <stdio.h>
18#include <unistd.h>
19#include <algorithm>
20#include <cstdio>
21#include <cstdlib>
22#include <cstring>
23#include <fstream>
24#include <vector>
25#ifdef _WIN32
26#include <io.h>  // for _mktemp
27#endif
28#include "include/json/json.h"
29#include "tensorflow/core/lib/core/errors.h"
30#include "tensorflow/core/lib/gtl/map_util.h"
31#include "tensorflow/core/lib/gtl/stl_util.h"
32#include "tensorflow/core/lib/io/path.h"
33#include "tensorflow/core/lib/strings/numbers.h"
34#include "tensorflow/core/lib/strings/str_util.h"
35#include "tensorflow/core/lib/strings/stringprintf.h"
36#include "tensorflow/core/platform/cloud/curl_http_request.h"
37#include "tensorflow/core/platform/cloud/file_block_cache.h"
38#include "tensorflow/core/platform/cloud/google_auth_provider.h"
39#include "tensorflow/core/platform/cloud/retrying_utils.h"
40#include "tensorflow/core/platform/cloud/time_util.h"
41#include "tensorflow/core/platform/env.h"
42#include "tensorflow/core/platform/mutex.h"
43#include "tensorflow/core/platform/protobuf.h"
44#include "tensorflow/core/platform/thread_annotations.h"
45
46#ifdef _WIN32
47#ifdef DeleteFile
48#undef DeleteFile
49#endif
50#endif
51
52namespace tensorflow {
53namespace {
54
// Base URI for GCS JSON API requests.
constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
// Base URI used when initiating uploads (see
// GcsWritableFile::CreateNewUploadSession).
constexpr char kGcsUploadUriBase[] =
    "https://www.googleapis.com/upload/storage/v1/";
constexpr char kStorageHost[] = "storage.googleapis.com";
constexpr size_t kReadAppendableFileBufferSize = 1024 * 1024;  // In bytes.
constexpr int kGetChildrenDefaultPageSize = 1000;
// The HTTP response code "308 Resume Incomplete".
constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
// The environment variable that overrides the size of the readahead buffer.
// Specified in bytes.
// DEPRECATED. Use GCS_READ_CACHE_BLOCK_SIZE_MB instead.
constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES";
// The environment variable that overrides the block size for aligned reads from
// GCS. Specified in MB (e.g. "16" = 16 x 1024 x 1024 = 16777216 bytes).
constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
// The default block size, in bytes (128 MB).
constexpr size_t kDefaultBlockSize = 128 * 1024 * 1024;
// The environment variable that overrides the max size of the LRU cache of
// blocks read from GCS. Specified in MB.
constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB";
// The default max size of the block cache, in bytes (two default blocks).
constexpr size_t kDefaultMaxCacheSize = 2 * kDefaultBlockSize;
// The environment variable that overrides the maximum staleness of cached file
// contents. Once any block of a file reaches this staleness, all cached blocks
// will be evicted on the next read. Specified in seconds.
constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
constexpr uint64 kDefaultMaxStaleness = 0;
// The environment variable that overrides the maximum age of entries in the
// Stat cache. A value of 0 (the default) means nothing is cached.
constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
constexpr uint64 kStatCacheDefaultMaxAge = 0;
// The environment variable that overrides the maximum number of entries in the
// Stat cache.
constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
constexpr size_t kStatCacheDefaultMaxEntries = 1024;
// The environment variable that overrides the maximum age of entries in the
// GetMatchingPaths cache. A value of 0 (the default) means nothing is cached.
constexpr char kMatchingPathsCacheMaxAge[] = "GCS_MATCHING_PATHS_CACHE_MAX_AGE";
constexpr uint64 kMatchingPathsCacheDefaultMaxAge = 0;
// The environment variable that overrides the maximum number of entries in the
// GetMatchingPaths cache.
constexpr char kMatchingPathsCacheMaxEntries[] =
    "GCS_MATCHING_PATHS_CACHE_MAX_ENTRIES";
constexpr size_t kMatchingPathsCacheDefaultMaxEntries = 1024;
// The file statistics returned by Stat() for directories.
const FileStatistics DIRECTORY_STAT(0, 0, true);
// Some environments exhibit unreliable DNS resolution. Set this environment
// variable to a positive integer describing the frequency used to refresh the
// userspace DNS cache.
constexpr char kResolveCacheSecs[] = "GCS_RESOLVE_REFRESH_SECS";
// The environment variable to configure the http request's connection timeout.
constexpr char kRequestConnectionTimeout[] =
    "GCS_REQUEST_CONNECTION_TIMEOUT_SECS";
// The environment variable to configure the http request's idle timeout.
constexpr char kRequestIdleTimeout[] = "GCS_REQUEST_IDLE_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// metadata requests.
constexpr char kMetadataRequestTimeout[] = "GCS_METADATA_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// block read requests.
constexpr char kReadRequestTimeout[] = "GCS_READ_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// upload requests.
constexpr char kWriteRequestTimeout[] = "GCS_WRITE_REQUEST_TIMEOUT_SECS";
// The environment variable to configure an additional header to send with
// all requests to GCS (format HEADERNAME:HEADERCONTENT)
constexpr char kAdditionalRequestHeader[] = "GCS_ADDITIONAL_REQUEST_HEADER";
// The environment variable to configure the throttle (format: <int64>)
constexpr char kThrottleRate[] = "GCS_THROTTLE_TOKEN_RATE";
// The environment variable to configure the token bucket size (format: <int64>)
constexpr char kThrottleBucket[] = "GCS_THROTTLE_BUCKET_SIZE";
// The environment variable that controls the number of tokens per request.
// (format: <int64>)
constexpr char kTokensPerRequest[] = "GCS_TOKENS_PER_REQUEST";
// The environment variable to configure the initial tokens (format: <int64>)
constexpr char kInitialTokens[] = "GCS_INITIAL_TOKENS";
128
129// TODO: DO NOT use a hardcoded path
130Status GetTmpFilename(string* filename) {
131  if (!filename) {
132    return errors::Internal("'filename' cannot be nullptr.");
133  }
134#ifndef _WIN32
135  char buffer[] = "/tmp/gcs_filesystem_XXXXXX";
136  int fd = mkstemp(buffer);
137  if (fd < 0) {
138    return errors::Internal("Failed to create a temporary file.");
139  }
140  close(fd);
141#else
142  char buffer[] = "/tmp/gcs_filesystem_XXXXXX";
143  char* ret = _mktemp(buffer);
144  if (ret == nullptr) {
145    return errors::Internal("Failed to create a temporary file.");
146  }
147#endif
148  *filename = buffer;
149  return Status::OK();
150}
151
152/// \brief Splits a GCS path to a bucket and an object.
153///
154/// For example, "gs://bucket-name/path/to/file.txt" gets split into
155/// "bucket-name" and "path/to/file.txt".
156/// If fname only contains the bucket and empty_object_ok = true, the returned
157/// object is empty.
158Status ParseGcsPath(StringPiece fname, bool empty_object_ok, string* bucket,
159                    string* object) {
160  if (!bucket || !object) {
161    return errors::Internal("bucket and object cannot be null.");
162  }
163  StringPiece scheme, bucketp, objectp;
164  io::ParseURI(fname, &scheme, &bucketp, &objectp);
165  if (scheme != "gs") {
166    return errors::InvalidArgument("GCS path doesn't start with 'gs://': ",
167                                   fname);
168  }
169  *bucket = bucketp.ToString();
170  if (bucket->empty() || *bucket == ".") {
171    return errors::InvalidArgument("GCS path doesn't contain a bucket name: ",
172                                   fname);
173  }
174  objectp.Consume("/");
175  *object = objectp.ToString();
176  if (!empty_object_ok && object->empty()) {
177    return errors::InvalidArgument("GCS path doesn't contain an object name: ",
178                                   fname);
179  }
180  return Status::OK();
181}
182
183/// Appends a trailing slash if the name doesn't already have one.
184string MaybeAppendSlash(const string& name) {
185  if (name.empty()) {
186    return "/";
187  }
188  if (name.back() != '/') {
189    return strings::StrCat(name, "/");
190  }
191  return name;
192}
193
194// io::JoinPath() doesn't work in cases when we want an empty subpath
195// to result in an appended slash in order for directory markers
196// to be processed correctly: "gs://a/b" + "" should give "gs://a/b/".
197string JoinGcsPath(const string& path, const string& subpath) {
198  return strings::StrCat(MaybeAppendSlash(path), subpath);
199}
200
201/// \brief Returns the given paths appending all their subfolders.
202///
203/// For every path X in the list, every subfolder in X is added to the
204/// resulting list.
205/// For example:
206///  - for 'a/b/c/d' it will append 'a', 'a/b' and 'a/b/c'
207///  - for 'a/b/c/' it will append 'a', 'a/b' and 'a/b/c'
208std::set<string> AddAllSubpaths(const std::vector<string>& paths) {
209  std::set<string> result;
210  result.insert(paths.begin(), paths.end());
211  for (const string& path : paths) {
212    StringPiece subpath = io::Dirname(path);
213    while (!subpath.empty()) {
214      result.emplace(subpath.ToString());
215      subpath = io::Dirname(subpath);
216    }
217  }
218  return result;
219}
220
221Status ParseJson(StringPiece json, Json::Value* result) {
222  Json::Reader reader;
223  if (!reader.parse(json.data(), json.data() + json.size(), *result)) {
224    return errors::Internal("Couldn't parse JSON response from GCS.");
225  }
226  return Status::OK();
227}
228
229Status ParseJson(const std::vector<char>& json, Json::Value* result) {
230  return ParseJson(StringPiece{json.data(), json.size()}, result);
231}
232
233/// Reads a JSON value with the given name from a parent JSON value.
234Status GetValue(const Json::Value& parent, const char* name,
235                Json::Value* result) {
236  *result = parent.get(name, Json::Value::null);
237  if (result->isNull()) {
238    return errors::Internal("The field '", name,
239                            "' was expected in the JSON response.");
240  }
241  return Status::OK();
242}
243
244/// Reads a string JSON value with the given name from a parent JSON value.
245Status GetStringValue(const Json::Value& parent, const char* name,
246                      string* result) {
247  Json::Value result_value;
248  TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
249  if (!result_value.isString()) {
250    return errors::Internal(
251        "The field '", name,
252        "' in the JSON response was expected to be a string.");
253  }
254  *result = result_value.asString();
255  return Status::OK();
256}
257
258/// Reads a long JSON value with the given name from a parent JSON value.
259Status GetInt64Value(const Json::Value& parent, const char* name,
260                     int64* result) {
261  Json::Value result_value;
262  TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
263  if (result_value.isNumeric()) {
264    *result = result_value.asInt64();
265    return Status::OK();
266  }
267  if (result_value.isString() &&
268      strings::safe_strto64(result_value.asCString(), result)) {
269    return Status::OK();
270  }
271  return errors::Internal(
272      "The field '", name,
273      "' in the JSON response was expected to be a number.");
274}
275
276/// Reads a boolean JSON value with the given name from a parent JSON value.
277Status GetBoolValue(const Json::Value& parent, const char* name, bool* result) {
278  Json::Value result_value;
279  TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
280  if (!result_value.isBool()) {
281    return errors::Internal(
282        "The field '", name,
283        "' in the JSON response was expected to be a boolean.");
284  }
285  *result = result_value.asBool();
286  return Status::OK();
287}
288
289/// A GCS-based implementation of a random access file with an LRU block cache.
290class GcsRandomAccessFile : public RandomAccessFile {
291 public:
292  GcsRandomAccessFile(const string& filename, FileBlockCache* file_block_cache)
293      : filename_(filename), file_block_cache_(file_block_cache) {}
294
295  /// The implementation of reads with an LRU block cache. Thread safe.
296  Status Read(uint64 offset, size_t n, StringPiece* result,
297              char* scratch) const override {
298    *result = StringPiece();
299    size_t bytes_transferred;
300    TF_RETURN_IF_ERROR(file_block_cache_->Read(filename_, offset, n, scratch,
301                                               &bytes_transferred));
302    *result = StringPiece(scratch, bytes_transferred);
303    if (bytes_transferred < n) {
304      // This is not an error per se. The RandomAccessFile interface expects
305      // that Read returns OutOfRange if fewer bytes were read than requested.
306      return errors::OutOfRange("EOF reached, ", result->size(),
307                                " bytes were read out of ", n,
308                                " bytes requested.");
309    }
310    return Status::OK();
311  }
312
313 private:
314  /// The filename of this file.
315  const string filename_;
316  /// The LRU block cache for this file.
317  mutable FileBlockCache* file_block_cache_;  // not owned
318};
319
320/// \brief GCS-based implementation of a writeable file.
321///
322/// Since GCS objects are immutable, this implementation writes to a local
323/// tmp file and copies it to GCS on flush/close.
324class GcsWritableFile : public WritableFile {
325 public:
326  GcsWritableFile(const string& bucket, const string& object,
327                  GcsFileSystem* filesystem,
328                  GcsFileSystem::TimeoutConfig* timeouts,
329                  std::function<void()> file_cache_erase,
330                  int64 initial_retry_delay_usec)
331      : bucket_(bucket),
332        object_(object),
333        filesystem_(filesystem),
334        timeouts_(timeouts),
335        file_cache_erase_(std::move(file_cache_erase)),
336        sync_needed_(true),
337        initial_retry_delay_usec_(initial_retry_delay_usec) {
338    // TODO: to make it safer, outfile_ should be constructed from an FD
339    if (GetTmpFilename(&tmp_content_filename_).ok()) {
340      outfile_.open(tmp_content_filename_,
341                    std::ofstream::binary | std::ofstream::app);
342    }
343  }
344
345  /// \brief Constructs the writable file in append mode.
346  ///
347  /// tmp_content_filename should contain a path of an existing temporary file
348  /// with the content to be appended. The class takes onwnership of the
349  /// specified tmp file and deletes it on close.
350  GcsWritableFile(const string& bucket, const string& object,
351                  GcsFileSystem* filesystem, const string& tmp_content_filename,
352                  GcsFileSystem::TimeoutConfig* timeouts,
353                  std::function<void()> file_cache_erase,
354                  int64 initial_retry_delay_usec)
355      : bucket_(bucket),
356        object_(object),
357        filesystem_(filesystem),
358        timeouts_(timeouts),
359        file_cache_erase_(std::move(file_cache_erase)),
360        sync_needed_(true),
361        initial_retry_delay_usec_(initial_retry_delay_usec) {
362    tmp_content_filename_ = tmp_content_filename;
363    outfile_.open(tmp_content_filename_,
364                  std::ofstream::binary | std::ofstream::app);
365  }
366
367  ~GcsWritableFile() override { Close().IgnoreError(); }
368
369  Status Append(const StringPiece& data) override {
370    TF_RETURN_IF_ERROR(CheckWritable());
371    sync_needed_ = true;
372    outfile_ << data;
373    if (!outfile_.good()) {
374      return errors::Internal(
375          "Could not append to the internal temporary file.");
376    }
377    return Status::OK();
378  }
379
380  Status Close() override {
381    if (outfile_.is_open()) {
382      TF_RETURN_IF_ERROR(Sync());
383      outfile_.close();
384      std::remove(tmp_content_filename_.c_str());
385    }
386    return Status::OK();
387  }
388
389  Status Flush() override { return Sync(); }
390
391  Status Sync() override {
392    TF_RETURN_IF_ERROR(CheckWritable());
393    if (!sync_needed_) {
394      return Status::OK();
395    }
396    Status status = SyncImpl();
397    if (status.ok()) {
398      sync_needed_ = false;
399    }
400    return status;
401  }
402
403 private:
404  /// Copies the current version of the file to GCS.
405  ///
406  /// This SyncImpl() uploads the object to GCS.
407  /// In case of a failure, it resumes failed uploads as recommended by the GCS
408  /// resumable API documentation. When the whole upload needs to be
409  /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
410  Status SyncImpl() {
411    outfile_.flush();
412    if (!outfile_.good()) {
413      return errors::Internal(
414          "Could not write to the internal temporary file.");
415    }
416    string session_uri;
417    TF_RETURN_IF_ERROR(CreateNewUploadSession(&session_uri));
418    uint64 already_uploaded = 0;
419    bool first_attempt = true;
420    const Status upload_status = RetryingUtils::CallWithRetries(
421        [&first_attempt, &already_uploaded, &session_uri, this]() {
422          if (!first_attempt) {
423            bool completed;
424            TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
425                session_uri, &completed, &already_uploaded));
426            if (completed) {
427              // Erase the file from the file cache on every successful write.
428              file_cache_erase_();
429              // It's unclear why UploadToSession didn't return OK in the
430              // previous attempt, but GCS reports that the file is fully
431              // uploaded, so succeed.
432              return Status::OK();
433            }
434          }
435          first_attempt = false;
436          return UploadToSession(session_uri, already_uploaded);
437        },
438        initial_retry_delay_usec_);
439    if (upload_status.code() == errors::Code::NOT_FOUND) {
440      // GCS docs recommend retrying the whole upload. We're relying on the
441      // RetryingFileSystem to retry the Sync() call.
442      return errors::Unavailable(
443          strings::StrCat("Upload to gs://", bucket_, "/", object_,
444                          " failed, caused by: ", upload_status.ToString()));
445    }
446    return upload_status;
447  }
448
449  Status CheckWritable() const {
450    if (!outfile_.is_open()) {
451      return errors::FailedPrecondition(
452          "The internal temporary file is not writable.");
453    }
454    return Status::OK();
455  }
456
457  Status GetCurrentFileSize(uint64* size) {
458    if (size == nullptr) {
459      return errors::Internal("'size' cannot be nullptr");
460    }
461    const auto tellp = outfile_.tellp();
462    if (tellp == static_cast<std::streampos>(-1)) {
463      return errors::Internal(
464          "Could not get the size of the internal temporary file.");
465    }
466    *size = tellp;
467    return Status::OK();
468  }
469
470  /// Initiates a new resumable upload session.
471  Status CreateNewUploadSession(string* session_uri) {
472    if (session_uri == nullptr) {
473      return errors::Internal("'session_uri' cannot be nullptr.");
474    }
475    uint64 file_size;
476    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
477
478    std::vector<char> output_buffer;
479    std::unique_ptr<HttpRequest> request;
480    TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
481
482    request->SetUri(strings::StrCat(
483        kGcsUploadUriBase, "b/", bucket_,
484        "/o?uploadType=resumable&name=", request->EscapeString(object_)));
485    request->AddHeader("X-Upload-Content-Length", std::to_string(file_size));
486    request->SetPostEmptyBody();
487    request->SetResultBuffer(&output_buffer);
488    request->SetTimeouts(timeouts_->connect, timeouts_->idle,
489                         timeouts_->metadata);
490    TF_RETURN_WITH_CONTEXT_IF_ERROR(
491        request->Send(), " when initiating an upload to ", GetGcsPath());
492    *session_uri = request->GetResponseHeader("Location");
493    if (session_uri->empty()) {
494      return errors::Internal("Unexpected response from GCS when writing to ",
495                              GetGcsPath(),
496                              ": 'Location' header not returned.");
497    }
498    return Status::OK();
499  }
500
501  /// \brief Requests status of a previously initiated upload session.
502  ///
503  /// If the upload has already succeeded, sets 'completed' to true.
504  /// Otherwise sets 'completed' to false and 'uploaded' to the currently
505  /// uploaded size in bytes.
506  Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
507                                    uint64* uploaded) {
508    if (completed == nullptr || uploaded == nullptr) {
509      return errors::Internal("'completed' and 'uploaded' cannot be nullptr.");
510    }
511    uint64 file_size;
512    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
513
514    std::unique_ptr<HttpRequest> request;
515    TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
516    request->SetUri(session_uri);
517    request->SetTimeouts(timeouts_->connect, timeouts_->idle,
518                         timeouts_->metadata);
519    request->AddHeader("Content-Range", strings::StrCat("bytes */", file_size));
520    request->SetPutEmptyBody();
521    const Status& status = request->Send();
522    if (status.ok()) {
523      *completed = true;
524      return Status::OK();
525    }
526    *completed = false;
527    if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
528      TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ",
529                                      GetGcsPath());
530    }
531    const string& received_range = request->GetResponseHeader("Range");
532    if (received_range.empty()) {
533      // This means GCS doesn't have any bytes of the file yet.
534      *uploaded = 0;
535    } else {
536      StringPiece range_piece(received_range);
537      range_piece.Consume("bytes=");  // May or may not be present.
538      std::vector<int64> range_parts;
539      if (!str_util::SplitAndParseAsInts(range_piece, '-', &range_parts) ||
540          range_parts.size() != 2) {
541        return errors::Internal("Unexpected response from GCS when writing ",
542                                GetGcsPath(), ": Range header '",
543                                received_range, "' could not be parsed.");
544      }
545      if (range_parts[0] != 0) {
546        return errors::Internal("Unexpected response from GCS when writing to ",
547                                GetGcsPath(), ": the returned range '",
548                                received_range, "' does not start at zero.");
549      }
550      // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
551      *uploaded = range_parts[1] + 1;
552    }
553    return Status::OK();
554  }
555
556  Status UploadToSession(const string& session_uri, uint64 start_offset) {
557    uint64 file_size;
558    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
559
560    std::unique_ptr<HttpRequest> request;
561    TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
562    request->SetUri(session_uri);
563    if (file_size > 0) {
564      request->AddHeader("Content-Range",
565                         strings::StrCat("bytes ", start_offset, "-",
566                                         file_size - 1, "/", file_size));
567    }
568    request->SetTimeouts(timeouts_->connect, timeouts_->idle, timeouts_->write);
569
570    TF_RETURN_IF_ERROR(
571        request->SetPutFromFile(tmp_content_filename_, start_offset));
572    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
573                                    GetGcsPath());
574    // Erase the file from the file cache on every successful write.
575    file_cache_erase_();
576    return Status::OK();
577  }
578
579  string GetGcsPath() const {
580    return strings::StrCat("gs://", bucket_, "/", object_);
581  }
582
583  string bucket_;
584  string object_;
585  GcsFileSystem* const filesystem_;  // Not owned.
586  string tmp_content_filename_;
587  std::ofstream outfile_;
588  GcsFileSystem::TimeoutConfig* timeouts_;
589  std::function<void()> file_cache_erase_;
590  bool sync_needed_;  // whether there is buffered data that needs to be synced
591  int64 initial_retry_delay_usec_;
592};
593
594class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
595 public:
596  GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
597      : data_(std::move(data)), length_(length) {}
598  const void* data() override { return reinterpret_cast<void*>(data_.get()); }
599  uint64 length() override { return length_; }
600
601 private:
602  std::unique_ptr<char[]> data_;
603  uint64 length_;
604};
605
606// Helper function to extract an environment variable and convert it into a
607// value of type T.
608template <typename T>
609bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*),
610               T* value) {
611  const char* env_value = std::getenv(varname);
612  if (!env_value) {
613    return false;
614  }
615  return convert(env_value, value);
616}
617
618bool StringPieceIdentity(StringPiece str, StringPiece* value) {
619  *value = str;
620  return true;
621}
622
623}  // namespace
624
625GcsFileSystem::GcsFileSystem()
626    : auth_provider_(new GoogleAuthProvider()),
627      http_request_factory_(new CurlHttpRequest::Factory()) {
628  uint64 value;
629  size_t block_size = kDefaultBlockSize;
630  size_t max_bytes = kDefaultMaxCacheSize;
631  uint64 max_staleness = kDefaultMaxStaleness;
632  // Apply the sys env override for the readahead buffer size if it's provided.
633  if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) {
634    block_size = value;
635  }
636  // Apply the overrides for the block size (MB), max bytes (MB), and max
637  // staleness (seconds) if provided.
638  if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) {
639    block_size = value * 1024 * 1024;
640  }
641  if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) {
642    max_bytes = value * 1024 * 1024;
643  }
644  if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) {
645    max_staleness = value;
646  }
647  file_block_cache_ = MakeFileBlockCache(block_size, max_bytes, max_staleness);
648  // Apply overrides for the stat cache max age and max entries, if provided.
649  uint64 stat_cache_max_age = kStatCacheDefaultMaxAge;
650  size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
651  if (GetEnvVar(kStatCacheMaxAge, strings::safe_strtou64, &value)) {
652    stat_cache_max_age = value;
653  }
654  if (GetEnvVar(kStatCacheMaxEntries, strings::safe_strtou64, &value)) {
655    stat_cache_max_entries = value;
656  }
657  stat_cache_.reset(new ExpiringLRUCache<FileStatistics>(
658      stat_cache_max_age, stat_cache_max_entries));
659  // Apply overrides for the matching paths cache max age and max entries, if
660  // provided.
661  uint64 matching_paths_cache_max_age = kMatchingPathsCacheDefaultMaxAge;
662  size_t matching_paths_cache_max_entries =
663      kMatchingPathsCacheDefaultMaxEntries;
664  if (GetEnvVar(kMatchingPathsCacheMaxAge, strings::safe_strtou64, &value)) {
665    matching_paths_cache_max_age = value;
666  }
667  if (GetEnvVar(kMatchingPathsCacheMaxEntries, strings::safe_strtou64,
668                &value)) {
669    matching_paths_cache_max_entries = value;
670  }
671  matching_paths_cache_.reset(new ExpiringLRUCache<std::vector<string>>(
672      matching_paths_cache_max_age, matching_paths_cache_max_entries));
673
674  int64 resolve_frequency_secs;
675  if (GetEnvVar(kResolveCacheSecs, strings::safe_strto64,
676                &resolve_frequency_secs)) {
677    dns_cache_.reset(new GcsDnsCache(resolve_frequency_secs));
678    VLOG(1) << "GCS DNS cache is enabled.  " << kResolveCacheSecs << " = "
679            << resolve_frequency_secs;
680  } else {
681    VLOG(1) << "GCS DNS cache is disabled, because " << kResolveCacheSecs
682            << " = 0 (or is not set)";
683  }
684
685  // Get the additional header
686  StringPiece add_header_contents;
687  if (GetEnvVar(kAdditionalRequestHeader, StringPieceIdentity,
688                &add_header_contents)) {
689    size_t split = add_header_contents.find(':', 0);
690
691    if (split != StringPiece::npos) {
692      StringPiece header_name = add_header_contents.substr(0, split);
693      StringPiece header_value = add_header_contents.substr(split + 1);
694
695      if (!header_name.empty() && !header_value.empty()) {
696        additional_header_.reset(new std::pair<const string, const string>(
697            header_name.ToString(), header_value.ToString()));
698
699        VLOG(1) << "GCS additional header ENABLED. "
700                << "Name: " << additional_header_->first << ", "
701                << "Value: " << additional_header_->second;
702      } else {
703        LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
704                   << add_header_contents;
705      }
706    } else {
707      LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
708                 << add_header_contents;
709    }
710  } else {
711    VLOG(1) << "GCS additional header DISABLED. No environment variable set.";
712  }
713
714  // Apply the overrides for request timeouts
715  uint32 timeout_value;
716  if (GetEnvVar(kRequestConnectionTimeout, strings::safe_strtou32,
717                &timeout_value)) {
718    timeouts_.connect = timeout_value;
719  }
720  if (GetEnvVar(kRequestIdleTimeout, strings::safe_strtou32, &timeout_value)) {
721    timeouts_.idle = timeout_value;
722  }
723  if (GetEnvVar(kMetadataRequestTimeout, strings::safe_strtou32,
724                &timeout_value)) {
725    timeouts_.metadata = timeout_value;
726  }
727  if (GetEnvVar(kReadRequestTimeout, strings::safe_strtou32, &timeout_value)) {
728    timeouts_.read = timeout_value;
729  }
730  if (GetEnvVar(kWriteRequestTimeout, strings::safe_strtou32, &timeout_value)) {
731    timeouts_.write = timeout_value;
732  }
733
734  int64 token_value;
735  if (GetEnvVar(kThrottleRate, strings::safe_strto64, &token_value)) {
736    GcsThrottleConfig config;
737    config.enabled = true;
738    config.token_rate = token_value;
739
740    if (GetEnvVar(kThrottleBucket, strings::safe_strto64, &token_value)) {
741      config.bucket_size = token_value;
742    }
743
744    if (GetEnvVar(kTokensPerRequest, strings::safe_strto64, &token_value)) {
745      config.tokens_per_request = token_value;
746    }
747
748    if (GetEnvVar(kInitialTokens, strings::safe_strto64, &token_value)) {
749      config.initial_tokens = token_value;
750    }
751    throttle_.SetConfig(config);
752  }
753}
754
/// Builds a GcsFileSystem from explicitly supplied collaborators and tuning
/// values. Unlike the default constructor, no environment variables are read
/// here: each argument is forwarded to the corresponding member, and the
/// block cache is built from block_size, max_bytes and max_staleness via
/// MakeFileBlockCache().
///
/// NOTE(review): the default constructor reset()s additional_header_ with a
/// new-allocated pair, so this presumably takes ownership of
/// 'additional_header' — confirm against the member declaration.
GcsFileSystem::GcsFileSystem(
    std::unique_ptr<AuthProvider> auth_provider,
    std::unique_ptr<HttpRequest::Factory> http_request_factory,
    size_t block_size, size_t max_bytes, uint64 max_staleness,
    uint64 stat_cache_max_age, size_t stat_cache_max_entries,
    uint64 matching_paths_cache_max_age,
    size_t matching_paths_cache_max_entries, int64 initial_retry_delay_usec,
    TimeoutConfig timeouts,
    std::pair<const string, const string>* additional_header)
    : auth_provider_(std::move(auth_provider)),
      http_request_factory_(std::move(http_request_factory)),
      file_block_cache_(
          MakeFileBlockCache(block_size, max_bytes, max_staleness)),
      stat_cache_(new StatCache(stat_cache_max_age, stat_cache_max_entries)),
      matching_paths_cache_(new MatchingPathsCache(
          matching_paths_cache_max_age, matching_paths_cache_max_entries)),
      timeouts_(timeouts),
      initial_retry_delay_usec_(initial_retry_delay_usec),
      additional_header_(additional_header) {}
774
775Status GcsFileSystem::NewRandomAccessFile(
776    const string& fname, std::unique_ptr<RandomAccessFile>* result) {
777  string bucket, object;
778  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
779  result->reset(new GcsRandomAccessFile(fname, file_block_cache_.get()));
780  return Status::OK();
781}
782
783// A helper function to build a FileBlockCache for GcsFileSystem.
784std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
785    size_t block_size, size_t max_bytes, uint64 max_staleness) {
786  std::unique_ptr<FileBlockCache> file_block_cache(
787      new FileBlockCache(block_size, max_bytes, max_staleness,
788                         [this](const string& filename, size_t offset, size_t n,
789                                char* buffer, size_t* bytes_transferred) {
790                           return LoadBufferFromGCS(filename, offset, n, buffer,
791                                                    bytes_transferred);
792                         }));
793  return file_block_cache;
794}
795
796// A helper function to actually read the data from GCS.
797Status GcsFileSystem::LoadBufferFromGCS(const string& filename, size_t offset,
798                                        size_t n, char* buffer,
799                                        size_t* bytes_transferred) {
800  *bytes_transferred = 0;
801
802  string bucket, object;
803  TF_RETURN_IF_ERROR(ParseGcsPath(filename, false, &bucket, &object));
804
805  std::unique_ptr<HttpRequest> request;
806  TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
807                                  "when reading gs://", bucket, "/", object);
808
809  request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, "/",
810                                  request->EscapeString(object)));
811  request->SetRange(offset, offset + n - 1);
812  request->SetResultBufferDirect(buffer, n);
813  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.read);
814
815  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
816                                  bucket, "/", object);
817
818  size_t bytes_read = request->GetResultBufferDirectBytesTransferred();
819  *bytes_transferred = bytes_read;
820  VLOG(1) << "Successful read of gs://" << bucket << "/" << object << " @ "
821          << offset << " of size: " << bytes_read;
822
823  throttle_.RecordResponse(bytes_read);
824
825  if (bytes_read < block_size()) {
826    // Check stat cache to see if we encountered an interrupted read.
827    FileStatistics stat;
828    if (stat_cache_->Lookup(filename, &stat)) {
829      if (offset + bytes_read < stat.length) {
830        return errors::Internal(strings::Printf(
831            "File contents are inconsistent for file: %s @ %lu.",
832            filename.c_str(), offset));
833      }
834      VLOG(2) << "Successful integrity check for: gs://" << bucket << "/"
835              << object << " @ " << offset;
836    }
837  }
838
839  return Status::OK();
840}
841
842Status GcsFileSystem::NewWritableFile(const string& fname,
843                                      std::unique_ptr<WritableFile>* result) {
844  string bucket, object;
845  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
846  result->reset(new GcsWritableFile(
847      bucket, object, this, &timeouts_,
848      [this, fname]() { file_block_cache_->RemoveFile(fname); },
849      initial_retry_delay_usec_));
850  return Status::OK();
851}
852
853// Reads the file from GCS in chunks and stores it in a tmp file,
854// which is then passed to GcsWritableFile.
855Status GcsFileSystem::NewAppendableFile(const string& fname,
856                                        std::unique_ptr<WritableFile>* result) {
857  std::unique_ptr<RandomAccessFile> reader;
858  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader));
859  std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
860  Status status;
861  uint64 offset = 0;
862  StringPiece read_chunk;
863
864  // Read the file from GCS in chunks and save it to a tmp file.
865  string old_content_filename;
866  TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
867  std::ofstream old_content(old_content_filename, std::ofstream::binary);
868  while (true) {
869    status = reader->Read(offset, kReadAppendableFileBufferSize, &read_chunk,
870                          buffer.get());
871    if (status.ok()) {
872      old_content << read_chunk;
873      offset += kReadAppendableFileBufferSize;
874    } else if (status.code() == error::OUT_OF_RANGE) {
875      // Expected, this means we reached EOF.
876      old_content << read_chunk;
877      break;
878    } else {
879      return status;
880    }
881  }
882  old_content.close();
883
884  // Create a writable file and pass the old content to it.
885  string bucket, object;
886  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
887  result->reset(new GcsWritableFile(
888      bucket, object, this, old_content_filename, &timeouts_,
889      [this, fname]() { file_block_cache_->RemoveFile(fname); },
890      initial_retry_delay_usec_));
891  return Status::OK();
892}
893
894Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
895    const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
896  uint64 size;
897  TF_RETURN_IF_ERROR(GetFileSize(fname, &size));
898  std::unique_ptr<char[]> data(new char[size]);
899
900  std::unique_ptr<RandomAccessFile> file;
901  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file));
902
903  StringPiece piece;
904  TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
905
906  result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
907  return Status::OK();
908}
909
910Status GcsFileSystem::FileExists(const string& fname) {
911  string bucket, object;
912  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
913  if (object.empty()) {
914    bool result;
915    TF_RETURN_IF_ERROR(BucketExists(bucket, &result));
916    if (result) {
917      return Status::OK();
918    }
919  }
920  bool result;
921  TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &result));
922  if (result) {
923    return Status::OK();
924  }
925  TF_RETURN_IF_ERROR(FolderExists(fname, &result));
926  if (result) {
927    return Status::OK();
928  }
929  return errors::NotFound("The specified path ", fname, " was not found.");
930}
931
932Status GcsFileSystem::ObjectExists(const string& fname, const string& bucket,
933                                   const string& object, bool* result) {
934  if (!result) {
935    return errors::Internal("'result' cannot be nullptr.");
936  }
937  FileStatistics not_used_stat;
938  const Status status = StatForObject(fname, bucket, object, &not_used_stat);
939  switch (status.code()) {
940    case errors::Code::OK:
941      *result = true;
942      return Status::OK();
943    case errors::Code::NOT_FOUND:
944      *result = false;
945      return Status::OK();
946    default:
947      return status;
948  }
949}
950
951Status GcsFileSystem::StatForObject(const string& fname, const string& bucket,
952                                    const string& object,
953                                    FileStatistics* stat) {
954  if (!stat) {
955    return errors::Internal("'stat' cannot be nullptr.");
956  }
957  if (object.empty()) {
958    return errors::InvalidArgument(strings::Printf(
959        "'object' must be a non-empty string. (File: %s)", fname.c_str()));
960  }
961
962  StatCache::ComputeFunc compute_func = [this, &bucket, &object](
963                                            const string& fname,
964                                            FileStatistics* stat) {
965    std::vector<char> output_buffer;
966    std::unique_ptr<HttpRequest> request;
967    TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
968                                    " when reading metadata of gs://", bucket,
969                                    "/", object);
970
971    request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
972                                    request->EscapeString(object),
973                                    "?fields=size%2Cupdated"));
974    request->SetResultBuffer(&output_buffer);
975    request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
976
977    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
978                                    " when reading metadata of gs://", bucket,
979                                    "/", object);
980
981    Json::Value root;
982    TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
983
984    // Parse file size.
985    TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &stat->length));
986
987    // Parse file modification time.
988    string updated;
989    TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
990    TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->mtime_nsec)));
991
992    VLOG(1) << "Stat of: gs://" << bucket << "/" << object << " -- "
993            << " length: " << stat->length
994            << "; mtime_nsec: " << stat->mtime_nsec << "; updated: " << updated;
995
996    stat->is_directory = false;
997    return Status::OK();
998  };
999
1000  TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(fname, stat, compute_func));
1001  if (stat->is_directory) {
1002    return errors::NotFound(fname, " is a directory.");
1003  } else {
1004    return Status::OK();
1005  }
1006}
1007
1008Status GcsFileSystem::BucketExists(const string& bucket, bool* result) {
1009  if (!result) {
1010    return errors::Internal("'result' cannot be nullptr.");
1011  }
1012
1013  std::unique_ptr<HttpRequest> request;
1014  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1015  request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
1016  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1017  const Status status = request->Send();
1018  switch (status.code()) {
1019    case errors::Code::OK:
1020      *result = true;
1021      return Status::OK();
1022    case errors::Code::NOT_FOUND:
1023      *result = false;
1024      return Status::OK();
1025    default:
1026      return status;
1027  }
1028}
1029
1030Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
1031  if (!result) {
1032    return errors::Internal("'result' cannot be nullptr.");
1033  }
1034  StatCache::ComputeFunc compute_func = [this](const string& dirname,
1035                                               FileStatistics* stat) {
1036    std::vector<string> children;
1037    TF_RETURN_IF_ERROR(
1038        GetChildrenBounded(dirname, 1, &children, true /* recursively */,
1039                           true /* include_self_directory_marker */));
1040    if (!children.empty()) {
1041      *stat = DIRECTORY_STAT;
1042      return Status::OK();
1043    } else {
1044      return errors::InvalidArgument("Not a directory!");
1045    }
1046  };
1047  FileStatistics stat;
1048  Status s = stat_cache_->LookupOrCompute(dirname, &stat, compute_func);
1049  if (s.ok()) {
1050    *result = stat.is_directory;
1051    return Status::OK();
1052  }
1053  if (errors::IsInvalidArgument(s)) {
1054    *result = false;
1055    return Status::OK();
1056  }
1057  return s;
1058}
1059
1060Status GcsFileSystem::GetChildren(const string& dirname,
1061                                  std::vector<string>* result) {
1062  return GetChildrenBounded(dirname, UINT64_MAX, result,
1063                            false /* recursively */,
1064                            false /* include_self_directory_marker */);
1065}
1066
1067Status GcsFileSystem::GetMatchingPaths(const string& pattern,
1068                                       std::vector<string>* results) {
1069  MatchingPathsCache::ComputeFunc compute_func =
1070      [this](const string& pattern, std::vector<string>* results) {
1071        results->clear();
1072        // Find the fixed prefix by looking for the first wildcard.
1073        const string& fixed_prefix =
1074            pattern.substr(0, pattern.find_first_of("*?[\\"));
1075        const string& dir = io::Dirname(fixed_prefix).ToString();
1076        if (dir.empty()) {
1077          return errors::InvalidArgument(
1078              "A GCS pattern doesn't have a bucket name: ", pattern);
1079        }
1080        std::vector<string> all_files;
1081        TF_RETURN_IF_ERROR(GetChildrenBounded(
1082            dir, UINT64_MAX, &all_files, true /* recursively */,
1083            false /* include_self_directory_marker */));
1084
1085        const auto& files_and_folders = AddAllSubpaths(all_files);
1086
1087        // Match all obtained paths to the input pattern.
1088        for (const auto& path : files_and_folders) {
1089          const string& full_path = io::JoinPath(dir, path);
1090          if (Env::Default()->MatchPath(full_path, pattern)) {
1091            results->push_back(full_path);
1092          }
1093        }
1094        return Status::OK();
1095      };
1096  TF_RETURN_IF_ERROR(
1097      matching_paths_cache_->LookupOrCompute(pattern, results, compute_func));
1098  return Status::OK();
1099}
1100
1101Status GcsFileSystem::GetChildrenBounded(const string& dirname,
1102                                         uint64 max_results,
1103                                         std::vector<string>* result,
1104                                         bool recursive,
1105                                         bool include_self_directory_marker) {
1106  if (!result) {
1107    return errors::InvalidArgument("'result' cannot be null");
1108  }
1109  string bucket, object_prefix;
1110  TF_RETURN_IF_ERROR(
1111      ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));
1112
1113  string nextPageToken;
1114  uint64 retrieved_results = 0;
1115  while (true) {  // A loop over multiple result pages.
1116    std::vector<char> output_buffer;
1117    std::unique_ptr<HttpRequest> request;
1118    TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1119    auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
1120    if (recursive) {
1121      uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
1122    } else {
1123      // Set "/" as a delimiter to ask GCS to treat subfolders as children
1124      // and return them in "prefixes".
1125      uri = strings::StrCat(uri,
1126                            "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
1127      uri = strings::StrCat(uri, "&delimiter=%2F");
1128    }
1129    if (!object_prefix.empty()) {
1130      uri = strings::StrCat(uri,
1131                            "&prefix=", request->EscapeString(object_prefix));
1132    }
1133    if (!nextPageToken.empty()) {
1134      uri = strings::StrCat(
1135          uri, "&pageToken=", request->EscapeString(nextPageToken));
1136    }
1137    if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
1138      uri =
1139          strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
1140    }
1141    request->SetUri(uri);
1142    request->SetResultBuffer(&output_buffer);
1143    request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1144
1145    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
1146    Json::Value root;
1147    TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1148    const auto items = root.get("items", Json::Value::null);
1149    if (!items.isNull()) {
1150      if (!items.isArray()) {
1151        return errors::Internal(
1152            "Expected an array 'items' in the GCS response.");
1153      }
1154      for (size_t i = 0; i < items.size(); i++) {
1155        const auto item = items.get(i, Json::Value::null);
1156        if (!item.isObject()) {
1157          return errors::Internal(
1158              "Unexpected JSON format: 'items' should be a list of objects.");
1159        }
1160        string name;
1161        TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
1162        // The names should be relative to the 'dirname'. That means the
1163        // 'object_prefix', which is part of 'dirname', should be removed from
1164        // the beginning of 'name'.
1165        StringPiece relative_path(name);
1166        if (!relative_path.Consume(object_prefix)) {
1167          return errors::Internal(strings::StrCat(
1168              "Unexpected response: the returned file name ", name,
1169              " doesn't match the prefix ", object_prefix));
1170        }
1171        if (!relative_path.empty() || include_self_directory_marker) {
1172          result->emplace_back(relative_path.ToString());
1173        }
1174        if (++retrieved_results >= max_results) {
1175          return Status::OK();
1176        }
1177      }
1178    }
1179    const auto prefixes = root.get("prefixes", Json::Value::null);
1180    if (!prefixes.isNull()) {
1181      // Subfolders are returned for the non-recursive mode.
1182      if (!prefixes.isArray()) {
1183        return errors::Internal(
1184            "'prefixes' was expected to be an array in the GCS response.");
1185      }
1186      for (size_t i = 0; i < prefixes.size(); i++) {
1187        const auto prefix = prefixes.get(i, Json::Value::null);
1188        if (prefix.isNull() || !prefix.isString()) {
1189          return errors::Internal(
1190              "'prefixes' was expected to be an array of strings in the GCS "
1191              "response.");
1192        }
1193        const string& prefix_str = prefix.asString();
1194        StringPiece relative_path(prefix_str);
1195        if (!relative_path.Consume(object_prefix)) {
1196          return errors::Internal(
1197              "Unexpected response: the returned folder name ", prefix_str,
1198              " doesn't match the prefix ", object_prefix);
1199        }
1200        result->emplace_back(relative_path.ToString());
1201        if (++retrieved_results >= max_results) {
1202          return Status::OK();
1203        }
1204      }
1205    }
1206    const auto token = root.get("nextPageToken", Json::Value::null);
1207    if (token.isNull()) {
1208      return Status::OK();
1209    }
1210    if (!token.isString()) {
1211      return errors::Internal(
1212          "Unexpected response: nextPageToken is not a string");
1213    }
1214    nextPageToken = token.asString();
1215  }
1216}
1217
1218Status GcsFileSystem::Stat(const string& fname, FileStatistics* stat) {
1219  if (!stat) {
1220    return errors::Internal("'stat' cannot be nullptr.");
1221  }
1222  string bucket, object;
1223  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1224  if (object.empty()) {
1225    bool is_bucket;
1226    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1227    if (is_bucket) {
1228      *stat = DIRECTORY_STAT;
1229      return Status::OK();
1230    }
1231    return errors::NotFound("The specified bucket ", fname, " was not found.");
1232  }
1233
1234  const Status status = StatForObject(fname, bucket, object, stat);
1235  if (status.ok()) {
1236    return Status::OK();
1237  }
1238  if (status.code() != errors::Code::NOT_FOUND) {
1239    return status;
1240  }
1241  bool is_folder;
1242  TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
1243  if (is_folder) {
1244    *stat = DIRECTORY_STAT;
1245    return Status::OK();
1246  }
1247  return errors::NotFound("The specified path ", fname, " was not found.");
1248}
1249
1250Status GcsFileSystem::DeleteFile(const string& fname) {
1251  string bucket, object;
1252  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1253
1254  std::unique_ptr<HttpRequest> request;
1255  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1256  request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1257                                  request->EscapeString(object)));
1258  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1259  request->SetDeleteRequest();
1260
1261  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
1262  file_block_cache_->RemoveFile(fname);
1263  return Status::OK();
1264}
1265
1266Status GcsFileSystem::CreateDir(const string& dirname) {
1267  string bucket, object;
1268  TF_RETURN_IF_ERROR(ParseGcsPath(dirname, true, &bucket, &object));
1269  if (object.empty()) {
1270    bool is_bucket;
1271    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1272    return is_bucket ? Status::OK()
1273                     : errors::NotFound("The specified bucket ", dirname,
1274                                        " was not found.");
1275  }
1276  // Create a zero-length directory marker object.
1277  std::unique_ptr<WritableFile> file;
1278  TF_RETURN_IF_ERROR(NewWritableFile(MaybeAppendSlash(dirname), &file));
1279  TF_RETURN_IF_ERROR(file->Close());
1280  return Status::OK();
1281}
1282
1283// Checks that the directory is empty (i.e no objects with this prefix exist).
1284// Deletes the GCS directory marker if it exists.
1285Status GcsFileSystem::DeleteDir(const string& dirname) {
1286  std::vector<string> children;
1287  // A directory is considered empty either if there are no matching objects
1288  // with the corresponding name prefix or if there is exactly one matching
1289  // object and it is the directory marker. Therefore we need to retrieve
1290  // at most two children for the prefix to detect if a directory is empty.
1291  TF_RETURN_IF_ERROR(
1292      GetChildrenBounded(dirname, 2, &children, true /* recursively */,
1293                         true /* include_self_directory_marker */));
1294
1295  if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
1296    return errors::FailedPrecondition("Cannot delete a non-empty directory.");
1297  }
1298  if (children.size() == 1 && children[0].empty()) {
1299    // This is the directory marker object. Delete it.
1300    return DeleteFile(MaybeAppendSlash(dirname));
1301  }
1302  return Status::OK();
1303}
1304
1305Status GcsFileSystem::GetFileSize(const string& fname, uint64* file_size) {
1306  if (!file_size) {
1307    return errors::Internal("'file_size' cannot be nullptr.");
1308  }
1309
1310  // Only validate the name.
1311  string bucket, object;
1312  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1313
1314  FileStatistics stat;
1315  TF_RETURN_IF_ERROR(Stat(fname, &stat));
1316  *file_size = stat.length;
1317  return Status::OK();
1318}
1319
1320Status GcsFileSystem::RenameFile(const string& src, const string& target) {
1321  if (!IsDirectory(src).ok()) {
1322    return RenameObject(src, target);
1323  }
1324  // Rename all individual objects in the directory one by one.
1325  std::vector<string> children;
1326  TF_RETURN_IF_ERROR(
1327      GetChildrenBounded(src, UINT64_MAX, &children, true /* recursively */,
1328                         true /* include_self_directory_marker */));
1329  for (const string& subpath : children) {
1330    TF_RETURN_IF_ERROR(
1331        RenameObject(JoinGcsPath(src, subpath), JoinGcsPath(target, subpath)));
1332  }
1333  return Status::OK();
1334}
1335
1336// Uses a GCS API command to copy the object and then deletes the old one.
1337Status GcsFileSystem::RenameObject(const string& src, const string& target) {
1338  string src_bucket, src_object, target_bucket, target_object;
1339  TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
1340  TF_RETURN_IF_ERROR(
1341      ParseGcsPath(target, false, &target_bucket, &target_object));
1342
1343  std::unique_ptr<HttpRequest> request;
1344  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1345  request->SetUri(strings::StrCat(kGcsUriBase, "b/", src_bucket, "/o/",
1346                                  request->EscapeString(src_object),
1347                                  "/rewriteTo/b/", target_bucket, "/o/",
1348                                  request->EscapeString(target_object)));
1349  request->SetPostEmptyBody();
1350  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1351  std::vector<char> output_buffer;
1352  request->SetResultBuffer(&output_buffer);
1353  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
1354                                  " to ", target);
1355  // Flush the target from the block cache.  The source will be flushed in the
1356  // DeleteFile call below.
1357  file_block_cache_->RemoveFile(target);
1358  Json::Value root;
1359  TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1360  bool done;
1361  TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
1362  if (!done) {
1363    // If GCS didn't complete rewrite in one call, this means that a large file
1364    // is being copied to a bucket with a different storage class or location,
1365    // which requires multiple rewrite calls.
1366    // TODO(surkov): implement multi-step rewrites.
1367    return errors::Unimplemented(
1368        "Couldn't rename ", src, " to ", target,
1369        ": moving large files between buckets with different "
1370        "locations or storage classes is not supported.");
1371  }
1372
1373  // In case the delete API call failed, but the deletion actually happened
1374  // on the server side, we can't just retry the whole RenameFile operation
1375  // because the source object is already gone.
1376  return RetryingUtils::DeleteWithRetries(
1377      std::bind(&GcsFileSystem::DeleteFile, this, src),
1378      initial_retry_delay_usec_);
1379}
1380
1381Status GcsFileSystem::IsDirectory(const string& fname) {
1382  string bucket, object;
1383  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1384  if (object.empty()) {
1385    bool is_bucket;
1386    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1387    if (is_bucket) {
1388      return Status::OK();
1389    }
1390    return errors::NotFound("The specified bucket gs://", bucket,
1391                            " was not found.");
1392  }
1393  bool is_folder;
1394  TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
1395  if (is_folder) {
1396    return Status::OK();
1397  }
1398  bool is_object;
1399  TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &is_object));
1400  if (is_object) {
1401    return errors::FailedPrecondition("The specified path ", fname,
1402                                      " is not a directory.");
1403  }
1404  return errors::NotFound("The specified path ", fname, " was not found.");
1405}
1406
1407Status GcsFileSystem::DeleteRecursively(const string& dirname,
1408                                        int64* undeleted_files,
1409                                        int64* undeleted_dirs) {
1410  if (!undeleted_files || !undeleted_dirs) {
1411    return errors::Internal(
1412        "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
1413  }
1414  *undeleted_files = 0;
1415  *undeleted_dirs = 0;
1416  if (!IsDirectory(dirname).ok()) {
1417    *undeleted_dirs = 1;
1418    return Status(
1419        error::NOT_FOUND,
1420        strings::StrCat(dirname, " doesn't exist or not a directory."));
1421  }
1422  std::vector<string> all_objects;
1423  // Get all children in the directory recursively.
1424  TF_RETURN_IF_ERROR(GetChildrenBounded(
1425      dirname, UINT64_MAX, &all_objects, true /* recursively */,
1426      true /* include_self_directory_marker */));
1427  for (const string& object : all_objects) {
1428    const string& full_path = JoinGcsPath(dirname, object);
1429    // Delete all objects including directory markers for subfolders.
1430    // Since DeleteRecursively returns OK if individual file deletions fail,
1431    // and therefore RetryingFileSystem won't pay attention to the failures,
1432    // we need to make sure these failures are properly retried.
1433    const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
1434        std::bind(&GcsFileSystem::DeleteFile, this, full_path),
1435        initial_retry_delay_usec_);
1436    if (!delete_file_status.ok()) {
1437      if (IsDirectory(full_path).ok()) {
1438        // The object is a directory marker.
1439        (*undeleted_dirs)++;
1440      } else {
1441        (*undeleted_files)++;
1442      }
1443    }
1444  }
1445  return Status::OK();
1446}
1447
1448// Flushes all caches for filesystem metadata and file contents. Useful for
1449// reclaiming memory once filesystem operations are done (e.g. model is loaded),
1450// or for resetting the filesystem to a consistent state.
1451void GcsFileSystem::FlushCaches() {
1452  file_block_cache_->Flush();
1453  stat_cache_->Clear();
1454  matching_paths_cache_->Clear();
1455}
1456
1457// Creates an HttpRequest and sets several parameters that are common to all
1458// requests.  All code (in GcsFileSystem) that creates an HttpRequest should
1459// go through this method, rather than directly using http_request_factory_.
1460Status GcsFileSystem::CreateHttpRequest(std::unique_ptr<HttpRequest>* request) {
1461  std::unique_ptr<HttpRequest> new_request{http_request_factory_->Create()};
1462  if (dns_cache_) {
1463    dns_cache_->AnnotateRequest(new_request.get());
1464  }
1465
1466  string auth_token;
1467  TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token));
1468
1469  new_request->AddAuthBearerHeader(auth_token);
1470
1471  if (additional_header_) {
1472    new_request->AddHeader(additional_header_->first,
1473                           additional_header_->second);
1474  }
1475
1476  if (!throttle_.AdmitRequest()) {
1477    return errors::Unavailable("Request throttled");
1478  }
1479
1480  *request = std::move(new_request);
1481  return Status::OK();
1482}
1483
1484REGISTER_FILE_SYSTEM("gs", RetryingGcsFileSystem);
1485
1486}  // namespace tensorflow
1487