1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License. 14==============================================================================*/ 15 16// Decodes the blocks generated by block_builder.cc. 17 18#include "tensorflow/core/lib/io/block.h" 19 20#include <algorithm> 21#include "tensorflow/core/lib/core/coding.h" 22#include "tensorflow/core/lib/core/errors.h" 23#include "tensorflow/core/lib/io/format.h" 24#include "tensorflow/core/platform/logging.h" 25 26namespace tensorflow { 27namespace table { 28 29inline uint32 Block::NumRestarts() const { 30 assert(size_ >= sizeof(uint32)); 31 return core::DecodeFixed32(data_ + size_ - sizeof(uint32)); 32} 33 34Block::Block(const BlockContents& contents) 35 : data_(contents.data.data()), 36 size_(contents.data.size()), 37 owned_(contents.heap_allocated) { 38 if (size_ < sizeof(uint32)) { 39 size_ = 0; // Error marker 40 } else { 41 size_t max_restarts_allowed = (size_ - sizeof(uint32)) / sizeof(uint32); 42 if (NumRestarts() > max_restarts_allowed) { 43 // The size is too small for NumRestarts() 44 size_ = 0; 45 } else { 46 restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32); 47 } 48 } 49} 50 51Block::~Block() { 52 if (owned_) { 53 delete[] data_; 54 } 55} 56 57// Helper routine: decode the next block entry starting at "p", 58// storing the number of shared key bytes, non_shared key bytes, 59// and the length of the value in "*shared", "*non_shared", and 60// "*value_length", respectively. Will not dereference past "limit". 61// 62// If any errors are detected, returns NULL. Otherwise, returns a 63// pointer to the key delta (just past the three decoded values). 64static inline const char* DecodeEntry(const char* p, const char* limit, 65 uint32* shared, uint32* non_shared, 66 uint32* value_length) { 67 if (limit - p < 3) return nullptr; 68 *shared = reinterpret_cast<const unsigned char*>(p)[0]; 69 *non_shared = reinterpret_cast<const unsigned char*>(p)[1]; 70 *value_length = reinterpret_cast<const unsigned char*>(p)[2]; 71 if ((*shared | *non_shared | *value_length) < 128) { 72 // Fast path: all three values are encoded in one byte each 73 p += 3; 74 } else { 75 if ((p = core::GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr; 76 if ((p = core::GetVarint32Ptr(p, limit, non_shared)) == nullptr) 77 return nullptr; 78 if ((p = core::GetVarint32Ptr(p, limit, value_length)) == nullptr) 79 return nullptr; 80 } 81 82 if (static_cast<uint32>(limit - p) < (*non_shared + *value_length)) { 83 return nullptr; 84 } 85 return p; 86} 87 88class Block::Iter : public Iterator { 89 private: 90 const char* const data_; // underlying block contents 91 uint32 const restarts_; // Offset of restart array (list of fixed32) 92 uint32 const num_restarts_; // Number of uint32 entries in restart array 93 94 // current_ is offset in data_ of current entry. >= restarts_ if !Valid 95 uint32 current_; 96 uint32 restart_index_; // Index of restart block in which current_ falls 97 string key_; 98 StringPiece value_; 99 Status status_; 100 101 inline int Compare(const StringPiece& a, const StringPiece& b) const { 102 return a.compare(b); 103 } 104 105 // Return the offset in data_ just past the end of the current entry. 106 inline uint32 NextEntryOffset() const { 107 return (value_.data() + value_.size()) - data_; 108 } 109 110 uint32 GetRestartPoint(uint32 index) { 111 assert(index < num_restarts_); 112 return core::DecodeFixed32(data_ + restarts_ + index * sizeof(uint32)); 113 } 114 115 void SeekToRestartPoint(uint32 index) { 116 key_.clear(); 117 restart_index_ = index; 118 // current_ will be fixed by ParseNextKey(); 119 120 // ParseNextKey() starts at the end of value_, so set value_ accordingly 121 uint32 offset = GetRestartPoint(index); 122 value_ = StringPiece(data_ + offset, 0); 123 } 124 125 public: 126 Iter(const char* data, uint32 restarts, uint32 num_restarts) 127 : data_(data), 128 restarts_(restarts), 129 num_restarts_(num_restarts), 130 current_(restarts_), 131 restart_index_(num_restarts_) { 132 assert(num_restarts_ > 0); 133 } 134 135 bool Valid() const override { return current_ < restarts_; } 136 Status status() const override { return status_; } 137 StringPiece key() const override { 138 assert(Valid()); 139 return key_; 140 } 141 StringPiece value() const override { 142 assert(Valid()); 143 return value_; 144 } 145 146 void Next() override { 147 assert(Valid()); 148 ParseNextKey(); 149 } 150 151 void Seek(const StringPiece& target) override { 152 // Binary search in restart array to find the last restart point 153 // with a key < target 154 uint32 left = 0; 155 uint32 right = num_restarts_ - 1; 156 while (left < right) { 157 uint32 mid = (left + right + 1) / 2; 158 uint32 region_offset = GetRestartPoint(mid); 159 uint32 shared, non_shared, value_length; 160 const char* key_ptr = 161 DecodeEntry(data_ + region_offset, data_ + restarts_, &shared, 162 &non_shared, &value_length); 163 if (key_ptr == nullptr || (shared != 0)) { 164 CorruptionError(); 165 return; 166 } 167 StringPiece mid_key(key_ptr, non_shared); 168 if (Compare(mid_key, target) < 0) { 169 // Key at "mid" is smaller than "target". Therefore all 170 // blocks before "mid" are uninteresting. 171 left = mid; 172 } else { 173 // Key at "mid" is >= "target". Therefore all blocks at or 174 // after "mid" are uninteresting. 175 right = mid - 1; 176 } 177 } 178 179 // Linear search (within restart block) for first key >= target 180 SeekToRestartPoint(left); 181 while (true) { 182 if (!ParseNextKey()) { 183 return; 184 } 185 if (Compare(key_, target) >= 0) { 186 return; 187 } 188 } 189 } 190 191 void SeekToFirst() override { 192 SeekToRestartPoint(0); 193 ParseNextKey(); 194 } 195 196 private: 197 void CorruptionError() { 198 current_ = restarts_; 199 restart_index_ = num_restarts_; 200 status_ = errors::DataLoss("bad entry in block"); 201 key_.clear(); 202 value_ = StringPiece(); 203 } 204 205 bool ParseNextKey() { 206 current_ = NextEntryOffset(); 207 const char* p = data_ + current_; 208 const char* limit = data_ + restarts_; // Restarts come right after data 209 if (p >= limit) { 210 // No more entries to return. Mark as invalid. 211 current_ = restarts_; 212 restart_index_ = num_restarts_; 213 return false; 214 } 215 216 // Decode next entry 217 uint32 shared, non_shared, value_length; 218 p = DecodeEntry(p, limit, &shared, &non_shared, &value_length); 219 if (p == nullptr || key_.size() < shared) { 220 CorruptionError(); 221 return false; 222 } else { 223 key_.resize(shared); 224 key_.append(p, non_shared); 225 value_ = StringPiece(p + non_shared, value_length); 226 while (restart_index_ + 1 < num_restarts_ && 227 GetRestartPoint(restart_index_ + 1) < current_) { 228 ++restart_index_; 229 } 230 return true; 231 } 232 } 233}; 234 235Iterator* Block::NewIterator() { 236 if (size_ < sizeof(uint32)) { 237 return NewErrorIterator(errors::DataLoss("bad block contents")); 238 } 239 const uint32 num_restarts = NumRestarts(); 240 if (num_restarts == 0) { 241 return NewEmptyIterator(); 242 } else { 243 return new Iter(data_, restart_offset_, num_restarts); 244 } 245} 246 247} // namespace table 248} // namespace tensorflow 249