env.cc revision f09265c145055341e69129d0f8aa938bf2f68acb
1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include <deque>
17#include <vector>
18
19#include "tensorflow/core/lib/core/errors.h"
20#include "tensorflow/core/lib/gtl/map_util.h"
21#include "tensorflow/core/lib/gtl/stl_util.h"
22#include "tensorflow/core/lib/io/path.h"
23#include "tensorflow/core/platform/env.h"
24#include "tensorflow/core/platform/protobuf.h"
25
26namespace tensorflow {
27
28class FileSystemRegistryImpl : public FileSystemRegistry {
29 public:
30  Status Register(const string& scheme, Factory factory) override;
31  FileSystem* Lookup(const string& scheme) override;
32  Status GetRegisteredFileSystemSchemes(std::vector<string>* schemes) override;
33
34 private:
35  mutable mutex mu_;
36  mutable std::unordered_map<string, std::unique_ptr<FileSystem>> registry_
37      GUARDED_BY(mu_);
38};
39
40Status FileSystemRegistryImpl::Register(const string& scheme,
41                                        FileSystemRegistry::Factory factory) {
42  mutex_lock lock(mu_);
43  if (!registry_.emplace(string(scheme), std::unique_ptr<FileSystem>(factory()))
44           .second) {
45    return errors::AlreadyExists("File factory for ", scheme,
46                                 " already registered");
47  }
48  return Status::OK();
49}
50
51FileSystem* FileSystemRegistryImpl::Lookup(const string& scheme) {
52  mutex_lock lock(mu_);
53  const auto found = registry_.find(scheme);
54  if (found == registry_.end()) {
55    return nullptr;
56  }
57  return found->second.get();
58}
59
60Status FileSystemRegistryImpl::GetRegisteredFileSystemSchemes(
61    std::vector<string>* schemes) {
62  mutex_lock lock(mu_);
63  for (const auto& e : registry_) {
64    schemes->push_back(e.first);
65  }
66  return Status::OK();
67}
68
69Env::Env() : file_system_registry_(new FileSystemRegistryImpl) {}
70
71Status Env::GetFileSystemForFile(const string& fname, FileSystem** result) {
72  StringPiece scheme, host, path;
73  io::ParseURI(fname, &scheme, &host, &path);
74  FileSystem* file_system = file_system_registry_->Lookup(scheme.ToString());
75  if (!file_system) {
76    return errors::Unimplemented("File system scheme ", scheme,
77                                 " not implemented");
78  }
79  *result = file_system;
80  return Status::OK();
81}
82
83Status Env::GetRegisteredFileSystemSchemes(std::vector<string>* schemes) {
84  return file_system_registry_->GetRegisteredFileSystemSchemes(schemes);
85}
86
87Status Env::RegisterFileSystem(const string& scheme,
88                               FileSystemRegistry::Factory factory) {
89  return file_system_registry_->Register(scheme, factory);
90}
91
92Status Env::NewRandomAccessFile(const string& fname,
93                                std::unique_ptr<RandomAccessFile>* result) {
94  FileSystem* fs;
95  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
96  return fs->NewRandomAccessFile(fname, result);
97}
98
99Status Env::NewReadOnlyMemoryRegionFromFile(
100    const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
101  FileSystem* fs;
102  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
103  return fs->NewReadOnlyMemoryRegionFromFile(fname, result);
104}
105
106Status Env::NewWritableFile(const string& fname,
107                            std::unique_ptr<WritableFile>* result) {
108  FileSystem* fs;
109  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
110  return fs->NewWritableFile(fname, result);
111}
112
113Status Env::NewAppendableFile(const string& fname,
114                              std::unique_ptr<WritableFile>* result) {
115  FileSystem* fs;
116  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
117  return fs->NewAppendableFile(fname, result);
118}
119
120Status Env::FileExists(const string& fname) {
121  FileSystem* fs;
122  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
123  return fs->FileExists(fname);
124}
125
126Status Env::GetChildren(const string& dir, std::vector<string>* result) {
127  FileSystem* fs;
128  TF_RETURN_IF_ERROR(GetFileSystemForFile(dir, &fs));
129  return fs->GetChildren(dir, result);
130}
131
132Status Env::GetMatchingPaths(const string& pattern,
133                             std::vector<string>* results) {
134  FileSystem* fs;
135  TF_RETURN_IF_ERROR(GetFileSystemForFile(pattern, &fs));
136  return fs->GetMatchingPaths(pattern, results);
137}
138
139Status Env::DeleteFile(const string& fname) {
140  FileSystem* fs;
141  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
142  return fs->DeleteFile(fname);
143}
144
145Status Env::RecursivelyCreateDir(const string& dirname) {
146  FileSystem* fs;
147  TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
148  return fs->RecursivelyCreateDir(dirname);
149}
150
151Status Env::CreateDir(const string& dirname) {
152  FileSystem* fs;
153  TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
154  return fs->CreateDir(dirname);
155}
156
157Status Env::DeleteDir(const string& dirname) {
158  FileSystem* fs;
159  TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
160  return fs->DeleteDir(dirname);
161}
162
163Status Env::Stat(const string& fname, FileStatistics* stat) {
164  FileSystem* fs;
165  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
166  return fs->Stat(fname, stat);
167}
168
169Status Env::IsDirectory(const string& fname) {
170  FileSystem* fs;
171  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
172  return fs->IsDirectory(fname);
173}
174
175Status Env::DeleteRecursively(const string& dirname, int64* undeleted_files,
176                              int64* undeleted_dirs) {
177  FileSystem* fs;
178  TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs));
179  return fs->DeleteRecursively(dirname, undeleted_files, undeleted_dirs);
180}
181
182Status Env::GetFileSize(const string& fname, uint64* file_size) {
183  FileSystem* fs;
184  TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs));
185  return fs->GetFileSize(fname, file_size);
186}
187
188Status Env::RenameFile(const string& src, const string& target) {
189  FileSystem* src_fs;
190  FileSystem* target_fs;
191  TF_RETURN_IF_ERROR(GetFileSystemForFile(src, &src_fs));
192  TF_RETURN_IF_ERROR(GetFileSystemForFile(target, &target_fs));
193  if (src_fs != target_fs) {
194    return errors::Unimplemented("Renaming ", src, " to ", target,
195                                 " not implemented");
196  }
197  return src_fs->RenameFile(src, target);
198}
199
200Thread::~Thread() {}
201
202EnvWrapper::~EnvWrapper() {}
203
204Status ReadFileToString(Env* env, const string& fname, string* data) {
205  uint64 file_size;
206  Status s = env->GetFileSize(fname, &file_size);
207  if (!s.ok()) {
208    return s;
209  }
210  std::unique_ptr<RandomAccessFile> file;
211  s = env->NewRandomAccessFile(fname, &file);
212  if (!s.ok()) {
213    return s;
214  }
215  gtl::STLStringResizeUninitialized(data, file_size);
216  char* p = gtl::string_as_array(data);
217  StringPiece result;
218  s = file->Read(0, file_size, &result, p);
219  if (!s.ok()) {
220    data->clear();
221  } else if (result.size() != file_size) {
222    s = errors::Aborted("File ", fname, " changed while reading: ", file_size,
223                        " vs. ", result.size());
224    data->clear();
225  } else if (result.data() == p) {
226    // Data is already in the correct location
227  } else {
228    memmove(p, result.data(), result.size());
229  }
230  return s;
231}
232
233Status WriteStringToFile(Env* env, const string& fname,
234                         const StringPiece& data) {
235  std::unique_ptr<WritableFile> file;
236  Status s = env->NewWritableFile(fname, &file);
237  if (!s.ok()) {
238    return s;
239  }
240  s = file->Append(data);
241  if (s.ok()) {
242    s = file->Close();
243  }
244  return s;
245}
246
247// A ZeroCopyInputStream on a RandomAccessFile.
248namespace {
249class FileStream : public ::tensorflow::protobuf::io::ZeroCopyInputStream {
250 public:
251  explicit FileStream(RandomAccessFile* file) : file_(file), pos_(0) {}
252
253  void BackUp(int count) override { pos_ -= count; }
254  bool Skip(int count) override {
255    pos_ += count;
256    return true;
257  }
258  protobuf_int64 ByteCount() const override { return pos_; }
259  Status status() const { return status_; }
260
261  bool Next(const void** data, int* size) override {
262    StringPiece result;
263    Status s = file_->Read(pos_, kBufSize, &result, scratch_);
264    if (result.empty()) {
265      status_ = s;
266      return false;
267    }
268    pos_ += result.size();
269    *data = result.data();
270    *size = result.size();
271    return true;
272  }
273
274 private:
275  static const int kBufSize = 512 << 10;
276
277  RandomAccessFile* file_;
278  int64 pos_;
279  Status status_;
280  char scratch_[kBufSize];
281};
282
283}  // namespace
284
285Status WriteBinaryProto(Env* env, const string& fname,
286                        const ::tensorflow::protobuf::MessageLite& proto) {
287  string serialized;
288  proto.AppendToString(&serialized);
289  return WriteStringToFile(env, fname, serialized);
290}
291
292Status ReadBinaryProto(Env* env, const string& fname,
293                       ::tensorflow::protobuf::MessageLite* proto) {
294  std::unique_ptr<RandomAccessFile> file;
295  TF_RETURN_IF_ERROR(env->NewRandomAccessFile(fname, &file));
296  std::unique_ptr<FileStream> stream(new FileStream(file.get()));
297
298  // TODO(jiayq): the following coded stream is for debugging purposes to allow
299  // one to parse arbitrarily large messages for MessageLite. One most likely
300  // doesn't want to put protobufs larger than 64MB on Android, so we should
301  // eventually remove this and quit loud when a large protobuf is passed in.
302  ::tensorflow::protobuf::io::CodedInputStream coded_stream(stream.get());
303  // Total bytes hard limit / warning limit are set to 1GB and 512MB
304  // respectively.
305  coded_stream.SetTotalBytesLimit(1024LL << 20, 512LL << 20);
306
307  if (!proto->ParseFromCodedStream(&coded_stream)) {
308    TF_RETURN_IF_ERROR(stream->status());
309    return errors::DataLoss("Can't parse ", fname, " as binary proto");
310  }
311  return Status::OK();
312}
313
314Status WriteTextProto(Env* env, const string& fname,
315                      const ::tensorflow::protobuf::Message& proto) {
316#if !defined(TENSORFLOW_LITE_PROTOS)
317  string serialized;
318  if (!::tensorflow::protobuf::TextFormat::PrintToString(proto, &serialized)) {
319    return errors::FailedPrecondition("Unable to convert proto to text.");
320  }
321  return WriteStringToFile(env, fname, serialized);
322#else
323  return errors::Unimplemented("Can't write text protos with protolite.");
324#endif
325}
326
327Status ReadTextProto(Env* env, const string& fname,
328                     ::tensorflow::protobuf::Message* proto) {
329#if !defined(TENSORFLOW_LITE_PROTOS)
330  std::unique_ptr<RandomAccessFile> file;
331  TF_RETURN_IF_ERROR(env->NewRandomAccessFile(fname, &file));
332  std::unique_ptr<FileStream> stream(new FileStream(file.get()));
333
334  if (!::tensorflow::protobuf::TextFormat::Parse(stream.get(), proto)) {
335    TF_RETURN_IF_ERROR(stream->status());
336    return errors::DataLoss("Can't parse ", fname, " as text proto");
337  }
338  return Status::OK();
339#else
340  return errors::Unimplemented("Can't parse text protos with protolite.");
341#endif
342}
343
344}  // namespace tensorflow
345