1/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
#include "tensorflow/core/platform/cloud/file_block_cache.h"
#include <cstring>
#include <limits>
#include <memory>
#include "tensorflow/core/lib/gtl/cleanup.h"
#include "tensorflow/core/platform/env.h"
21
22namespace tensorflow {
23
24bool FileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) {
25  mutex_lock l(block->mu);
26  if (block->state != FetchState::FINISHED) {
27    return true;  // No need to check for staleness.
28  }
29  if (max_staleness_ == 0) return true;  // Not enforcing staleness.
30  return env_->NowSeconds() - block->timestamp <= max_staleness_;
31}
32
33std::shared_ptr<FileBlockCache::Block> FileBlockCache::Lookup(const Key& key) {
34  mutex_lock lock(mu_);
35  auto entry = block_map_.find(key);
36  if (entry != block_map_.end()) {
37    if (BlockNotStale(entry->second)) {
38      return entry->second;
39    } else {
40      // Remove the stale block and continue.
41      RemoveFile_Locked(key.first);
42    }
43  }
44
45  // Insert a new empty block, setting the bookkeeping to sentinel values
46  // in order to update them as appropriate.
47  auto new_entry = std::make_shared<Block>();
48  lru_list_.push_front(key);
49  lra_list_.push_front(key);
50  new_entry->lru_iterator = lru_list_.begin();
51  new_entry->lra_iterator = lra_list_.begin();
52  new_entry->timestamp = env_->NowSeconds();
53  block_map_.emplace(std::make_pair(key, new_entry));
54  return new_entry;
55}
56
57// Remove blocks from the cache until we do not exceed our maximum size.
58void FileBlockCache::Trim() {
59  while (!lru_list_.empty() && cache_size_ > max_bytes_) {
60    RemoveBlock(block_map_.find(lru_list_.back()));
61  }
62}
63
64/// Move the block to the front of the LRU list if it isn't already there.
65Status FileBlockCache::UpdateLRU(const Key& key,
66                                 const std::shared_ptr<Block>& block) {
67  mutex_lock lock(mu_);
68  if (block->timestamp == 0) {
69    // The block was evicted from another thread. Allow it to remain evicted.
70    return Status::OK();
71  }
72  if (block->lru_iterator != lru_list_.begin()) {
73    lru_list_.erase(block->lru_iterator);
74    lru_list_.push_front(key);
75    block->lru_iterator = lru_list_.begin();
76  }
77
78  // Check for inconsistent state. If there is a block later in the same file
79  // in the cache, and our current block is not block size, this likely means
80  // we have inconsistent state within the cache. Note: it's possible some
81  // incomplete reads may still go undetected.
82  if (block->data.size() < block_size_) {
83    Key fmax = std::make_pair(key.first, std::numeric_limits<size_t>::max());
84    auto fcmp = block_map_.upper_bound(fmax);
85    if (fcmp != block_map_.begin() && key < (--fcmp)->first) {
86      return errors::Internal("Block cache contents are inconsistent.");
87    }
88  }
89
90  Trim();
91
92  return Status::OK();
93}
94
95Status FileBlockCache::MaybeFetch(const Key& key,
96                                  const std::shared_ptr<Block>& block) {
97  bool downloaded_block = false;
98  auto reconcile_state =
99      gtl::MakeCleanup([this, &downloaded_block, &key, &block] {
100        // Perform this action in a cleanup callback to avoid locking mu_ after
101        // locking block->mu.
102        if (downloaded_block) {
103          mutex_lock l(mu_);
104          // Do not update state if the block is already to be evicted.
105          if (block->timestamp != 0) {
106            cache_size_ += block->data.size();
107            // Put to beginning of LRA list.
108            lra_list_.erase(block->lra_iterator);
109            lra_list_.push_front(key);
110            block->lra_iterator = lra_list_.begin();
111            block->timestamp = env_->NowSeconds();
112          }
113        }
114      });
115  // Loop until either block content is successfully fetched, or our request
116  // encounters an error.
117  mutex_lock l(block->mu);
118  Status status = Status::OK();
119  while (true) {
120    switch (block->state) {
121      case FetchState::ERROR:
122        TF_FALLTHROUGH_INTENDED;
123      case FetchState::CREATED:
124        block->state = FetchState::FETCHING;
125        block->mu.unlock();  // Release the lock while making the API call.
126        block->data.clear();
127        block->data.resize(block_size_, 0);
128        size_t bytes_transferred;
129        status.Update(block_fetcher_(key.first, key.second, block_size_,
130                                     block->data.data(), &bytes_transferred));
131        block->mu.lock();  // Reacquire the lock immediately afterwards
132        if (status.ok()) {
133          block->data.resize(bytes_transferred, 0);
134          block->data.shrink_to_fit();
135          downloaded_block = true;
136          block->state = FetchState::FINISHED;
137        } else {
138          block->state = FetchState::ERROR;
139        }
140        block->cond_var.notify_all();
141        return status;
142      case FetchState::FETCHING:
143        block->cond_var.wait_for(l, std::chrono::seconds(60));
144        if (block->state == FetchState::FINISHED) {
145          return Status::OK();
146        }
147        // Re-loop in case of errors.
148        break;
149      case FetchState::FINISHED:
150        return Status::OK();
151    }
152  }
153  return errors::Internal(
154      "Control flow should never reach the end of FileBlockCache::Fetch.");
155}
156
157Status FileBlockCache::Read(const string& filename, size_t offset, size_t n,
158                            char* buffer, size_t* bytes_transferred) {
159  *bytes_transferred = 0;
160  if (n == 0) {
161    return Status::OK();
162  }
163  if (block_size_ == 0 || max_bytes_ == 0) {
164    // The cache is effectively disabled, so we pass the read through to the
165    // fetcher without breaking it up into blocks.
166    return block_fetcher_(filename, offset, n, buffer, bytes_transferred);
167  }
168  // Calculate the block-aligned start and end of the read.
169  size_t start = block_size_ * (offset / block_size_);
170  size_t finish = block_size_ * ((offset + n) / block_size_);
171  if (finish < offset + n) {
172    finish += block_size_;
173  }
174  size_t total_bytes_transferred = 0;
175  // Now iterate through the blocks, reading them one at a time.
176  for (size_t pos = start; pos < finish; pos += block_size_) {
177    Key key = std::make_pair(filename, pos);
178    // Look up the block, fetching and inserting it if necessary, and update the
179    // LRU iterator for the key and block.
180    std::shared_ptr<Block> block = Lookup(key);
181    DCHECK(block) << "No block for key " << key.first << "@" << key.second;
182    TF_RETURN_IF_ERROR(MaybeFetch(key, block));
183    TF_RETURN_IF_ERROR(UpdateLRU(key, block));
184    // Copy the relevant portion of the block into the result buffer.
185    const auto& data = block->data;
186    if (offset >= pos + data.size()) {
187      // The requested offset is at or beyond the end of the file. This can
188      // happen if `offset` is not block-aligned, and the read returns the last
189      // block in the file, which does not extend all the way out to `offset`.
190      *bytes_transferred = total_bytes_transferred;
191      return errors::OutOfRange("EOF at offset ", offset, " in file ", filename,
192                                " at position ", pos, "with data size ",
193                                data.size());
194    }
195    auto begin = data.begin();
196    if (offset > pos) {
197      // The block begins before the slice we're reading.
198      begin += offset - pos;
199    }
200    auto end = data.end();
201    if (pos + data.size() > offset + n) {
202      // The block extends past the end of the slice we're reading.
203      end -= (pos + data.size()) - (offset + n);
204    }
205    if (begin < end) {
206      size_t bytes_to_copy = end - begin;
207      memcpy(&buffer[total_bytes_transferred], &*begin, bytes_to_copy);
208      total_bytes_transferred += bytes_to_copy;
209    }
210    if (data.size() < block_size_) {
211      // The block was a partial block and thus signals EOF at its upper bound.
212      break;
213    }
214  }
215  *bytes_transferred = total_bytes_transferred;
216  return Status::OK();
217}
218
219size_t FileBlockCache::CacheSize() const {
220  mutex_lock lock(mu_);
221  return cache_size_;
222}
223
224void FileBlockCache::Prune() {
225  while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) {
226    mutex_lock lock(mu_);
227    uint64 now = env_->NowSeconds();
228    while (!lra_list_.empty()) {
229      auto it = block_map_.find(lra_list_.back());
230      if (now - it->second->timestamp <= max_staleness_) {
231        // The oldest block is not yet expired. Come back later.
232        break;
233      }
234      // We need to make a copy of the filename here, since it could otherwise
235      // be used within RemoveFile_Locked after `it` is deleted.
236      RemoveFile_Locked(std::string(it->first.first));
237    }
238  }
239}
240
241void FileBlockCache::Flush() {
242  mutex_lock lock(mu_);
243  block_map_.clear();
244  lru_list_.clear();
245  lra_list_.clear();
246  cache_size_ = 0;
247}
248
249void FileBlockCache::RemoveFile(const string& filename) {
250  mutex_lock lock(mu_);
251  RemoveFile_Locked(filename);
252}
253
254void FileBlockCache::RemoveFile_Locked(const string& filename) {
255  Key begin = std::make_pair(filename, 0);
256  auto it = block_map_.lower_bound(begin);
257  while (it != block_map_.end() && it->first.first == filename) {
258    auto next = std::next(it);
259    RemoveBlock(it);
260    it = next;
261  }
262}
263
264void FileBlockCache::RemoveBlock(BlockMap::iterator entry) {
265  // This signals that the block is removed, and should not be inadvertently
266  // reinserted into the cache in UpdateLRU.
267  entry->second->timestamp = 0;
268  lru_list_.erase(entry->second->lru_iterator);
269  lra_list_.erase(entry->second->lra_iterator);
270  cache_size_ -= entry->second->data.size();
271  block_map_.erase(entry);
272}
273
274}  // namespace tensorflow
275