// env.cc revision b1f5f433959406c7aad634c05e85ccd62fd06e87
1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License. 14==============================================================================*/ 15 16#include <sys/stat.h> 17#include <deque> 18#include <utility> 19#include <vector> 20#if defined(__APPLE__) 21#include <mach-o/dyld.h> 22#endif 23#if defined(__FreeBSD__) 24#include <sys/sysctl.h> 25#include <sys/types.h> 26#endif 27#if defined(PLATFORM_WINDOWS) 28#include <windows.h> 29#include "tensorflow/core/platform/windows/windows_file_system.h" 30#define PATH_MAX MAX_PATH 31#else 32#include <unistd.h> 33#endif 34 35#include "tensorflow/core/lib/core/errors.h" 36#include "tensorflow/core/lib/gtl/map_util.h" 37#include "tensorflow/core/lib/gtl/stl_util.h" 38#include "tensorflow/core/lib/io/path.h" 39#include "tensorflow/core/lib/strings/stringprintf.h" 40#include "tensorflow/core/platform/env.h" 41#include "tensorflow/core/platform/env_time.h" 42#include "tensorflow/core/platform/host_info.h" 43#include "tensorflow/core/platform/protobuf.h" 44 45namespace tensorflow { 46 47// 128KB copy buffer 48constexpr size_t kCopyFileBufferSize = 128 * 1024; 49 50class FileSystemRegistryImpl : public FileSystemRegistry { 51 public: 52 Status Register(const string& scheme, Factory factory) override; 53 FileSystem* Lookup(const string& scheme) override; 54 Status GetRegisteredFileSystemSchemes(std::vector<string>* schemes) override; 55 56 private: 57 mutable mutex mu_; 58 mutable 
std::unordered_map<string, std::unique_ptr<FileSystem>> registry_ 59 GUARDED_BY(mu_); 60}; 61 62Status FileSystemRegistryImpl::Register(const string& scheme, 63 FileSystemRegistry::Factory factory) { 64 mutex_lock lock(mu_); 65 if (!registry_.emplace(string(scheme), std::unique_ptr<FileSystem>(factory())) 66 .second) { 67 return errors::AlreadyExists("File factory for ", scheme, 68 " already registered"); 69 } 70 return Status::OK(); 71} 72 73FileSystem* FileSystemRegistryImpl::Lookup(const string& scheme) { 74 mutex_lock lock(mu_); 75 const auto found = registry_.find(scheme); 76 if (found == registry_.end()) { 77 return nullptr; 78 } 79 return found->second.get(); 80} 81 82Status FileSystemRegistryImpl::GetRegisteredFileSystemSchemes( 83 std::vector<string>* schemes) { 84 mutex_lock lock(mu_); 85 for (const auto& e : registry_) { 86 schemes->push_back(e.first); 87 } 88 return Status::OK(); 89} 90 91Env::Env() : file_system_registry_(new FileSystemRegistryImpl) {} 92 93Status Env::GetFileSystemForFile(const string& fname, FileSystem** result) { 94 StringPiece scheme, host, path; 95 io::ParseURI(fname, &scheme, &host, &path); 96 FileSystem* file_system = file_system_registry_->Lookup(scheme.ToString()); 97 if (!file_system) { 98 if (scheme.empty()) { 99 scheme = "[local]"; 100 } 101 102 return errors::Unimplemented("File system scheme '", scheme, 103 "' not implemented (file: '", fname, "')"); 104 } 105 *result = file_system; 106 return Status::OK(); 107} 108 109Status Env::GetRegisteredFileSystemSchemes(std::vector<string>* schemes) { 110 return file_system_registry_->GetRegisteredFileSystemSchemes(schemes); 111} 112 113Status Env::RegisterFileSystem(const string& scheme, 114 FileSystemRegistry::Factory factory) { 115 return file_system_registry_->Register(scheme, std::move(factory)); 116} 117 118Status Env::FlushFileSystemCaches() { 119 std::vector<string> schemes; 120 TF_RETURN_IF_ERROR(GetRegisteredFileSystemSchemes(&schemes)); 121 for (const string& scheme : 
schemes) { 122 FileSystem* fs = nullptr; 123 TF_RETURN_IF_ERROR( 124 GetFileSystemForFile(io::CreateURI(scheme, "", ""), &fs)); 125 fs->FlushCaches(); 126 } 127 return Status::OK(); 128} 129 130Status Env::NewRandomAccessFile(const string& fname, 131 std::unique_ptr<RandomAccessFile>* result) { 132 FileSystem* fs; 133 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 134 return fs->NewRandomAccessFile(fname, result); 135} 136 137Status Env::NewReadOnlyMemoryRegionFromFile( 138 const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) { 139 FileSystem* fs; 140 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 141 return fs->NewReadOnlyMemoryRegionFromFile(fname, result); 142} 143 144Status Env::NewWritableFile(const string& fname, 145 std::unique_ptr<WritableFile>* result) { 146 FileSystem* fs; 147 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 148 return fs->NewWritableFile(fname, result); 149} 150 151Status Env::NewAppendableFile(const string& fname, 152 std::unique_ptr<WritableFile>* result) { 153 FileSystem* fs; 154 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 155 return fs->NewAppendableFile(fname, result); 156} 157 158Status Env::FileExists(const string& fname) { 159 FileSystem* fs; 160 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 161 return fs->FileExists(fname); 162} 163 164bool Env::FilesExist(const std::vector<string>& files, 165 std::vector<Status>* status) { 166 std::unordered_map<string, std::vector<string>> files_per_fs; 167 for (const auto& file : files) { 168 StringPiece scheme, host, path; 169 io::ParseURI(file, &scheme, &host, &path); 170 files_per_fs[scheme.ToString()].push_back(file); 171 } 172 173 std::unordered_map<string, Status> per_file_status; 174 bool result = true; 175 for (auto itr : files_per_fs) { 176 FileSystem* file_system = file_system_registry_->Lookup(itr.first); 177 bool fs_result; 178 std::vector<Status> local_status; 179 std::vector<Status>* fs_status = status ? 
&local_status : nullptr; 180 if (!file_system) { 181 fs_result = false; 182 if (fs_status) { 183 Status s = errors::Unimplemented("File system scheme '", itr.first, 184 "' not implemented"); 185 local_status.resize(itr.second.size(), s); 186 } 187 } else { 188 fs_result = file_system->FilesExist(itr.second, fs_status); 189 } 190 if (fs_status) { 191 result &= fs_result; 192 for (int i = 0; i < itr.second.size(); ++i) { 193 per_file_status[itr.second[i]] = fs_status->at(i); 194 } 195 } else if (!fs_result) { 196 // Return early 197 return false; 198 } 199 } 200 201 if (status) { 202 for (const auto& file : files) { 203 status->push_back(per_file_status[file]); 204 } 205 } 206 207 return result; 208} 209 210Status Env::GetChildren(const string& dir, std::vector<string>* result) { 211 FileSystem* fs; 212 TF_RETURN_IF_ERROR(GetFileSystemForFile(dir, &fs)); 213 return fs->GetChildren(dir, result); 214} 215 216Status Env::GetMatchingPaths(const string& pattern, 217 std::vector<string>* results) { 218 FileSystem* fs; 219 TF_RETURN_IF_ERROR(GetFileSystemForFile(pattern, &fs)); 220 return fs->GetMatchingPaths(pattern, results); 221} 222 223Status Env::DeleteFile(const string& fname) { 224 FileSystem* fs; 225 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 226 return fs->DeleteFile(fname); 227} 228 229Status Env::RecursivelyCreateDir(const string& dirname) { 230 FileSystem* fs; 231 TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs)); 232 return fs->RecursivelyCreateDir(dirname); 233} 234 235Status Env::CreateDir(const string& dirname) { 236 FileSystem* fs; 237 TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs)); 238 return fs->CreateDir(dirname); 239} 240 241Status Env::DeleteDir(const string& dirname) { 242 FileSystem* fs; 243 TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs)); 244 return fs->DeleteDir(dirname); 245} 246 247Status Env::Stat(const string& fname, FileStatistics* stat) { 248 FileSystem* fs; 249 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, 
&fs)); 250 return fs->Stat(fname, stat); 251} 252 253Status Env::IsDirectory(const string& fname) { 254 FileSystem* fs; 255 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 256 return fs->IsDirectory(fname); 257} 258 259Status Env::DeleteRecursively(const string& dirname, int64* undeleted_files, 260 int64* undeleted_dirs) { 261 FileSystem* fs; 262 TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs)); 263 return fs->DeleteRecursively(dirname, undeleted_files, undeleted_dirs); 264} 265 266Status Env::GetFileSize(const string& fname, uint64* file_size) { 267 FileSystem* fs; 268 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 269 return fs->GetFileSize(fname, file_size); 270} 271 272Status Env::RenameFile(const string& src, const string& target) { 273 FileSystem* src_fs; 274 FileSystem* target_fs; 275 TF_RETURN_IF_ERROR(GetFileSystemForFile(src, &src_fs)); 276 TF_RETURN_IF_ERROR(GetFileSystemForFile(target, &target_fs)); 277 if (src_fs != target_fs) { 278 return errors::Unimplemented("Renaming ", src, " to ", target, 279 " not implemented"); 280 } 281 return src_fs->RenameFile(src, target); 282} 283 284Status Env::CopyFile(const string& src, const string& target) { 285 FileSystem* src_fs; 286 FileSystem* target_fs; 287 TF_RETURN_IF_ERROR(GetFileSystemForFile(src, &src_fs)); 288 TF_RETURN_IF_ERROR(GetFileSystemForFile(target, &target_fs)); 289 if (src_fs == target_fs) { 290 return src_fs->CopyFile(src, target); 291 } 292 return FileSystemCopyFile(src_fs, src, target_fs, target); 293} 294 295string Env::GetExecutablePath() { 296 char exe_path[PATH_MAX] = {0}; 297#ifdef __APPLE__ 298 uint32_t buffer_size(0U); 299 _NSGetExecutablePath(nullptr, &buffer_size); 300 char unresolved_path[buffer_size]; 301 _NSGetExecutablePath(unresolved_path, &buffer_size); 302 CHECK(realpath(unresolved_path, exe_path)); 303#elif defined(__FreeBSD__) 304 int mib[4] = {CTL_KERN, KERN_PROC, KERN_PROC_PATHNAME, -1}; 305 size_t exe_path_size = PATH_MAX; 306 307 if (sysctl(mib, 4, 
exe_path, &exe_path_size, NULL, 0) != 0) { 308 // Resolution of path failed 309 return ""; 310 } 311#elif defined(PLATFORM_WINDOWS) 312 HMODULE hModule = GetModuleHandleW(NULL); 313 WCHAR wc_file_path[MAX_PATH] = {0}; 314 GetModuleFileNameW(hModule, wc_file_path, MAX_PATH); 315 string file_path = WindowsFileSystem::WideCharToUtf8(wc_file_path); 316 std::copy(file_path.begin(), file_path.end(), exe_path); 317#else 318 CHECK_NE(-1, readlink("/proc/self/exe", exe_path, sizeof(exe_path) - 1)); 319#endif 320 // Make sure it's null-terminated: 321 exe_path[sizeof(exe_path) - 1] = 0; 322 323 return exe_path; 324} 325 326bool Env::LocalTempFilename(string* filename) { 327 std::vector<string> dirs; 328 GetLocalTempDirectories(&dirs); 329 330 // Try each directory, as they might be full, have inappropriate 331 // permissions or have different problems at times. 332 for (const string& dir : dirs) { 333 *filename = io::JoinPath(dir, "tempfile-"); 334 if (CreateUniqueFileName(filename, "")) { 335 return true; 336 } 337 } 338 return false; 339} 340 341bool Env::CreateUniqueFileName(string* prefix, const string& suffix) { 342#ifdef __APPLE__ 343 uint64_t tid64; 344 pthread_threadid_np(nullptr, &tid64); 345 int32 tid = static_cast<int32>(tid64); 346 int32 pid = static_cast<int32>(getpid()); 347#elif defined(__FreeBSD__) 348 // Has to be casted to long first, else this error appears: 349 // static_cast from 'pthread_t' (aka 'pthread *') to 'int32' (aka 'int') 350 // is not allowed 351 int32 tid = static_cast<int32>(static_cast<int64>(pthread_self())); 352 int32 pid = static_cast<int32>(getpid()); 353#elif defined(PLATFORM_WINDOWS) 354 int32 tid = static_cast<int32>(GetCurrentThreadId()); 355 int32 pid = static_cast<int32>(GetCurrentProcessId()); 356#else 357 int32 tid = static_cast<int32>(pthread_self()); 358 int32 pid = static_cast<int32>(getpid()); 359#endif 360 uint64 now_microsec = NowMicros(); 361 362 *prefix += strings::Printf("%s-%x-%d-%llx", port::Hostname().c_str(), tid, 
363 pid, now_microsec); 364 365 if (!suffix.empty()) { 366 *prefix += suffix; 367 } 368 if (FileExists(*prefix).ok()) { 369 prefix->clear(); 370 return false; 371 } else { 372 return true; 373 } 374} 375 376Thread::~Thread() {} 377 378EnvWrapper::~EnvWrapper() {} 379 380Status ReadFileToString(Env* env, const string& fname, string* data) { 381 uint64 file_size; 382 Status s = env->GetFileSize(fname, &file_size); 383 if (!s.ok()) { 384 return s; 385 } 386 std::unique_ptr<RandomAccessFile> file; 387 s = env->NewRandomAccessFile(fname, &file); 388 if (!s.ok()) { 389 return s; 390 } 391 gtl::STLStringResizeUninitialized(data, file_size); 392 char* p = gtl::string_as_array(data); 393 StringPiece result; 394 s = file->Read(0, file_size, &result, p); 395 if (!s.ok()) { 396 data->clear(); 397 } else if (result.size() != file_size) { 398 s = errors::Aborted("File ", fname, " changed while reading: ", file_size, 399 " vs. ", result.size()); 400 data->clear(); 401 } else if (result.data() == p) { 402 // Data is already in the correct location 403 } else { 404 memmove(p, result.data(), result.size()); 405 } 406 return s; 407} 408 409Status WriteStringToFile(Env* env, const string& fname, 410 const StringPiece& data) { 411 std::unique_ptr<WritableFile> file; 412 Status s = env->NewWritableFile(fname, &file); 413 if (!s.ok()) { 414 return s; 415 } 416 s = file->Append(data); 417 if (s.ok()) { 418 s = file->Close(); 419 } 420 return s; 421} 422 423Status FileSystemCopyFile(FileSystem* src_fs, const string& src, 424 FileSystem* target_fs, const string& target) { 425 std::unique_ptr<RandomAccessFile> src_file; 426 TF_RETURN_IF_ERROR(src_fs->NewRandomAccessFile(src, &src_file)); 427 428 std::unique_ptr<WritableFile> target_file; 429 TF_RETURN_IF_ERROR(target_fs->NewWritableFile(target, &target_file)); 430 431 uint64 offset = 0; 432 std::unique_ptr<char[]> scratch(new char[kCopyFileBufferSize]); 433 Status s = Status::OK(); 434 while (s.ok()) { 435 StringPiece result; 436 s = 
src_file->Read(offset, kCopyFileBufferSize, &result, scratch.get()); 437 if (!(s.ok() || s.code() == error::OUT_OF_RANGE)) { 438 return s; 439 } 440 TF_RETURN_IF_ERROR(target_file->Append(result)); 441 offset += result.size(); 442 } 443 return target_file->Close(); 444} 445 446// A ZeroCopyInputStream on a RandomAccessFile. 447namespace { 448class FileStream : public ::tensorflow::protobuf::io::ZeroCopyInputStream { 449 public: 450 explicit FileStream(RandomAccessFile* file) : file_(file), pos_(0) {} 451 452 void BackUp(int count) override { pos_ -= count; } 453 bool Skip(int count) override { 454 pos_ += count; 455 return true; 456 } 457 protobuf_int64 ByteCount() const override { return pos_; } 458 Status status() const { return status_; } 459 460 bool Next(const void** data, int* size) override { 461 StringPiece result; 462 Status s = file_->Read(pos_, kBufSize, &result, scratch_); 463 if (result.empty()) { 464 status_ = s; 465 return false; 466 } 467 pos_ += result.size(); 468 *data = result.data(); 469 *size = result.size(); 470 return true; 471 } 472 473 private: 474 static const int kBufSize = 512 << 10; 475 476 RandomAccessFile* file_; 477 int64 pos_; 478 Status status_; 479 char scratch_[kBufSize]; 480}; 481 482} // namespace 483 484Status WriteBinaryProto(Env* env, const string& fname, 485 const ::tensorflow::protobuf::MessageLite& proto) { 486 string serialized; 487 proto.AppendToString(&serialized); 488 return WriteStringToFile(env, fname, serialized); 489} 490 491Status ReadBinaryProto(Env* env, const string& fname, 492 ::tensorflow::protobuf::MessageLite* proto) { 493 std::unique_ptr<RandomAccessFile> file; 494 TF_RETURN_IF_ERROR(env->NewRandomAccessFile(fname, &file)); 495 std::unique_ptr<FileStream> stream(new FileStream(file.get())); 496 497 // TODO(jiayq): the following coded stream is for debugging purposes to allow 498 // one to parse arbitrarily large messages for MessageLite. 
One most likely 499 // doesn't want to put protobufs larger than 64MB on Android, so we should 500 // eventually remove this and quit loud when a large protobuf is passed in. 501 ::tensorflow::protobuf::io::CodedInputStream coded_stream(stream.get()); 502 // Total bytes hard limit / warning limit are set to 1GB and 512MB 503 // respectively. 504 coded_stream.SetTotalBytesLimit(1024LL << 20, 512LL << 20); 505 506 if (!proto->ParseFromCodedStream(&coded_stream)) { 507 TF_RETURN_IF_ERROR(stream->status()); 508 return errors::DataLoss("Can't parse ", fname, " as binary proto"); 509 } 510 return Status::OK(); 511} 512 513Status WriteTextProto(Env* env, const string& fname, 514 const ::tensorflow::protobuf::Message& proto) { 515#if !defined(TENSORFLOW_LITE_PROTOS) 516 string serialized; 517 if (!::tensorflow::protobuf::TextFormat::PrintToString(proto, &serialized)) { 518 return errors::FailedPrecondition("Unable to convert proto to text."); 519 } 520 return WriteStringToFile(env, fname, serialized); 521#else 522 return errors::Unimplemented("Can't write text protos with protolite."); 523#endif 524} 525 526Status ReadTextProto(Env* env, const string& fname, 527 ::tensorflow::protobuf::Message* proto) { 528#if !defined(TENSORFLOW_LITE_PROTOS) 529 std::unique_ptr<RandomAccessFile> file; 530 TF_RETURN_IF_ERROR(env->NewRandomAccessFile(fname, &file)); 531 std::unique_ptr<FileStream> stream(new FileStream(file.get())); 532 533 if (!::tensorflow::protobuf::TextFormat::Parse(stream.get(), proto)) { 534 TF_RETURN_IF_ERROR(stream->status()); 535 return errors::DataLoss("Can't parse ", fname, " as text proto"); 536 } 537 return Status::OK(); 538#else 539 return errors::Unimplemented("Can't parse text protos with protolite."); 540#endif 541} 542 543} // namespace tensorflow 544