env.cc revision f09265c145055341e69129d0f8aa938bf2f68acb
1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License. 14==============================================================================*/ 15 16#include <deque> 17#include <vector> 18 19#include "tensorflow/core/lib/core/errors.h" 20#include "tensorflow/core/lib/gtl/map_util.h" 21#include "tensorflow/core/lib/gtl/stl_util.h" 22#include "tensorflow/core/lib/io/path.h" 23#include "tensorflow/core/platform/env.h" 24#include "tensorflow/core/platform/protobuf.h" 25 26namespace tensorflow { 27 28class FileSystemRegistryImpl : public FileSystemRegistry { 29 public: 30 Status Register(const string& scheme, Factory factory) override; 31 FileSystem* Lookup(const string& scheme) override; 32 Status GetRegisteredFileSystemSchemes(std::vector<string>* schemes) override; 33 34 private: 35 mutable mutex mu_; 36 mutable std::unordered_map<string, std::unique_ptr<FileSystem>> registry_ 37 GUARDED_BY(mu_); 38}; 39 40Status FileSystemRegistryImpl::Register(const string& scheme, 41 FileSystemRegistry::Factory factory) { 42 mutex_lock lock(mu_); 43 if (!registry_.emplace(string(scheme), std::unique_ptr<FileSystem>(factory())) 44 .second) { 45 return errors::AlreadyExists("File factory for ", scheme, 46 " already registered"); 47 } 48 return Status::OK(); 49} 50 51FileSystem* FileSystemRegistryImpl::Lookup(const string& scheme) { 52 mutex_lock lock(mu_); 53 const auto found = registry_.find(scheme); 54 if (found == registry_.end()) { 55 return nullptr; 56 } 57 return found->second.get(); 58} 59 60Status FileSystemRegistryImpl::GetRegisteredFileSystemSchemes( 61 std::vector<string>* schemes) { 62 mutex_lock lock(mu_); 63 for (const auto& e : registry_) { 64 schemes->push_back(e.first); 65 } 66 return Status::OK(); 67} 68 69Env::Env() : file_system_registry_(new FileSystemRegistryImpl) {} 70 71Status Env::GetFileSystemForFile(const string& fname, FileSystem** result) { 72 StringPiece scheme, host, path; 73 io::ParseURI(fname, &scheme, &host, &path); 74 FileSystem* file_system = file_system_registry_->Lookup(scheme.ToString()); 75 if (!file_system) { 76 return errors::Unimplemented("File system scheme ", scheme, 77 " not implemented"); 78 } 79 *result = file_system; 80 return Status::OK(); 81} 82 83Status Env::GetRegisteredFileSystemSchemes(std::vector<string>* schemes) { 84 return file_system_registry_->GetRegisteredFileSystemSchemes(schemes); 85} 86 87Status Env::RegisterFileSystem(const string& scheme, 88 FileSystemRegistry::Factory factory) { 89 return file_system_registry_->Register(scheme, factory); 90} 91 92Status Env::NewRandomAccessFile(const string& fname, 93 std::unique_ptr<RandomAccessFile>* result) { 94 FileSystem* fs; 95 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 96 return fs->NewRandomAccessFile(fname, result); 97} 98 99Status Env::NewReadOnlyMemoryRegionFromFile( 100 const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) { 101 FileSystem* fs; 102 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 103 return fs->NewReadOnlyMemoryRegionFromFile(fname, result); 104} 105 106Status Env::NewWritableFile(const string& fname, 107 std::unique_ptr<WritableFile>* result) { 108 FileSystem* fs; 109 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 110 return fs->NewWritableFile(fname, result); 111} 112 113Status Env::NewAppendableFile(const string& fname, 114 std::unique_ptr<WritableFile>* result) { 115 FileSystem* fs; 116 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 117 return fs->NewAppendableFile(fname, result); 118} 119 120Status Env::FileExists(const string& fname) { 121 FileSystem* fs; 122 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 123 return fs->FileExists(fname); 124} 125 126Status Env::GetChildren(const string& dir, std::vector<string>* result) { 127 FileSystem* fs; 128 TF_RETURN_IF_ERROR(GetFileSystemForFile(dir, &fs)); 129 return fs->GetChildren(dir, result); 130} 131 132Status Env::GetMatchingPaths(const string& pattern, 133 std::vector<string>* results) { 134 FileSystem* fs; 135 TF_RETURN_IF_ERROR(GetFileSystemForFile(pattern, &fs)); 136 return fs->GetMatchingPaths(pattern, results); 137} 138 139Status Env::DeleteFile(const string& fname) { 140 FileSystem* fs; 141 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 142 return fs->DeleteFile(fname); 143} 144 145Status Env::RecursivelyCreateDir(const string& dirname) { 146 FileSystem* fs; 147 TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs)); 148 return fs->RecursivelyCreateDir(dirname); 149} 150 151Status Env::CreateDir(const string& dirname) { 152 FileSystem* fs; 153 TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs)); 154 return fs->CreateDir(dirname); 155} 156 157Status Env::DeleteDir(const string& dirname) { 158 FileSystem* fs; 159 TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs)); 160 return fs->DeleteDir(dirname); 161} 162 163Status Env::Stat(const string& fname, FileStatistics* stat) { 164 FileSystem* fs; 165 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 166 return fs->Stat(fname, stat); 167} 168 169Status Env::IsDirectory(const string& fname) { 170 FileSystem* fs; 171 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 172 return fs->IsDirectory(fname); 173} 174 175Status Env::DeleteRecursively(const string& dirname, int64* undeleted_files, 176 int64* undeleted_dirs) { 177 FileSystem* fs; 178 TF_RETURN_IF_ERROR(GetFileSystemForFile(dirname, &fs)); 179 return fs->DeleteRecursively(dirname, undeleted_files, undeleted_dirs); 180} 181 182Status Env::GetFileSize(const string& fname, uint64* file_size) { 183 FileSystem* fs; 184 TF_RETURN_IF_ERROR(GetFileSystemForFile(fname, &fs)); 185 return fs->GetFileSize(fname, file_size); 186} 187 188Status Env::RenameFile(const string& src, const string& target) { 189 FileSystem* src_fs; 190 FileSystem* target_fs; 191 TF_RETURN_IF_ERROR(GetFileSystemForFile(src, &src_fs)); 192 TF_RETURN_IF_ERROR(GetFileSystemForFile(target, &target_fs)); 193 if (src_fs != target_fs) { 194 return errors::Unimplemented("Renaming ", src, " to ", target, 195 " not implemented"); 196 } 197 return src_fs->RenameFile(src, target); 198} 199 200Thread::~Thread() {} 201 202EnvWrapper::~EnvWrapper() {} 203 204Status ReadFileToString(Env* env, const string& fname, string* data) { 205 uint64 file_size; 206 Status s = env->GetFileSize(fname, &file_size); 207 if (!s.ok()) { 208 return s; 209 } 210 std::unique_ptr<RandomAccessFile> file; 211 s = env->NewRandomAccessFile(fname, &file); 212 if (!s.ok()) { 213 return s; 214 } 215 gtl::STLStringResizeUninitialized(data, file_size); 216 char* p = gtl::string_as_array(data); 217 StringPiece result; 218 s = file->Read(0, file_size, &result, p); 219 if (!s.ok()) { 220 data->clear(); 221 } else if (result.size() != file_size) { 222 s = errors::Aborted("File ", fname, " changed while reading: ", file_size, 223 " vs. ", result.size()); 224 data->clear(); 225 } else if (result.data() == p) { 226 // Data is already in the correct location 227 } else { 228 memmove(p, result.data(), result.size()); 229 } 230 return s; 231} 232 233Status WriteStringToFile(Env* env, const string& fname, 234 const StringPiece& data) { 235 std::unique_ptr<WritableFile> file; 236 Status s = env->NewWritableFile(fname, &file); 237 if (!s.ok()) { 238 return s; 239 } 240 s = file->Append(data); 241 if (s.ok()) { 242 s = file->Close(); 243 } 244 return s; 245} 246 247// A ZeroCopyInputStream on a RandomAccessFile. 248namespace { 249class FileStream : public ::tensorflow::protobuf::io::ZeroCopyInputStream { 250 public: 251 explicit FileStream(RandomAccessFile* file) : file_(file), pos_(0) {} 252 253 void BackUp(int count) override { pos_ -= count; } 254 bool Skip(int count) override { 255 pos_ += count; 256 return true; 257 } 258 protobuf_int64 ByteCount() const override { return pos_; } 259 Status status() const { return status_; } 260 261 bool Next(const void** data, int* size) override { 262 StringPiece result; 263 Status s = file_->Read(pos_, kBufSize, &result, scratch_); 264 if (result.empty()) { 265 status_ = s; 266 return false; 267 } 268 pos_ += result.size(); 269 *data = result.data(); 270 *size = result.size(); 271 return true; 272 } 273 274 private: 275 static const int kBufSize = 512 << 10; 276 277 RandomAccessFile* file_; 278 int64 pos_; 279 Status status_; 280 char scratch_[kBufSize]; 281}; 282 283} // namespace 284 285Status WriteBinaryProto(Env* env, const string& fname, 286 const ::tensorflow::protobuf::MessageLite& proto) { 287 string serialized; 288 proto.AppendToString(&serialized); 289 return WriteStringToFile(env, fname, serialized); 290} 291 292Status ReadBinaryProto(Env* env, const string& fname, 293 ::tensorflow::protobuf::MessageLite* proto) { 294 std::unique_ptr<RandomAccessFile> file; 295 TF_RETURN_IF_ERROR(env->NewRandomAccessFile(fname, &file)); 296 std::unique_ptr<FileStream> stream(new FileStream(file.get())); 297 298 // TODO(jiayq): the following coded stream is for debugging purposes to allow 299 // one to parse arbitrarily large messages for MessageLite. One most likely 300 // doesn't want to put protobufs larger than 64MB on Android, so we should 301 // eventually remove this and quit loud when a large protobuf is passed in. 302 ::tensorflow::protobuf::io::CodedInputStream coded_stream(stream.get()); 303 // Total bytes hard limit / warning limit are set to 1GB and 512MB 304 // respectively. 305 coded_stream.SetTotalBytesLimit(1024LL << 20, 512LL << 20); 306 307 if (!proto->ParseFromCodedStream(&coded_stream)) { 308 TF_RETURN_IF_ERROR(stream->status()); 309 return errors::DataLoss("Can't parse ", fname, " as binary proto"); 310 } 311 return Status::OK(); 312} 313 314Status WriteTextProto(Env* env, const string& fname, 315 const ::tensorflow::protobuf::Message& proto) { 316#if !defined(TENSORFLOW_LITE_PROTOS) 317 string serialized; 318 if (!::tensorflow::protobuf::TextFormat::PrintToString(proto, &serialized)) { 319 return errors::FailedPrecondition("Unable to convert proto to text."); 320 } 321 return WriteStringToFile(env, fname, serialized); 322#else 323 return errors::Unimplemented("Can't write text protos with protolite."); 324#endif 325} 326 327Status ReadTextProto(Env* env, const string& fname, 328 ::tensorflow::protobuf::Message* proto) { 329#if !defined(TENSORFLOW_LITE_PROTOS) 330 std::unique_ptr<RandomAccessFile> file; 331 TF_RETURN_IF_ERROR(env->NewRandomAccessFile(fname, &file)); 332 std::unique_ptr<FileStream> stream(new FileStream(file.get())); 333 334 if (!::tensorflow::protobuf::TextFormat::Parse(stream.get(), proto)) { 335 TF_RETURN_IF_ERROR(stream->status()); 336 return errors::DataLoss("Can't parse ", fname, " as text proto"); 337 } 338 return Status::OK(); 339#else 340 return errors::Unimplemented("Can't parse text protos with protolite."); 341#endif 342} 343 344} // namespace tensorflow 345