1/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License. 14==============================================================================*/ 15 16#include "tensorflow/core/platform/cloud/file_block_cache.h" 17#include <cstring> 18#include <memory> 19#include "tensorflow/core/lib/gtl/cleanup.h" 20#include "tensorflow/core/platform/env.h" 21 22namespace tensorflow { 23 24bool FileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) { 25 mutex_lock l(block->mu); 26 if (block->state != FetchState::FINISHED) { 27 return true; // No need to check for staleness. 28 } 29 if (max_staleness_ == 0) return true; // Not enforcing staleness. 30 return env_->NowSeconds() - block->timestamp <= max_staleness_; 31} 32 33std::shared_ptr<FileBlockCache::Block> FileBlockCache::Lookup(const Key& key) { 34 mutex_lock lock(mu_); 35 auto entry = block_map_.find(key); 36 if (entry != block_map_.end()) { 37 if (BlockNotStale(entry->second)) { 38 return entry->second; 39 } else { 40 // Remove the stale block and continue. 41 RemoveFile_Locked(key.first); 42 } 43 } 44 45 // Insert a new empty block, setting the bookkeeping to sentinel values 46 // in order to update them as appropriate. 47 auto new_entry = std::make_shared<Block>(); 48 lru_list_.push_front(key); 49 lra_list_.push_front(key); 50 new_entry->lru_iterator = lru_list_.begin(); 51 new_entry->lra_iterator = lra_list_.begin(); 52 new_entry->timestamp = env_->NowSeconds(); 53 block_map_.emplace(std::make_pair(key, new_entry)); 54 return new_entry; 55} 56 57// Remove blocks from the cache until we do not exceed our maximum size. 58void FileBlockCache::Trim() { 59 while (!lru_list_.empty() && cache_size_ > max_bytes_) { 60 RemoveBlock(block_map_.find(lru_list_.back())); 61 } 62} 63 64/// Move the block to the front of the LRU list if it isn't already there. 65Status FileBlockCache::UpdateLRU(const Key& key, 66 const std::shared_ptr<Block>& block) { 67 mutex_lock lock(mu_); 68 if (block->timestamp == 0) { 69 // The block was evicted from another thread. Allow it to remain evicted. 70 return Status::OK(); 71 } 72 if (block->lru_iterator != lru_list_.begin()) { 73 lru_list_.erase(block->lru_iterator); 74 lru_list_.push_front(key); 75 block->lru_iterator = lru_list_.begin(); 76 } 77 78 // Check for inconsistent state. If there is a block later in the same file 79 // in the cache, and our current block is not block size, this likely means 80 // we have inconsistent state within the cache. Note: it's possible some 81 // incomplete reads may still go undetected. 82 if (block->data.size() < block_size_) { 83 Key fmax = std::make_pair(key.first, std::numeric_limits<size_t>::max()); 84 auto fcmp = block_map_.upper_bound(fmax); 85 if (fcmp != block_map_.begin() && key < (--fcmp)->first) { 86 return errors::Internal("Block cache contents are inconsistent."); 87 } 88 } 89 90 Trim(); 91 92 return Status::OK(); 93} 94 95Status FileBlockCache::MaybeFetch(const Key& key, 96 const std::shared_ptr<Block>& block) { 97 bool downloaded_block = false; 98 auto reconcile_state = 99 gtl::MakeCleanup([this, &downloaded_block, &key, &block] { 100 // Perform this action in a cleanup callback to avoid locking mu_ after 101 // locking block->mu. 102 if (downloaded_block) { 103 mutex_lock l(mu_); 104 // Do not update state if the block is already to be evicted. 105 if (block->timestamp != 0) { 106 cache_size_ += block->data.size(); 107 // Put to beginning of LRA list. 108 lra_list_.erase(block->lra_iterator); 109 lra_list_.push_front(key); 110 block->lra_iterator = lra_list_.begin(); 111 block->timestamp = env_->NowSeconds(); 112 } 113 } 114 }); 115 // Loop until either block content is successfully fetched, or our request 116 // encounters an error. 117 mutex_lock l(block->mu); 118 Status status = Status::OK(); 119 while (true) { 120 switch (block->state) { 121 case FetchState::ERROR: 122 TF_FALLTHROUGH_INTENDED; 123 case FetchState::CREATED: 124 block->state = FetchState::FETCHING; 125 block->mu.unlock(); // Release the lock while making the API call. 126 block->data.clear(); 127 block->data.resize(block_size_, 0); 128 size_t bytes_transferred; 129 status.Update(block_fetcher_(key.first, key.second, block_size_, 130 block->data.data(), &bytes_transferred)); 131 block->mu.lock(); // Reacquire the lock immediately afterwards 132 if (status.ok()) { 133 block->data.resize(bytes_transferred, 0); 134 block->data.shrink_to_fit(); 135 downloaded_block = true; 136 block->state = FetchState::FINISHED; 137 } else { 138 block->state = FetchState::ERROR; 139 } 140 block->cond_var.notify_all(); 141 return status; 142 case FetchState::FETCHING: 143 block->cond_var.wait_for(l, std::chrono::seconds(60)); 144 if (block->state == FetchState::FINISHED) { 145 return Status::OK(); 146 } 147 // Re-loop in case of errors. 148 break; 149 case FetchState::FINISHED: 150 return Status::OK(); 151 } 152 } 153 return errors::Internal( 154 "Control flow should never reach the end of FileBlockCache::Fetch."); 155} 156 157Status FileBlockCache::Read(const string& filename, size_t offset, size_t n, 158 char* buffer, size_t* bytes_transferred) { 159 *bytes_transferred = 0; 160 if (n == 0) { 161 return Status::OK(); 162 } 163 if (block_size_ == 0 || max_bytes_ == 0) { 164 // The cache is effectively disabled, so we pass the read through to the 165 // fetcher without breaking it up into blocks. 166 return block_fetcher_(filename, offset, n, buffer, bytes_transferred); 167 } 168 // Calculate the block-aligned start and end of the read. 169 size_t start = block_size_ * (offset / block_size_); 170 size_t finish = block_size_ * ((offset + n) / block_size_); 171 if (finish < offset + n) { 172 finish += block_size_; 173 } 174 size_t total_bytes_transferred = 0; 175 // Now iterate through the blocks, reading them one at a time. 176 for (size_t pos = start; pos < finish; pos += block_size_) { 177 Key key = std::make_pair(filename, pos); 178 // Look up the block, fetching and inserting it if necessary, and update the 179 // LRU iterator for the key and block. 180 std::shared_ptr<Block> block = Lookup(key); 181 DCHECK(block) << "No block for key " << key.first << "@" << key.second; 182 TF_RETURN_IF_ERROR(MaybeFetch(key, block)); 183 TF_RETURN_IF_ERROR(UpdateLRU(key, block)); 184 // Copy the relevant portion of the block into the result buffer. 185 const auto& data = block->data; 186 if (offset >= pos + data.size()) { 187 // The requested offset is at or beyond the end of the file. This can 188 // happen if `offset` is not block-aligned, and the read returns the last 189 // block in the file, which does not extend all the way out to `offset`. 190 *bytes_transferred = total_bytes_transferred; 191 return errors::OutOfRange("EOF at offset ", offset, " in file ", filename, 192 " at position ", pos, "with data size ", 193 data.size()); 194 } 195 auto begin = data.begin(); 196 if (offset > pos) { 197 // The block begins before the slice we're reading. 198 begin += offset - pos; 199 } 200 auto end = data.end(); 201 if (pos + data.size() > offset + n) { 202 // The block extends past the end of the slice we're reading. 203 end -= (pos + data.size()) - (offset + n); 204 } 205 if (begin < end) { 206 size_t bytes_to_copy = end - begin; 207 memcpy(&buffer[total_bytes_transferred], &*begin, bytes_to_copy); 208 total_bytes_transferred += bytes_to_copy; 209 } 210 if (data.size() < block_size_) { 211 // The block was a partial block and thus signals EOF at its upper bound. 212 break; 213 } 214 } 215 *bytes_transferred = total_bytes_transferred; 216 return Status::OK(); 217} 218 219size_t FileBlockCache::CacheSize() const { 220 mutex_lock lock(mu_); 221 return cache_size_; 222} 223 224void FileBlockCache::Prune() { 225 while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) { 226 mutex_lock lock(mu_); 227 uint64 now = env_->NowSeconds(); 228 while (!lra_list_.empty()) { 229 auto it = block_map_.find(lra_list_.back()); 230 if (now - it->second->timestamp <= max_staleness_) { 231 // The oldest block is not yet expired. Come back later. 232 break; 233 } 234 // We need to make a copy of the filename here, since it could otherwise 235 // be used within RemoveFile_Locked after `it` is deleted. 236 RemoveFile_Locked(std::string(it->first.first)); 237 } 238 } 239} 240 241void FileBlockCache::Flush() { 242 mutex_lock lock(mu_); 243 block_map_.clear(); 244 lru_list_.clear(); 245 lra_list_.clear(); 246 cache_size_ = 0; 247} 248 249void FileBlockCache::RemoveFile(const string& filename) { 250 mutex_lock lock(mu_); 251 RemoveFile_Locked(filename); 252} 253 254void FileBlockCache::RemoveFile_Locked(const string& filename) { 255 Key begin = std::make_pair(filename, 0); 256 auto it = block_map_.lower_bound(begin); 257 while (it != block_map_.end() && it->first.first == filename) { 258 auto next = std::next(it); 259 RemoveBlock(it); 260 it = next; 261 } 262} 263 264void FileBlockCache::RemoveBlock(BlockMap::iterator entry) { 265 // This signals that the block is removed, and should not be inadvertently 266 // reinserted into the cache in UpdateLRU. 267 entry->second->timestamp = 0; 268 lru_list_.erase(entry->second->lru_iterator); 269 lra_list_.erase(entry->second->lra_iterator); 270 cache_size_ -= entry->second->data.size(); 271 block_map_.erase(entry); 272} 273 274} // namespace tensorflow 275