env.cc revision b1f5f433959406c7aad634c05e85ccd62fd06e87
1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include <sys/stat.h>
17#include <deque>
18#include <utility>
19#include <vector>
20#if defined(__APPLE__)
21#include <mach-o/dyld.h>
22#endif
23#if defined(__FreeBSD__)
24#include <sys/sysctl.h>
25#include <sys/types.h>
26#endif
27#if defined(PLATFORM_WINDOWS)
28#include <windows.h>
29#include "tensorflow/core/platform/windows/windows_file_system.h"
30#define PATH_MAX MAX_PATH
31#else
32#include <unistd.h>
33#endif
34
35#include "tensorflow/core/lib/core/errors.h"
36#include "tensorflow/core/lib/gtl/map_util.h"
37#include "tensorflow/core/lib/gtl/stl_util.h"
38#include "tensorflow/core/lib/io/path.h"
39#include "tensorflow/core/lib/strings/stringprintf.h"
40#include "tensorflow/core/platform/env.h"
41#include "tensorflow/core/platform/env_time.h"
42#include "tensorflow/core/platform/host_info.h"
43#include "tensorflow/core/platform/protobuf.h"
44
45namespace tensorflow {
46
// Size in bytes (128 KiB) of the scratch buffer FileSystemCopyFile reads into
// on each iteration while streaming between file systems.
constexpr size_t kCopyFileBufferSize = 128 << 10;
49
50class FileSystemRegistryImpl : public FileSystemRegistry {
51 public:
52  Status Register(const string& scheme, Factory factory) override;
53  FileSystem* Lookup(const string& scheme) override;
54  Status GetRegisteredFileSystemSchemes(std::vector<string>* schemes) override;
55
56 private:
57  mutable mutex mu_;
58  mutable std::unordered_map<string, std::unique_ptr<FileSystem>> registry_
59      GUARDED_BY(mu_);
60};
61
62Status FileSystemRegistryImpl::Register(const string& scheme,
63                                        FileSystemRegistry::Factory factory) {
64  mutex_lock lock(mu_);
65  if (!registry_.emplace(string(scheme), std::unique_ptr<FileSystem>(factory()))
66           .second) {
67    return errors::AlreadyExists("File factory for ", scheme,
68                                 " already registered");
69  }
70  return Status::OK();
71}
72
73FileSystem* FileSystemRegistryImpl::Lookup(const string& scheme) {
74  mutex_lock lock(mu_);
75  const auto found = registry_.find(scheme);
76  if (found == registry_.end()) {
77    return nullptr;
78  }
79  return found->second.get();
80}
81
82Status FileSystemRegistryImpl::GetRegisteredFileSystemSchemes(
83    std::vector<string>* schemes) {
84  mutex_lock lock(mu_);
85  for (const auto& e : registry_) {
86    schemes->push_back(e.first);
87  }
88  return Status::OK();
89}
90
91Env::Env() : file_system_registry_(new FileSystemRegistryImpl) {}
92
93Status Env::GetFileSystemForFile(const string& fname, FileSystem** result) {
94  StringPiece scheme, host, path;
95  io::ParseURI(fname, &scheme, &host, &path);
96  FileSystem* file_system = file_system_registry_->Lookup(scheme.ToString());
97  if (!file_system) {
98    if (scheme.empty()) {
99      scheme = "[local]";
100    }
101
102    return errors::Unimplemented("File system scheme '", scheme,
103                                 "' not implemented (file: '", fname, "')");
104  }
105  *result = file_system;
106  return Status::OK();
107}
108
109Status Env::GetRegisteredFileSystemSchemes(std::vector<string>* schemes) {
110  return file_system_registry_->GetRegisteredFileSystemSchemes(schemes);
111}
112
113Status Env::RegisterFileSystem(const string& scheme,
114                               FileSystemRegistry::Factory factory) {
115  return file_system_registry_->Register(scheme, std::move(factory));
116}
117
118Status Env::FlushFileSystemCaches() {
119  std::vector<string> schemes;
120  TF_RETURN_IF_ERROR(GetRegisteredFileSystemSchemes(&schemes));
121  for (const string& scheme : schemes) {
122    FileSystem* fs = nullptr;
123    TF_RETURN_IF_ERROR(
124        GetFileSystemForFile(io::CreateURI(scheme, "", ""), &fs));
125    fs->FlushCaches();
126  }
127  return Status::OK();
128}
129
130Status Env::NewRandomAccessFile(const string& fname,
131                                std::unique_ptr<RandomAccessFile>* result) {
132  FileSystem* fs;
133  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
134  return fs->NewRandomAccessFile(fname, result);
135}
136
137Status Env::NewReadOnlyMemoryRegionFromFile(
138    const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
139  FileSystem* fs;
140  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
141  return fs->NewReadOnlyMemoryRegionFromFile(fname, result);
142}
143
144Status Env::NewWritableFile(const string& fname,
145                            std::unique_ptr<WritableFile>* result) {
146  FileSystem* fs;
147  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
148  return fs->NewWritableFile(fname, result);
149}
150
151Status Env::NewAppendableFile(const string& fname,
152                              std::unique_ptr<WritableFile>* result) {
153  FileSystem* fs;
154  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
155  return fs->NewAppendableFile(fname, result);
156}
157
158Status Env::FileExists(const string& fname) {
159  FileSystem* fs;
160  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
161  return fs->FileExists(fname);
162}
163
164bool Env::FilesExist(const std::vector<string>& files,
165                     std::vector<Status>* status) {
166  std::unordered_map<string, std::vector<string>> files_per_fs;
167  for (const auto& file : files) {
168    StringPiece scheme, host, path;
169    io::ParseURI(file, &scheme, &host, &path);
170    files_per_fs[scheme.ToString()].push_back(file);
171  }
172
173  std::unordered_map<string, Status> per_file_status;
174  bool result = true;
175  for (auto itr : files_per_fs) {
176    FileSystem* file_system = file_system_registry_->Lookup(itr.first);
177    bool fs_result;
178    std::vector<Status> local_status;
179    std::vector<Status>* fs_status = status ? &local_status : nullptr;
180    if (!file_system) {
181      fs_result = false;
182      if (fs_status) {
183        Status s = errors::Unimplemented("File system scheme '", itr.first,
184                                         "' not implemented");
185        local_status.resize(itr.second.size(), s);
186      }
187    } else {
188      fs_result = file_system->FilesExist(itr.second, fs_status);
189    }
190    if (fs_status) {
191      result &= fs_result;
192      for (int i = 0; i < itr.second.size(); ++i) {
193        per_file_status[itr.second[i]] = fs_status->at(i);
194      }
195    } else if (!fs_result) {
196      // Return early
197      return false;
198    }
199  }
200
201  if (status) {
202    for (const auto& file : files) {
203      status->push_back(per_file_status[file]);
204    }
205  }
206
207  return result;
208}
209
210Status Env::GetChildren(const string& dir, std::vector<string>* result) {
211  FileSystem* fs;
212  TF_RETURN_IF_ERROR(GetFileSystemForFile(dir, &fs));
213  return fs->GetChildren(dir, result);
214}
215
216Status Env::GetMatchingPaths(const string& pattern,
217                             std::vector<string>* results) {
218  FileSystem* fs;
219  TF_RETURN_IF_ERROR(GetFileSystemForFile(pattern, &fs));
220  return fs->GetMatchingPaths(pattern, results);
221}
222
223Status Env::DeleteFile(const string& fname) {
224  FileSystem* fs;
225  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
226  return fs->DeleteFile(fname);
227}
228
229Status Env::RecursivelyCreateDir(const string& dirname) {
230  FileSystem* fs;
231  TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
232  return fs->RecursivelyCreateDir(dirname);
233}
234
235Status Env::CreateDir(const string& dirname) {
236  FileSystem* fs;
237  TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
238  return fs->CreateDir(dirname);
239}
240
241Status Env::DeleteDir(const string& dirname) {
242  FileSystem* fs;
243  TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
244  return fs->DeleteDir(dirname);
245}
246
247Status Env::Stat(const string& fname, FileStatistics* stat) {
248  FileSystem* fs;
249  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
250  return fs->Stat(fname, stat);
251}
252
253Status Env::IsDirectory(const string& fname) {
254  FileSystem* fs;
255  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
256  return fs->IsDirectory(fname);
257}
258
259Status Env::DeleteRecursively(const string& dirname, int64* undeleted_files,
260                              int64* undeleted_dirs) {
261  FileSystem* fs;
262  TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
263  return fs->DeleteRecursively(dirname, undeleted_files, undeleted_dirs);
264}
265
266Status Env::GetFileSize(const string& fname, uint64* file_size) {
267  FileSystem* fs;
268  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
269  return fs->GetFileSize(fname, file_size);
270}
271
272Status Env::RenameFile(const string& src, const string& target) {
273  FileSystem* src_fs;
274  FileSystem* target_fs;
275  TF_RETURN_IF_ERROR(GetFileSystemForFile(src, &src_fs));
276  TF_RETURN_IF_ERROR(GetFileSystemForFile(target, &target_fs));
277  if (src_fs != target_fs) {
278    return errors::Unimplemented("Renaming ", src, " to ", target,
279                                 " not implemented");
280  }
281  return src_fs->RenameFile(src, target);
282}
283
284Status Env::CopyFile(const string& src, const string& target) {
285  FileSystem* src_fs;
286  FileSystem* target_fs;
287  TF_RETURN_IF_ERROR(GetFileSystemForFile(src, &src_fs));
288  TF_RETURN_IF_ERROR(GetFileSystemForFile(target, &target_fs));
289  if (src_fs == target_fs) {
290    return src_fs->CopyFile(src, target);
291  }
292  return FileSystemCopyFile(src_fs, src, target_fs, target);
293}
294
295string Env::GetExecutablePath() {
296  char exe_path[PATH_MAX] = {0};
297#ifdef __APPLE__
298  uint32_t buffer_size(0U);
299  _NSGetExecutablePath(nullptr, &buffer_size);
300  char unresolved_path[buffer_size];
301  _NSGetExecutablePath(unresolved_path, &buffer_size);
302  CHECK(realpath(unresolved_path, exe_path));
303#elif defined(__FreeBSD__)
304  int mib[4] = {CTL_KERN, KERN_PROC, KERN_PROC_PATHNAME, -1};
305  size_t exe_path_size = PATH_MAX;
306
307  if (sysctl(mib, 4, exe_path, &exe_path_size, NULL, 0) != 0) {
308    // Resolution of path failed
309    return "";
310  }
311#elif defined(PLATFORM_WINDOWS)
312  HMODULE hModule = GetModuleHandleW(NULL);
313  WCHAR wc_file_path[MAX_PATH] = {0};
314  GetModuleFileNameW(hModule, wc_file_path, MAX_PATH);
315  string file_path = WindowsFileSystem::WideCharToUtf8(wc_file_path);
316  std::copy(file_path.begin(), file_path.end(), exe_path);
317#else
318  CHECK_NE(-1, readlink("/proc/self/exe", exe_path, sizeof(exe_path) - 1));
319#endif
320  // Make sure it's null-terminated:
321  exe_path[sizeof(exe_path) - 1] = 0;
322
323  return exe_path;
324}
325
326bool Env::LocalTempFilename(string* filename) {
327  std::vector<string> dirs;
328  GetLocalTempDirectories(&dirs);
329
330  // Try each directory, as they might be full, have inappropriate
331  // permissions or have different problems at times.
332  for (const string& dir : dirs) {
333    *filename = io::JoinPath(dir, "tempfile-");
334    if (CreateUniqueFileName(filename, "")) {
335      return true;
336    }
337  }
338  return false;
339}
340
341bool Env::CreateUniqueFileName(string* prefix, const string& suffix) {
342#ifdef __APPLE__
343  uint64_t tid64;
344  pthread_threadid_np(nullptr, &tid64);
345  int32 tid = static_cast<int32>(tid64);
346  int32 pid = static_cast<int32>(getpid());
347#elif defined(__FreeBSD__)
348  // Has to be casted to long first, else this error appears:
349  // static_cast from 'pthread_t' (aka 'pthread *') to 'int32' (aka 'int')
350  // is not allowed
351  int32 tid = static_cast<int32>(static_cast<int64>(pthread_self()));
352  int32 pid = static_cast<int32>(getpid());
353#elif defined(PLATFORM_WINDOWS)
354  int32 tid = static_cast<int32>(GetCurrentThreadId());
355  int32 pid = static_cast<int32>(GetCurrentProcessId());
356#else
357  int32 tid = static_cast<int32>(pthread_self());
358  int32 pid = static_cast<int32>(getpid());
359#endif
360  uint64 now_microsec = NowMicros();
361
362  *prefix += strings::Printf("%s-%x-%d-%llx", port::Hostname().c_str(), tid,
363                             pid, now_microsec);
364
365  if (!suffix.empty()) {
366    *prefix += suffix;
367  }
368  if (FileExists(*prefix).ok()) {
369    prefix->clear();
370    return false;
371  } else {
372    return true;
373  }
374}
375
376Thread::~Thread() {}
377
378EnvWrapper::~EnvWrapper() {}
379
380Status ReadFileToString(Env* env, const string& fname, string* data) {
381  uint64 file_size;
382  Status s = env->GetFileSize(fname, &file_size);
383  if (!s.ok()) {
384    return s;
385  }
386  std::unique_ptr<RandomAccessFile> file;
387  s = env->NewRandomAccessFile(fname, &file);
388  if (!s.ok()) {
389    return s;
390  }
391  gtl::STLStringResizeUninitialized(data, file_size);
392  char* p = gtl::string_as_array(data);
393  StringPiece result;
394  s = file->Read(0, file_size, &result, p);
395  if (!s.ok()) {
396    data->clear();
397  } else if (result.size() != file_size) {
398    s = errors::Aborted("File ", fname, " changed while reading: ", file_size,
399                        " vs. ", result.size());
400    data->clear();
401  } else if (result.data() == p) {
402    // Data is already in the correct location
403  } else {
404    memmove(p, result.data(), result.size());
405  }
406  return s;
407}
408
409Status WriteStringToFile(Env* env, const string& fname,
410                         const StringPiece& data) {
411  std::unique_ptr<WritableFile> file;
412  Status s = env->NewWritableFile(fname, &file);
413  if (!s.ok()) {
414    return s;
415  }
416  s = file->Append(data);
417  if (s.ok()) {
418    s = file->Close();
419  }
420  return s;
421}
422
423Status FileSystemCopyFile(FileSystem* src_fs, const string& src,
424                          FileSystem* target_fs, const string& target) {
425  std::unique_ptr<RandomAccessFile> src_file;
426  TF_RETURN_IF_ERROR(src_fs->NewRandomAccessFile(src, &src_file));
427
428  std::unique_ptr<WritableFile> target_file;
429  TF_RETURN_IF_ERROR(target_fs->NewWritableFile(target, &target_file));
430
431  uint64 offset = 0;
432  std::unique_ptr<char[]> scratch(new char[kCopyFileBufferSize]);
433  Status s = Status::OK();
434  while (s.ok()) {
435    StringPiece result;
436    s = src_file->Read(offset, kCopyFileBufferSize, &result, scratch.get());
437    if (!(s.ok() || s.code() == error::OUT_OF_RANGE)) {
438      return s;
439    }
440    TF_RETURN_IF_ERROR(target_file->Append(result));
441    offset += result.size();
442  }
443  return target_file->Close();
444}
445
446// A ZeroCopyInputStream on a RandomAccessFile.
447namespace {
448class FileStream : public ::tensorflow::protobuf::io::ZeroCopyInputStream {
449 public:
450  explicit FileStream(RandomAccessFile* file) : file_(file), pos_(0) {}
451
452  void BackUp(int count) override { pos_ -= count; }
453  bool Skip(int count) override {
454    pos_ += count;
455    return true;
456  }
457  protobuf_int64 ByteCount() const override { return pos_; }
458  Status status() const { return status_; }
459
460  bool Next(const void** data, int* size) override {
461    StringPiece result;
462    Status s = file_->Read(pos_, kBufSize, &result, scratch_);
463    if (result.empty()) {
464      status_ = s;
465      return false;
466    }
467    pos_ += result.size();
468    *data = result.data();
469    *size = result.size();
470    return true;
471  }
472
473 private:
474  static const int kBufSize = 512 << 10;
475
476  RandomAccessFile* file_;
477  int64 pos_;
478  Status status_;
479  char scratch_[kBufSize];
480};
481
482}  // namespace
483
484Status WriteBinaryProto(Env* env, const string& fname,
485                        const ::tensorflow::protobuf::MessageLite& proto) {
486  string serialized;
487  proto.AppendToString(&serialized);
488  return WriteStringToFile(env, fname, serialized);
489}
490
491Status ReadBinaryProto(Env* env, const string& fname,
492                       ::tensorflow::protobuf::MessageLite* proto) {
493  std::unique_ptr<RandomAccessFile> file;
494  TF_RETURN_IF_ERROR(env->NewRandomAccessFile(fname, &file));
495  std::unique_ptr<FileStream> stream(new FileStream(file.get()));
496
497  // TODO(jiayq): the following coded stream is for debugging purposes to allow
498  // one to parse arbitrarily large messages for MessageLite. One most likely
499  // doesn't want to put protobufs larger than 64MB on Android, so we should
500  // eventually remove this and quit loud when a large protobuf is passed in.
501  ::tensorflow::protobuf::io::CodedInputStream coded_stream(stream.get());
502  // Total bytes hard limit / warning limit are set to 1GB and 512MB
503  // respectively.
504  coded_stream.SetTotalBytesLimit(1024LL << 20, 512LL << 20);
505
506  if (!proto->ParseFromCodedStream(&coded_stream)) {
507    TF_RETURN_IF_ERROR(stream->status());
508    return errors::DataLoss("Can't parse ", fname, " as binary proto");
509  }
510  return Status::OK();
511}
512
513Status WriteTextProto(Env* env, const string& fname,
514                      const ::tensorflow::protobuf::Message& proto) {
515#if !defined(TENSORFLOW_LITE_PROTOS)
516  string serialized;
517  if (!::tensorflow::protobuf::TextFormat::PrintToString(proto, &serialized)) {
518    return errors::FailedPrecondition("Unable to convert proto to text.");
519  }
520  return WriteStringToFile(env, fname, serialized);
521#else
522  return errors::Unimplemented("Can't write text protos with protolite.");
523#endif
524}
525
526Status ReadTextProto(Env* env, const string& fname,
527                     ::tensorflow::protobuf::Message* proto) {
528#if !defined(TENSORFLOW_LITE_PROTOS)
529  std::unique_ptr<RandomAccessFile> file;
530  TF_RETURN_IF_ERROR(env->NewRandomAccessFile(fname, &file));
531  std::unique_ptr<FileStream> stream(new FileStream(file.get()));
532
533  if (!::tensorflow::protobuf::TextFormat::Parse(stream.get(), proto)) {
534    TF_RETURN_IF_ERROR(stream->status());
535    return errors::DataLoss("Can't parse ", fname, " as text proto");
536  }
537  return Status::OK();
538#else
539  return errors::Unimplemented("Can't parse text protos with protolite.");
540#endif
541}
542
543}  // namespace tensorflow
544