1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License. 14==============================================================================*/ 15#include "tensorflow/core/platform/s3/s3_file_system.h" 16#include "tensorflow/core/lib/io/path.h" 17#include "tensorflow/core/lib/strings/str_util.h" 18#include "tensorflow/core/platform/mutex.h" 19#include "tensorflow/core/platform/s3/aws_logging.h" 20#include "tensorflow/core/platform/s3/s3_crypto.h" 21 22#include <aws/core/Aws.h> 23#include <aws/core/config/AWSProfileConfigLoader.h> 24#include <aws/core/utils/FileSystemUtils.h> 25#include <aws/core/utils/StringUtils.h> 26#include <aws/core/utils/logging/AWSLogging.h> 27#include <aws/core/utils/logging/LogSystemInterface.h> 28#include <aws/core/utils/StringUtils.h> 29#include <aws/s3/S3Client.h> 30#include <aws/s3/S3Errors.h> 31#include <aws/s3/model/CopyObjectRequest.h> 32#include <aws/s3/model/DeleteObjectRequest.h> 33#include <aws/s3/model/GetObjectRequest.h> 34#include <aws/s3/model/HeadBucketRequest.h> 35#include <aws/s3/model/HeadObjectRequest.h> 36#include <aws/s3/model/ListObjectsRequest.h> 37#include <aws/s3/model/PutObjectRequest.h> 38 39#include <cstdlib> 40 41namespace tensorflow { 42 43namespace { 44static const char* kS3FileSystemAllocationTag = "S3FileSystemAllocation"; 45static const size_t kS3ReadAppendableFileBufferSize = 1024 * 1024; 46static const int kS3GetChildrenMaxKeys = 100; 47 48Aws::Client::ClientConfiguration& GetDefaultClientConfig() { 49 static mutex cfg_lock(LINKER_INITIALIZED); 50 static bool init(false); 51 static Aws::Client::ClientConfiguration cfg; 52 53 std::lock_guard<mutex> lock(cfg_lock); 54 55 if (!init) { 56 const char* endpoint = getenv("S3_ENDPOINT"); 57 if (endpoint) { 58 cfg.endpointOverride = Aws::String(endpoint); 59 } 60 const char* region = getenv("AWS_REGION"); 61 if (!region) { 62 // TODO (yongtang): `S3_REGION` should be deprecated after 2.0. 63 region = getenv("S3_REGION"); 64 } 65 if (region) { 66 cfg.region = Aws::String(region); 67 } else { 68 // Load config file (e.g., ~/.aws/config) only if AWS_SDK_LOAD_CONFIG 69 // is set with a truthy value. 70 const char* load_config_env = getenv("AWS_SDK_LOAD_CONFIG"); 71 string load_config = 72 load_config_env ? str_util::Lowercase(load_config_env) : ""; 73 if (load_config == "true" || load_config == "1") { 74 Aws::String config_file; 75 // If AWS_CONFIG_FILE is set then use it, otherwise use ~/.aws/config. 76 const char* config_file_env = getenv("AWS_CONFIG_FILE"); 77 if (config_file_env) { 78 config_file = config_file_env; 79 } else { 80 const char* home_env = getenv("HOME"); 81 if (home_env) { 82 config_file = home_env; 83 config_file += "/.aws/config"; 84 } 85 } 86 Aws::Config::AWSConfigFileProfileConfigLoader loader(config_file); 87 loader.Load(); 88 auto profiles = loader.GetProfiles(); 89 if (!profiles["default"].GetRegion().empty()) { 90 cfg.region = profiles["default"].GetRegion(); 91 } 92 } 93 } 94 const char* use_https = getenv("S3_USE_HTTPS"); 95 if (use_https) { 96 if (use_https[0] == '0') { 97 cfg.scheme = Aws::Http::Scheme::HTTP; 98 } else { 99 cfg.scheme = Aws::Http::Scheme::HTTPS; 100 } 101 } 102 const char* verify_ssl = getenv("S3_VERIFY_SSL"); 103 if (verify_ssl) { 104 if (verify_ssl[0] == '0') { 105 cfg.verifySSL = false; 106 } else { 107 cfg.verifySSL = true; 108 } 109 } 110 const char* connect_timeout = getenv("S3_CONNECT_TIMEOUT_MSEC"); 111 if (connect_timeout) { 112 int64 timeout; 113 114 if (strings::safe_strto64(connect_timeout, &timeout)) { 115 cfg.connectTimeoutMs = timeout; 116 } 117 } 118 const char* request_timeout = getenv("S3_REQUEST_TIMEOUT_MSEC"); 119 if (request_timeout) { 120 int64 timeout; 121 122 if (strings::safe_strto64(request_timeout, &timeout)) { 123 cfg.requestTimeoutMs = timeout; 124 } 125 } 126 127 init = true; 128 } 129 130 return cfg; 131}; 132 133void ShutdownClient(Aws::S3::S3Client* s3_client) { 134 if (s3_client != nullptr) { 135 delete s3_client; 136 Aws::SDKOptions options; 137 Aws::ShutdownAPI(options); 138 AWSLogSystem::ShutdownAWSLogging(); 139 } 140} 141 142Status ParseS3Path(const string& fname, bool empty_object_ok, string* bucket, 143 string* object) { 144 if (!bucket || !object) { 145 return errors::Internal("bucket and object cannot be null."); 146 } 147 StringPiece scheme, bucketp, objectp; 148 io::ParseURI(fname, &scheme, &bucketp, &objectp); 149 if (scheme != "s3") { 150 return errors::InvalidArgument("S3 path doesn't start with 's3://': ", 151 fname); 152 } 153 *bucket = bucketp.ToString(); 154 if (bucket->empty() || *bucket == ".") { 155 return errors::InvalidArgument("S3 path doesn't contain a bucket name: ", 156 fname); 157 } 158 objectp.Consume("/"); 159 *object = objectp.ToString(); 160 if (!empty_object_ok && object->empty()) { 161 return errors::InvalidArgument("S3 path doesn't contain an object name: ", 162 fname); 163 } 164 return Status::OK(); 165} 166 167class S3RandomAccessFile : public RandomAccessFile { 168 public: 169 S3RandomAccessFile(const string& bucket, const string& object, 170 std::shared_ptr<Aws::S3::S3Client> s3_client) 171 : bucket_(bucket), object_(object), s3_client_(s3_client) {} 172 173 Status Read(uint64 offset, size_t n, StringPiece* result, 174 char* scratch) const override { 175 Aws::S3::Model::GetObjectRequest getObjectRequest; 176 getObjectRequest.WithBucket(bucket_.c_str()).WithKey(object_.c_str()); 177 string bytes = strings::StrCat("bytes=", offset, "-", offset + n - 1); 178 getObjectRequest.SetRange(bytes.c_str()); 179 getObjectRequest.SetResponseStreamFactory([]() { 180 return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); 181 }); 182 auto getObjectOutcome = this->s3_client_->GetObject(getObjectRequest); 183 if (!getObjectOutcome.IsSuccess()) { 184 n = 0; 185 *result = StringPiece(scratch, n); 186 return Status(error::OUT_OF_RANGE, "Read less bytes than requested"); 187 } 188 n = getObjectOutcome.GetResult().GetContentLength(); 189 std::stringstream ss; 190 ss << getObjectOutcome.GetResult().GetBody().rdbuf(); 191 ss.read(scratch, n); 192 193 *result = StringPiece(scratch, n); 194 return Status::OK(); 195 } 196 197 private: 198 string bucket_; 199 string object_; 200 std::shared_ptr<Aws::S3::S3Client> s3_client_; 201}; 202 203class S3WritableFile : public WritableFile { 204 public: 205 S3WritableFile(const string& bucket, const string& object, 206 std::shared_ptr<Aws::S3::S3Client> s3_client) 207 : bucket_(bucket), 208 object_(object), 209 s3_client_(s3_client), 210 sync_needed_(true), 211 outfile_(Aws::MakeShared<Aws::Utils::TempFile>( 212 kS3FileSystemAllocationTag, "/tmp/s3_filesystem_XXXXXX", 213 std::ios_base::binary | std::ios_base::trunc | std::ios_base::in | 214 std::ios_base::out)) {} 215 216 Status Append(const StringPiece& data) override { 217 if (!outfile_) { 218 return errors::FailedPrecondition( 219 "The internal temporary file is not writable."); 220 } 221 sync_needed_ = true; 222 outfile_->write(data.data(), data.size()); 223 if (!outfile_->good()) { 224 return errors::Internal( 225 "Could not append to the internal temporary file."); 226 } 227 return Status::OK(); 228 } 229 230 Status Close() override { 231 if (outfile_) { 232 TF_RETURN_IF_ERROR(Sync()); 233 outfile_.reset(); 234 } 235 return Status::OK(); 236 } 237 238 Status Flush() override { return Sync(); } 239 240 Status Sync() override { 241 if (!outfile_) { 242 return errors::FailedPrecondition( 243 "The internal temporary file is not writable."); 244 } 245 if (!sync_needed_) { 246 return Status::OK(); 247 } 248 Aws::S3::Model::PutObjectRequest putObjectRequest; 249 putObjectRequest.WithBucket(bucket_.c_str()).WithKey(object_.c_str()); 250 long offset = outfile_->tellp(); 251 outfile_->seekg(0); 252 putObjectRequest.SetBody(outfile_); 253 putObjectRequest.SetContentLength(offset); 254 auto putObjectOutcome = this->s3_client_->PutObject(putObjectRequest); 255 outfile_->clear(); 256 outfile_->seekp(offset); 257 if (!putObjectOutcome.IsSuccess()) { 258 string error = strings::StrCat( 259 putObjectOutcome.GetError().GetExceptionName().c_str(), ": ", 260 putObjectOutcome.GetError().GetMessage().c_str()); 261 return errors::Internal(error); 262 } 263 return Status::OK(); 264 } 265 266 private: 267 string bucket_; 268 string object_; 269 std::shared_ptr<Aws::S3::S3Client> s3_client_; 270 bool sync_needed_; 271 std::shared_ptr<Aws::Utils::TempFile> outfile_; 272}; 273 274class S3ReadOnlyMemoryRegion : public ReadOnlyMemoryRegion { 275 public: 276 S3ReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length) 277 : data_(std::move(data)), length_(length) {} 278 const void* data() override { return reinterpret_cast<void*>(data_.get()); } 279 uint64 length() override { return length_; } 280 281 private: 282 std::unique_ptr<char[]> data_; 283 uint64 length_; 284}; 285 286} // namespace 287 288S3FileSystem::S3FileSystem() 289 : s3_client_(nullptr, ShutdownClient), client_lock_() {} 290 291S3FileSystem::~S3FileSystem() {} 292 293// Initializes s3_client_, if needed, and returns it. 294std::shared_ptr<Aws::S3::S3Client> S3FileSystem::GetS3Client() { 295 std::lock_guard<mutex> lock(this->client_lock_); 296 297 if (this->s3_client_.get() == nullptr) { 298 AWSLogSystem::InitializeAWSLogging(); 299 300 Aws::SDKOptions options; 301 options.cryptoOptions.sha256Factory_create_fn = []() { 302 return Aws::MakeShared<S3SHA256Factory>(S3CryptoAllocationTag); 303 }; 304 options.cryptoOptions.sha256HMACFactory_create_fn = []() { 305 return Aws::MakeShared<S3SHA256HmacFactory>(S3CryptoAllocationTag); 306 }; 307 Aws::InitAPI(options); 308 309 // The creation of S3Client disables virtual addressing: 310 // S3Client(clientConfiguration, signPayloads, useVirtualAdressing = true) 311 // The purpose is to address the issue encountered when there is an `.` 312 // in the bucket name. Due to TLS hostname validation or DNS rules, 313 // the bucket may not be resolved. Disabling of virtual addressing 314 // should address the issue. See GitHub issue 16397 for details. 315 this->s3_client_ = std::shared_ptr<Aws::S3::S3Client>(new Aws::S3::S3Client( 316 GetDefaultClientConfig(), 317 Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false)); 318 } 319 320 return this->s3_client_; 321} 322 323Status S3FileSystem::NewRandomAccessFile( 324 const string& fname, std::unique_ptr<RandomAccessFile>* result) { 325 string bucket, object; 326 TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object)); 327 result->reset(new S3RandomAccessFile(bucket, object, this->GetS3Client())); 328 return Status::OK(); 329} 330 331Status S3FileSystem::NewWritableFile(const string& fname, 332 std::unique_ptr<WritableFile>* result) { 333 string bucket, object; 334 TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object)); 335 result->reset(new S3WritableFile(bucket, object, this->GetS3Client())); 336 return Status::OK(); 337} 338 339Status S3FileSystem::NewAppendableFile(const string& fname, 340 std::unique_ptr<WritableFile>* result) { 341 std::unique_ptr<RandomAccessFile> reader; 342 TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &reader)); 343 std::unique_ptr<char[]> buffer(new char[kS3ReadAppendableFileBufferSize]); 344 Status status; 345 uint64 offset = 0; 346 StringPiece read_chunk; 347 348 string bucket, object; 349 TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object)); 350 result->reset(new S3WritableFile(bucket, object, this->GetS3Client())); 351 352 while (true) { 353 status = reader->Read(offset, kS3ReadAppendableFileBufferSize, &read_chunk, 354 buffer.get()); 355 if (status.ok()) { 356 (*result)->Append(read_chunk); 357 offset += kS3ReadAppendableFileBufferSize; 358 } else if (status.code() == error::OUT_OF_RANGE) { 359 (*result)->Append(read_chunk); 360 break; 361 } else { 362 (*result).reset(); 363 return status; 364 } 365 } 366 367 return Status::OK(); 368} 369 370Status S3FileSystem::NewReadOnlyMemoryRegionFromFile( 371 const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) { 372 uint64 size; 373 TF_RETURN_IF_ERROR(GetFileSize(fname, &size)); 374 std::unique_ptr<char[]> data(new char[size]); 375 376 std::unique_ptr<RandomAccessFile> file; 377 TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, &file)); 378 379 StringPiece piece; 380 TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get())); 381 382 result->reset(new S3ReadOnlyMemoryRegion(std::move(data), size)); 383 return Status::OK(); 384} 385 386Status S3FileSystem::FileExists(const string& fname) { 387 FileStatistics stats; 388 TF_RETURN_IF_ERROR(this->Stat(fname, &stats)); 389 return Status::OK(); 390} 391 392Status S3FileSystem::GetChildren(const string& dir, 393 std::vector<string>* result) { 394 string bucket, prefix; 395 TF_RETURN_IF_ERROR(ParseS3Path(dir, false, &bucket, &prefix)); 396 397 if (prefix.back() != '/') { 398 prefix.push_back('/'); 399 } 400 401 Aws::S3::Model::ListObjectsRequest listObjectsRequest; 402 listObjectsRequest.WithBucket(bucket.c_str()) 403 .WithPrefix(prefix.c_str()) 404 .WithMaxKeys(kS3GetChildrenMaxKeys) 405 .WithDelimiter("/"); 406 listObjectsRequest.SetResponseStreamFactory( 407 []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); }); 408 409 Aws::S3::Model::ListObjectsResult listObjectsResult; 410 do { 411 auto listObjectsOutcome = 412 this->GetS3Client()->ListObjects(listObjectsRequest); 413 if (!listObjectsOutcome.IsSuccess()) { 414 string error = strings::StrCat( 415 listObjectsOutcome.GetError().GetExceptionName().c_str(), ": ", 416 listObjectsOutcome.GetError().GetMessage().c_str()); 417 return errors::Internal(error); 418 } 419 420 listObjectsResult = listObjectsOutcome.GetResult(); 421 for (const auto& object : listObjectsResult.GetCommonPrefixes()) { 422 Aws::String s = object.GetPrefix(); 423 s.erase(s.length() - 1); 424 Aws::String entry = s.substr(strlen(prefix.c_str())); 425 if (entry.length() > 0) { 426 result->push_back(entry.c_str()); 427 } 428 } 429 for (const auto& object : listObjectsResult.GetContents()) { 430 Aws::String s = object.GetKey(); 431 Aws::String entry = s.substr(strlen(prefix.c_str())); 432 if (entry.length() > 0) { 433 result->push_back(entry.c_str()); 434 } 435 } 436 listObjectsRequest.SetMarker(listObjectsResult.GetNextMarker()); 437 } while (listObjectsResult.GetIsTruncated()); 438 439 return Status::OK(); 440} 441 442Status S3FileSystem::Stat(const string& fname, FileStatistics* stats) { 443 string bucket, object; 444 TF_RETURN_IF_ERROR(ParseS3Path(fname, true, &bucket, &object)); 445 446 if (object.empty()) { 447 Aws::S3::Model::HeadBucketRequest headBucketRequest; 448 headBucketRequest.WithBucket(bucket.c_str()); 449 auto headBucketOutcome = this->GetS3Client()->HeadBucket(headBucketRequest); 450 if (!headBucketOutcome.IsSuccess()) { 451 string error = strings::StrCat( 452 headBucketOutcome.GetError().GetExceptionName().c_str(), ": ", 453 headBucketOutcome.GetError().GetMessage().c_str()); 454 return errors::Internal(error); 455 } 456 stats->length = 0; 457 stats->is_directory = 1; 458 return Status::OK(); 459 } 460 461 bool found = false; 462 463 Aws::S3::Model::HeadObjectRequest headObjectRequest; 464 headObjectRequest.WithBucket(bucket.c_str()).WithKey(object.c_str()); 465 headObjectRequest.SetResponseStreamFactory( 466 []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); }); 467 auto headObjectOutcome = this->GetS3Client()->HeadObject(headObjectRequest); 468 if (headObjectOutcome.IsSuccess()) { 469 stats->length = headObjectOutcome.GetResult().GetContentLength(); 470 stats->is_directory = 0; 471 stats->mtime_nsec = 472 headObjectOutcome.GetResult().GetLastModified().Millis() * 1e6; 473 found = true; 474 } 475 string prefix = object; 476 if (prefix.back() != '/') { 477 prefix.push_back('/'); 478 } 479 Aws::S3::Model::ListObjectsRequest listObjectsRequest; 480 listObjectsRequest.WithBucket(bucket.c_str()) 481 .WithPrefix(prefix.c_str()) 482 .WithMaxKeys(1); 483 listObjectsRequest.SetResponseStreamFactory( 484 []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); }); 485 auto listObjectsOutcome = 486 this->GetS3Client()->ListObjects(listObjectsRequest); 487 if (listObjectsOutcome.IsSuccess()) { 488 if (listObjectsOutcome.GetResult().GetContents().size() > 0) { 489 stats->length = 0; 490 stats->is_directory = 1; 491 found = true; 492 } 493 } 494 if (!found) { 495 return errors::NotFound("Object ", fname, " does not exist"); 496 } 497 return Status::OK(); 498} 499 500Status S3FileSystem::DeleteFile(const string& fname) { 501 string bucket, object; 502 TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object)); 503 504 Aws::S3::Model::DeleteObjectRequest deleteObjectRequest; 505 deleteObjectRequest.WithBucket(bucket.c_str()).WithKey(object.c_str()); 506 507 auto deleteObjectOutcome = 508 this->GetS3Client()->DeleteObject(deleteObjectRequest); 509 if (!deleteObjectOutcome.IsSuccess()) { 510 string error = strings::StrCat( 511 deleteObjectOutcome.GetError().GetExceptionName().c_str(), ": ", 512 deleteObjectOutcome.GetError().GetMessage().c_str()); 513 return errors::Internal(error); 514 } 515 return Status::OK(); 516} 517 518Status S3FileSystem::CreateDir(const string& dirname) { 519 string bucket, object; 520 TF_RETURN_IF_ERROR(ParseS3Path(dirname, true, &bucket, &object)); 521 522 if (object.empty()) { 523 Aws::S3::Model::HeadBucketRequest headBucketRequest; 524 headBucketRequest.WithBucket(bucket.c_str()); 525 auto headBucketOutcome = this->GetS3Client()->HeadBucket(headBucketRequest); 526 if (!headBucketOutcome.IsSuccess()) { 527 return errors::NotFound("The bucket ", bucket, " was not found."); 528 } 529 return Status::OK(); 530 } 531 string filename = dirname; 532 if (filename.back() != '/') { 533 filename.push_back('/'); 534 } 535 std::unique_ptr<WritableFile> file; 536 TF_RETURN_IF_ERROR(NewWritableFile(filename, &file)); 537 TF_RETURN_IF_ERROR(file->Close()); 538 return Status::OK(); 539} 540 541Status S3FileSystem::DeleteDir(const string& dirname) { 542 string bucket, object; 543 TF_RETURN_IF_ERROR(ParseS3Path(dirname, false, &bucket, &object)); 544 545 string prefix = object; 546 if (prefix.back() != '/') { 547 prefix.push_back('/'); 548 } 549 Aws::S3::Model::ListObjectsRequest listObjectsRequest; 550 listObjectsRequest.WithBucket(bucket.c_str()) 551 .WithPrefix(prefix.c_str()) 552 .WithMaxKeys(2); 553 listObjectsRequest.SetResponseStreamFactory( 554 []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); }); 555 auto listObjectsOutcome = 556 this->GetS3Client()->ListObjects(listObjectsRequest); 557 if (listObjectsOutcome.IsSuccess()) { 558 auto contents = listObjectsOutcome.GetResult().GetContents(); 559 if (contents.size() > 1 || 560 (contents.size() == 1 && contents[0].GetKey() != prefix.c_str())) { 561 return errors::FailedPrecondition("Cannot delete a non-empty directory."); 562 } 563 if (contents.size() == 1 && contents[0].GetKey() == prefix.c_str()) { 564 string filename = dirname; 565 if (filename.back() != '/') { 566 filename.push_back('/'); 567 } 568 return DeleteFile(filename); 569 } 570 } 571 return Status::OK(); 572} 573 574Status S3FileSystem::GetFileSize(const string& fname, uint64* file_size) { 575 FileStatistics stats; 576 TF_RETURN_IF_ERROR(this->Stat(fname, &stats)); 577 *file_size = stats.length; 578 return Status::OK(); 579} 580 581Status S3FileSystem::RenameFile(const string& src, const string& target) { 582 string src_bucket, src_object, target_bucket, target_object; 583 TF_RETURN_IF_ERROR(ParseS3Path(src, false, &src_bucket, &src_object)); 584 TF_RETURN_IF_ERROR( 585 ParseS3Path(target, false, &target_bucket, &target_object)); 586 if (src_object.back() == '/') { 587 if (target_object.back() != '/') { 588 target_object.push_back('/'); 589 } 590 } else { 591 if (target_object.back() == '/') { 592 target_object.pop_back(); 593 } 594 } 595 596 Aws::S3::Model::CopyObjectRequest copyObjectRequest; 597 Aws::S3::Model::DeleteObjectRequest deleteObjectRequest; 598 599 Aws::S3::Model::ListObjectsRequest listObjectsRequest; 600 listObjectsRequest.WithBucket(src_bucket.c_str()) 601 .WithPrefix(src_object.c_str()) 602 .WithMaxKeys(kS3GetChildrenMaxKeys); 603 listObjectsRequest.SetResponseStreamFactory( 604 []() { return Aws::New<Aws::StringStream>(kS3FileSystemAllocationTag); }); 605 606 Aws::S3::Model::ListObjectsResult listObjectsResult; 607 do { 608 auto listObjectsOutcome = 609 this->GetS3Client()->ListObjects(listObjectsRequest); 610 if (!listObjectsOutcome.IsSuccess()) { 611 string error = strings::StrCat( 612 listObjectsOutcome.GetError().GetExceptionName().c_str(), ": ", 613 listObjectsOutcome.GetError().GetMessage().c_str()); 614 return errors::Internal(error); 615 } 616 617 listObjectsResult = listObjectsOutcome.GetResult(); 618 for (const auto& object : listObjectsResult.GetContents()) { 619 Aws::String src_key = object.GetKey(); 620 Aws::String target_key = src_key; 621 target_key.replace(0, src_object.length(), target_object.c_str()); 622 Aws::String source = Aws::String(src_bucket.c_str()) + "/" + 623 Aws::Utils::StringUtils::URLEncode(src_key.c_str()); 624 625 copyObjectRequest.SetBucket(target_bucket.c_str()); 626 copyObjectRequest.SetKey(target_key); 627 copyObjectRequest.SetCopySource(source); 628 629 auto copyObjectOutcome = 630 this->GetS3Client()->CopyObject(copyObjectRequest); 631 if (!copyObjectOutcome.IsSuccess()) { 632 string error = strings::StrCat( 633 copyObjectOutcome.GetError().GetExceptionName().c_str(), ": ", 634 copyObjectOutcome.GetError().GetMessage().c_str()); 635 return errors::Internal(error); 636 } 637 638 deleteObjectRequest.SetBucket(src_bucket.c_str()); 639 deleteObjectRequest.SetKey(src_key.c_str()); 640 641 auto deleteObjectOutcome = 642 this->GetS3Client()->DeleteObject(deleteObjectRequest); 643 if (!deleteObjectOutcome.IsSuccess()) { 644 string error = strings::StrCat( 645 deleteObjectOutcome.GetError().GetExceptionName().c_str(), ": ", 646 deleteObjectOutcome.GetError().GetMessage().c_str()); 647 return errors::Internal(error); 648 } 649 } 650 listObjectsRequest.SetMarker(listObjectsResult.GetNextMarker()); 651 } while (listObjectsResult.GetIsTruncated()); 652 653 return Status::OK(); 654} 655 656REGISTER_FILE_SYSTEM("s3", S3FileSystem); 657 658} // namespace tensorflow 659