1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15#include "tensorflow/core/platform/s3/s3_file_system.h"
16#include "tensorflow/core/lib/io/path.h"
17#include "tensorflow/core/lib/strings/str_util.h"
18#include "tensorflow/core/platform/mutex.h"
19#include "tensorflow/core/platform/s3/aws_logging.h"
20#include "tensorflow/core/platform/s3/s3_crypto.h"
21
22#include <aws/core/Aws.h>
23#include <aws/core/config/AWSProfileConfigLoader.h>
24#include <aws/core/utils/FileSystemUtils.h>
25#include <aws/core/utils/StringUtils.h>
26#include <aws/core/utils/logging/AWSLogging.h>
27#include <aws/core/utils/logging/LogSystemInterface.h>
28#include <aws/core/utils/StringUtils.h>
29#include <aws/s3/S3Client.h>
30#include <aws/s3/S3Errors.h>
31#include <aws/s3/model/CopyObjectRequest.h>
32#include <aws/s3/model/DeleteObjectRequest.h>
33#include <aws/s3/model/GetObjectRequest.h>
34#include <aws/s3/model/HeadBucketRequest.h>
35#include <aws/s3/model/HeadObjectRequest.h>
36#include <aws/s3/model/ListObjectsRequest.h>
37#include <aws/s3/model/PutObjectRequest.h>
38
39#include <cstdlib>
40
41namespace tensorflow {
42
43namespace {
44static const char* kS3FileSystemAllocationTag = "S3FileSystemAllocation";
45static const size_t kS3ReadAppendableFileBufferSize = 1024 * 1024;
46static const int kS3GetChildrenMaxKeys = 100;
47
48Aws::Client::ClientConfiguration& GetDefaultClientConfig() {
49  static mutex cfg_lock(LINKER_INITIALIZED);
50  static bool init(false);
51  static Aws::Client::ClientConfiguration cfg;
52
53  std::lock_guard<mutex> lock(cfg_lock);
54
55  if (!init) {
56    const char* endpoint = getenv("S3_ENDPOINT");
57    if (endpoint) {
58      cfg.endpointOverride = Aws::String(endpoint);
59    }
60    const char* region = getenv("AWS_REGION");
61    if (!region) {
62      // TODO (yongtang): `S3_REGION` should be deprecated after 2.0.
63      region = getenv("S3_REGION");
64    }
65    if (region) {
66      cfg.region = Aws::String(region);
67    } else {
68      // Load config file (e.g., ~/.aws/config) only if AWS_SDK_LOAD_CONFIG
69      // is set with a truthy value.
70      const char* load_config_env = getenv("AWS_SDK_LOAD_CONFIG");
71      string load_config =
72          load_config_env ? str_util::Lowercase(load_config_env) : "";
73      if (load_config == "true" || load_config == "1") {
74        Aws::String config_file;
75        // If AWS_CONFIG_FILE is set then use it, otherwise use ~/.aws/config.
76        const char* config_file_env = getenv("AWS_CONFIG_FILE");
77        if (config_file_env) {
78          config_file = config_file_env;
79        } else {
80          const char* home_env = getenv("HOME");
81          if (home_env) {
82            config_file = home_env;
83            config_file += "/.aws/config";
84          }
85        }
86        Aws::Config::AWSConfigFileProfileConfigLoader loader(config_file);
87        loader.Load();
88        auto profiles = loader.GetProfiles();
89        if (!profiles["default"].GetRegion().empty()) {
90          cfg.region = profiles["default"].GetRegion();
91        }
92      }
93    }
94    const char* use_https = getenv("S3_USE_HTTPS");
95    if (use_https) {
96      if (use_https[0] == '0') {
97        cfg.scheme = Aws::Http::Scheme::HTTP;
98      } else {
99        cfg.scheme = Aws::Http::Scheme::HTTPS;
100      }
101    }
102    const char* verify_ssl = getenv("S3_VERIFY_SSL");
103    if (verify_ssl) {
104      if (verify_ssl[0] == '0') {
105        cfg.verifySSL = false;
106      } else {
107        cfg.verifySSL = true;
108      }
109    }
110    const char* connect_timeout = getenv("S3_CONNECT_TIMEOUT_MSEC");
111    if (connect_timeout) {
112      int64 timeout;
113
114      if (strings::safe_strto64(connect_timeout, &timeout)) {
115        cfg.connectTimeoutMs = timeout;
116      }
117    }
118    const char* request_timeout = getenv("S3_REQUEST_TIMEOUT_MSEC");
119    if (request_timeout) {
120      int64 timeout;
121
122      if (strings::safe_strto64(request_timeout, &timeout)) {
123        cfg.requestTimeoutMs = timeout;
124      }
125    }
126
127    init = true;
128  }
129
130  return cfg;
131};
132
133void ShutdownClient(Aws::S3::S3Client* s3_client) {
134  if (s3_client != nullptr) {
135    delete s3_client;
136    Aws::SDKOptions options;
137    Aws::ShutdownAPI(options);
138    AWSLogSystem::ShutdownAWSLogging();
139  }
140}
141
142Status ParseS3Path(const string& fname, bool empty_object_ok, string* bucket,
143                   string* object) {
144  if (!bucket || !object) {
145    return errors::Internal("bucket and object cannot be null.");
146  }
147  StringPiece scheme, bucketp, objectp;
148  io::ParseURI(fname, &scheme, &bucketp, &objectp);
149  if (scheme != "s3") {
150    return errors::InvalidArgument("S3 path doesn't start with 's3://': ",
151                                   fname);
152  }
153  *bucket = bucketp.ToString();
154  if (bucket->empty() || *bucket == ".") {
155    return errors::InvalidArgument("S3 path doesn't contain a bucket name: ",
156                                   fname);
157  }
158  objectp.Consume("/");
159  *object = objectp.ToString();
160  if (!empty_object_ok && object->empty()) {
161    return errors::InvalidArgument("S3 path doesn't contain an object name: ",
162                                   fname);
163  }
164  return Status::OK();
165}
166
167class S3RandomAccessFile : public RandomAccessFile {
168 public:
169  S3RandomAccessFile(const string& bucket, const string& object,
170                     std::shared_ptr<Aws::S3::S3Client> s3_client)
171      : bucket_(bucket), object_(object), s3_client_(s3_client) {}
172
173  Status Read(uint64 offset, size_t n, StringPiece* result,
174              char* scratch) const override {
175    Aws::S3::Model::GetObjectRequest getObjectRequest;
176    getObjectRequest.WithBucket(bucket_.c_str()).WithKey(object_.c_str());
177    string bytes = strings::StrCat("bytes=", offset, "-", offset + n - 1);
178    getObjectRequest.SetRange(bytes.c_str());
179    getObjectRequest.SetResponseStreamFactory([]() {
180      return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag);
181    });
182    auto getObjectOutcome = this->s3_client_->GetObject(getObjectRequest);
183    if (!getObjectOutcome.IsSuccess()) {
184      n = 0;
185      *result = StringPiece(scratch, n);
186      return Status(error::OUT_OF_RANGE, "Read less bytes than requested");
187    }
188    n = getObjectOutcome.GetResult().GetContentLength();
189    std::stringstream ss;
190    ss << getObjectOutcome.GetResult().GetBody().rdbuf();
191    ss.read(scratch, n);
192
193    *result = StringPiece(scratch, n);
194    return Status::OK();
195  }
196
197 private:
198  string bucket_;
199  string object_;
200  std::shared_ptr<Aws::S3::S3Client> s3_client_;
201};
202
203class S3WritableFile : public WritableFile {
204 public:
205  S3WritableFile(const string& bucket, const string& object,
206                 std::shared_ptr<Aws::S3::S3Client> s3_client)
207      : bucket_(bucket),
208        object_(object),
209        s3_client_(s3_client),
210        sync_needed_(true),
211        outfile_(Aws::MakeShared<Aws::Utils::TempFile>(
212            kS3FileSystemAllocationTag, "/tmp/s3_filesystem_XXXXXX",
213            std::ios_base::binary | std::ios_base::trunc | std::ios_base::in |
214                std::ios_base::out)) {}
215
216  Status Append(const StringPiece& data) override {
217    if (!outfile_) {
218      return errors::FailedPrecondition(
219          "The internal temporary file is not writable.");
220    }
221    sync_needed_ = true;
222    outfile_->write(data.data(), data.size());
223    if (!outfile_->good()) {
224      return errors::Internal(
225          "Could not append to the internal temporary file.");
226    }
227    return Status::OK();
228  }
229
230  Status Close() override {
231    if (outfile_) {
232      TF_RETURN_IF_ERROR(Sync());
233      outfile_.reset();
234    }
235    return Status::OK();
236  }
237
238  Status Flush() override { return Sync(); }
239
240  Status Sync() override {
241    if (!outfile_) {
242      return errors::FailedPrecondition(
243          "The internal temporary file is not writable.");
244    }
245    if (!sync_needed_) {
246      return Status::OK();
247    }
248    Aws::S3::Model::PutObjectRequest putObjectRequest;
249    putObjectRequest.WithBucket(bucket_.c_str()).WithKey(object_.c_str());
250    long offset = outfile_->tellp();
251    outfile_->seekg(0);
252    putObjectRequest.SetBody(outfile_);
253    putObjectRequest.SetContentLength(offset);
254    auto putObjectOutcome = this->s3_client_->PutObject(putObjectRequest);
255    outfile_->clear();
256    outfile_->seekp(offset);
257    if (!putObjectOutcome.IsSuccess()) {
258      string error = strings::StrCat(
259          putObjectOutcome.GetError().GetExceptionName().c_str(), ": ",
260          putObjectOutcome.GetError().GetMessage().c_str());
261      return errors::Internal(error);
262    }
263    return Status::OK();
264  }
265
266 private:
267  string bucket_;
268  string object_;
269  std::shared_ptr<Aws::S3::S3Client> s3_client_;
270  bool sync_needed_;
271  std::shared_ptr<Aws::Utils::TempFile> outfile_;
272};
273
274class S3ReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
275 public:
276  S3ReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
277      : data_(std::move(data)), length_(length) {}
278  const void* data() override { return reinterpret_cast<void*>(data_.get()); }
279  uint64 length() override { return length_; }
280
281 private:
282  std::unique_ptr<char[]> data_;
283  uint64 length_;
284};
285
286}  // namespace
287
288S3FileSystem::S3FileSystem()
289    : s3_client_(nullptr, ShutdownClient), client_lock_() {}
290
291S3FileSystem::~S3FileSystem() {}
292
293// Initializes s3_client_, if needed, and returns it.
294std::shared_ptr<Aws::S3::S3Client> S3FileSystem::GetS3Client() {
295  std::lock_guard<mutex> lock(this->client_lock_);
296
297  if (this->s3_client_.get() == nullptr) {
298    AWSLogSystem::InitializeAWSLogging();
299
300    Aws::SDKOptions options;
301    options.cryptoOptions.sha256Factory_create_fn = []() {
302      return Aws::MakeShared<S3SHA256Factory>(S3CryptoAllocationTag);
303    };
304    options.cryptoOptions.sha256HMACFactory_create_fn = []() {
305      return Aws::MakeShared<S3SHA256HmacFactory>(S3CryptoAllocationTag);
306    };
307    Aws::InitAPI(options);
308
309    // The creation of S3Client disables virtual addressing:
310    //   S3Client(clientConfiguration, signPayloads, useVirtualAdressing = true)
311    // The purpose is to address the issue encountered when there is an `.`
312    // in the bucket name. Due to TLS hostname validation or DNS rules,
313    // the bucket may not be resolved. Disabling of virtual addressing
314    // should address the issue. See GitHub issue 16397 for details.
315    this->s3_client_ = std::shared_ptr<Aws::S3::S3Client>(new Aws::S3::S3Client(
316        GetDefaultClientConfig(),
317        Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false));
318  }
319
320  return this->s3_client_;
321}
322
323Status S3FileSystem::NewRandomAccessFile(
324    const string& fname, std::unique_ptr<RandomAccessFile>* result) {
325  string bucket, object;
326  TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
327  result->reset(new S3RandomAccessFile(bucket, object, this->GetS3Client()));
328  return Status::OK();
329}
330
331Status S3FileSystem::NewWritableFile(const string& fname,
332                                     std::unique_ptr<WritableFile>* result) {
333  string bucket, object;
334  TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
335  result->reset(new S3WritableFile(bucket, object, this->GetS3Client()));
336  return Status::OK();
337}
338
339Status S3FileSystem::NewAppendableFile(const string& fname,
340                                       std::unique_ptr<WritableFile>* result) {
341  std::unique_ptr<RandomAccessFile> reader;
342  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader));
343  std::unique_ptr<char[]> buffer(new char[kS3ReadAppendableFileBufferSize]);
344  Status status;
345  uint64 offset = 0;
346  StringPiece read_chunk;
347
348  string bucket, object;
349  TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
350  result->reset(new S3WritableFile(bucket, object, this->GetS3Client()));
351
352  while (true) {
353    status = reader->Read(offset, kS3ReadAppendableFileBufferSize, &read_chunk,
354                          buffer.get());
355    if (status.ok()) {
356      (*result)->Append(read_chunk);
357      offset += kS3ReadAppendableFileBufferSize;
358    } else if (status.code() == error::OUT_OF_RANGE) {
359      (*result)->Append(read_chunk);
360      break;
361    } else {
362      (*result).reset();
363      return status;
364    }
365  }
366
367  return Status::OK();
368}
369
370Status S3FileSystem::NewReadOnlyMemoryRegionFromFile(
371    const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
372  uint64 size;
373  TF_RETURN_IF_ERROR(GetFileSize(fname, &size));
374  std::unique_ptr<char[]> data(new char[size]);
375
376  std::unique_ptr<RandomAccessFile> file;
377  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file));
378
379  StringPiece piece;
380  TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
381
382  result->reset(new S3ReadOnlyMemoryRegion(std::move(data), size));
383  return Status::OK();
384}
385
386Status S3FileSystem::FileExists(const string& fname) {
387  FileStatistics stats;
388  TF_RETURN_IF_ERROR(this->Stat(fname, &stats));
389  return Status::OK();
390}
391
392Status S3FileSystem::GetChildren(const string& dir,
393                                 std::vector<string>* result) {
394  string bucket, prefix;
395  TF_RETURN_IF_ERROR(ParseS3Path(dir, false, &bucket, &prefix));
396
397  if (prefix.back() != '/') {
398    prefix.push_back('/');
399  }
400
401  Aws::S3::Model::ListObjectsRequest listObjectsRequest;
402  listObjectsRequest.WithBucket(bucket.c_str())
403      .WithPrefix(prefix.c_str())
404      .WithMaxKeys(kS3GetChildrenMaxKeys)
405      .WithDelimiter("/");
406  listObjectsRequest.SetResponseStreamFactory(
407      []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); });
408
409  Aws::S3::Model::ListObjectsResult listObjectsResult;
410  do {
411    auto listObjectsOutcome =
412        this->GetS3Client()->ListObjects(listObjectsRequest);
413    if (!listObjectsOutcome.IsSuccess()) {
414      string error = strings::StrCat(
415          listObjectsOutcome.GetError().GetExceptionName().c_str(), ": ",
416          listObjectsOutcome.GetError().GetMessage().c_str());
417      return errors::Internal(error);
418    }
419
420    listObjectsResult = listObjectsOutcome.GetResult();
421    for (const auto& object : listObjectsResult.GetCommonPrefixes()) {
422      Aws::String s = object.GetPrefix();
423      s.erase(s.length() - 1);
424      Aws::String entry = s.substr(strlen(prefix.c_str()));
425      if (entry.length() > 0) {
426        result->push_back(entry.c_str());
427      }
428    }
429    for (const auto& object : listObjectsResult.GetContents()) {
430      Aws::String s = object.GetKey();
431      Aws::String entry = s.substr(strlen(prefix.c_str()));
432      if (entry.length() > 0) {
433        result->push_back(entry.c_str());
434      }
435    }
436    listObjectsRequest.SetMarker(listObjectsResult.GetNextMarker());
437  } while (listObjectsResult.GetIsTruncated());
438
439  return Status::OK();
440}
441
442Status S3FileSystem::Stat(const string& fname, FileStatistics* stats) {
443  string bucket, object;
444  TF_RETURN_IF_ERROR(ParseS3Path(fname, true, &bucket, &object));
445
446  if (object.empty()) {
447    Aws::S3::Model::HeadBucketRequest headBucketRequest;
448    headBucketRequest.WithBucket(bucket.c_str());
449    auto headBucketOutcome = this->GetS3Client()->HeadBucket(headBucketRequest);
450    if (!headBucketOutcome.IsSuccess()) {
451      string error = strings::StrCat(
452          headBucketOutcome.GetError().GetExceptionName().c_str(), ": ",
453          headBucketOutcome.GetError().GetMessage().c_str());
454      return errors::Internal(error);
455    }
456    stats->length = 0;
457    stats->is_directory = 1;
458    return Status::OK();
459  }
460
461  bool found = false;
462
463  Aws::S3::Model::HeadObjectRequest headObjectRequest;
464  headObjectRequest.WithBucket(bucket.c_str()).WithKey(object.c_str());
465  headObjectRequest.SetResponseStreamFactory(
466      []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); });
467  auto headObjectOutcome = this->GetS3Client()->HeadObject(headObjectRequest);
468  if (headObjectOutcome.IsSuccess()) {
469    stats->length = headObjectOutcome.GetResult().GetContentLength();
470    stats->is_directory = 0;
471    stats->mtime_nsec =
472        headObjectOutcome.GetResult().GetLastModified().Millis() * 1e6;
473    found = true;
474  }
475  string prefix = object;
476  if (prefix.back() != '/') {
477    prefix.push_back('/');
478  }
479  Aws::S3::Model::ListObjectsRequest listObjectsRequest;
480  listObjectsRequest.WithBucket(bucket.c_str())
481      .WithPrefix(prefix.c_str())
482      .WithMaxKeys(1);
483  listObjectsRequest.SetResponseStreamFactory(
484      []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); });
485  auto listObjectsOutcome =
486      this->GetS3Client()->ListObjects(listObjectsRequest);
487  if (listObjectsOutcome.IsSuccess()) {
488    if (listObjectsOutcome.GetResult().GetContents().size() > 0) {
489      stats->length = 0;
490      stats->is_directory = 1;
491      found = true;
492    }
493  }
494  if (!found) {
495    return errors::NotFound("Object ", fname, " does not exist");
496  }
497  return Status::OK();
498}
499
500Status S3FileSystem::DeleteFile(const string& fname) {
501  string bucket, object;
502  TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object));
503
504  Aws::S3::Model::DeleteObjectRequest deleteObjectRequest;
505  deleteObjectRequest.WithBucket(bucket.c_str()).WithKey(object.c_str());
506
507  auto deleteObjectOutcome =
508      this->GetS3Client()->DeleteObject(deleteObjectRequest);
509  if (!deleteObjectOutcome.IsSuccess()) {
510    string error = strings::StrCat(
511        deleteObjectOutcome.GetError().GetExceptionName().c_str(), ": ",
512        deleteObjectOutcome.GetError().GetMessage().c_str());
513    return errors::Internal(error);
514  }
515  return Status::OK();
516}
517
518Status S3FileSystem::CreateDir(const string& dirname) {
519  string bucket, object;
520  TF_RETURN_IF_ERROR(ParseS3Path(dirname, true, &bucket, &object));
521
522  if (object.empty()) {
523    Aws::S3::Model::HeadBucketRequest headBucketRequest;
524    headBucketRequest.WithBucket(bucket.c_str());
525    auto headBucketOutcome = this->GetS3Client()->HeadBucket(headBucketRequest);
526    if (!headBucketOutcome.IsSuccess()) {
527      return errors::NotFound("The bucket ", bucket, " was not found.");
528    }
529    return Status::OK();
530  }
531  string filename = dirname;
532  if (filename.back() != '/') {
533    filename.push_back('/');
534  }
535  std::unique_ptr<WritableFile> file;
536  TF_RETURN_IF_ERROR(NewWritableFile(filename, &file));
537  TF_RETURN_IF_ERROR(file->Close());
538  return Status::OK();
539}
540
541Status S3FileSystem::DeleteDir(const string& dirname) {
542  string bucket, object;
543  TF_RETURN_IF_ERROR(ParseS3Path(dirname, false, &bucket, &object));
544
545  string prefix = object;
546  if (prefix.back() != '/') {
547    prefix.push_back('/');
548  }
549  Aws::S3::Model::ListObjectsRequest listObjectsRequest;
550  listObjectsRequest.WithBucket(bucket.c_str())
551      .WithPrefix(prefix.c_str())
552      .WithMaxKeys(2);
553  listObjectsRequest.SetResponseStreamFactory(
554      []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); });
555  auto listObjectsOutcome =
556      this->GetS3Client()->ListObjects(listObjectsRequest);
557  if (listObjectsOutcome.IsSuccess()) {
558    auto contents = listObjectsOutcome.GetResult().GetContents();
559    if (contents.size() > 1 ||
560        (contents.size() == 1 && contents[0].GetKey() != prefix.c_str())) {
561      return errors::FailedPrecondition("Cannot delete a non-empty directory.");
562    }
563    if (contents.size() == 1 && contents[0].GetKey() == prefix.c_str()) {
564      string filename = dirname;
565      if (filename.back() != '/') {
566        filename.push_back('/');
567      }
568      return DeleteFile(filename);
569    }
570  }
571  return Status::OK();
572}
573
574Status S3FileSystem::GetFileSize(const string& fname, uint64* file_size) {
575  FileStatistics stats;
576  TF_RETURN_IF_ERROR(this->Stat(fname, &stats));
577  *file_size = stats.length;
578  return Status::OK();
579}
580
581Status S3FileSystem::RenameFile(const string& src, const string& target) {
582  string src_bucket, src_object, target_bucket, target_object;
583  TF_RETURN_IF_ERROR(ParseS3Path(src, false, &src_bucket, &src_object));
584  TF_RETURN_IF_ERROR(
585      ParseS3Path(target, false, &target_bucket, &target_object));
586  if (src_object.back() == '/') {
587    if (target_object.back() != '/') {
588      target_object.push_back('/');
589    }
590  } else {
591    if (target_object.back() == '/') {
592      target_object.pop_back();
593    }
594  }
595
596  Aws::S3::Model::CopyObjectRequest copyObjectRequest;
597  Aws::S3::Model::DeleteObjectRequest deleteObjectRequest;
598
599  Aws::S3::Model::ListObjectsRequest listObjectsRequest;
600  listObjectsRequest.WithBucket(src_bucket.c_str())
601      .WithPrefix(src_object.c_str())
602      .WithMaxKeys(kS3GetChildrenMaxKeys);
603  listObjectsRequest.SetResponseStreamFactory(
604      []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); });
605
606  Aws::S3::Model::ListObjectsResult listObjectsResult;
607  do {
608    auto listObjectsOutcome =
609        this->GetS3Client()->ListObjects(listObjectsRequest);
610    if (!listObjectsOutcome.IsSuccess()) {
611      string error = strings::StrCat(
612          listObjectsOutcome.GetError().GetExceptionName().c_str(), ": ",
613          listObjectsOutcome.GetError().GetMessage().c_str());
614      return errors::Internal(error);
615    }
616
617    listObjectsResult = listObjectsOutcome.GetResult();
618    for (const auto& object : listObjectsResult.GetContents()) {
619      Aws::String src_key = object.GetKey();
620      Aws::String target_key = src_key;
621      target_key.replace(0, src_object.length(), target_object.c_str());
622      Aws::String source = Aws::String(src_bucket.c_str()) + "/" +
623                           Aws::Utils::StringUtils::URLEncode(src_key.c_str());
624
625      copyObjectRequest.SetBucket(target_bucket.c_str());
626      copyObjectRequest.SetKey(target_key);
627      copyObjectRequest.SetCopySource(source);
628
629      auto copyObjectOutcome =
630          this->GetS3Client()->CopyObject(copyObjectRequest);
631      if (!copyObjectOutcome.IsSuccess()) {
632        string error = strings::StrCat(
633            copyObjectOutcome.GetError().GetExceptionName().c_str(), ": ",
634            copyObjectOutcome.GetError().GetMessage().c_str());
635        return errors::Internal(error);
636      }
637
638      deleteObjectRequest.SetBucket(src_bucket.c_str());
639      deleteObjectRequest.SetKey(src_key.c_str());
640
641      auto deleteObjectOutcome =
642          this->GetS3Client()->DeleteObject(deleteObjectRequest);
643      if (!deleteObjectOutcome.IsSuccess()) {
644        string error = strings::StrCat(
645            deleteObjectOutcome.GetError().GetExceptionName().c_str(), ": ",
646            deleteObjectOutcome.GetError().GetMessage().c_str());
647        return errors::Internal(error);
648      }
649    }
650    listObjectsRequest.SetMarker(listObjectsResult.GetNextMarker());
651  } while (listObjectsResult.GetIsTruncated());
652
653  return Status::OK();
654}
655
656REGISTER_FILE_SYSTEM("s3", S3FileSystem);
657
658}  // namespace tensorflow
659