1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16// Decodes the blocks generated by block_builder.cc.
17
18#include "tensorflow/core/lib/io/block.h"
19
20#include <algorithm>
21#include "tensorflow/core/lib/core/coding.h"
22#include "tensorflow/core/lib/core/errors.h"
23#include "tensorflow/core/lib/io/format.h"
24#include "tensorflow/core/platform/logging.h"
25
26namespace tensorflow {
27namespace table {
28
29inline uint32 Block::NumRestarts() const {
30  assert(size_ >= sizeof(uint32));
31  return core::DecodeFixed32(data_ + size_ - sizeof(uint32));
32}
33
34Block::Block(const BlockContents& contents)
35    : data_(contents.data.data()),
36      size_(contents.data.size()),
37      owned_(contents.heap_allocated) {
38  if (size_ < sizeof(uint32)) {
39    size_ = 0;  // Error marker
40  } else {
41    size_t max_restarts_allowed = (size_ - sizeof(uint32)) / sizeof(uint32);
42    if (NumRestarts() > max_restarts_allowed) {
43      // The size is too small for NumRestarts()
44      size_ = 0;
45    } else {
46      restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32);
47    }
48  }
49}
50
51Block::~Block() {
52  if (owned_) {
53    delete[] data_;
54  }
55}
56
57// Helper routine: decode the next block entry starting at "p",
58// storing the number of shared key bytes, non_shared key bytes,
59// and the length of the value in "*shared", "*non_shared", and
60// "*value_length", respectively.  Will not dereference past "limit".
61//
62// If any errors are detected, returns NULL.  Otherwise, returns a
63// pointer to the key delta (just past the three decoded values).
64static inline const char* DecodeEntry(const char* p, const char* limit,
65                                      uint32* shared, uint32* non_shared,
66                                      uint32* value_length) {
67  if (limit - p < 3) return nullptr;
68  *shared = reinterpret_cast<const unsigned char*>(p)[0];
69  *non_shared = reinterpret_cast<const unsigned char*>(p)[1];
70  *value_length = reinterpret_cast<const unsigned char*>(p)[2];
71  if ((*shared | *non_shared | *value_length) < 128) {
72    // Fast path: all three values are encoded in one byte each
73    p += 3;
74  } else {
75    if ((p = core::GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
76    if ((p = core::GetVarint32Ptr(p, limit, non_shared)) == nullptr)
77      return nullptr;
78    if ((p = core::GetVarint32Ptr(p, limit, value_length)) == nullptr)
79      return nullptr;
80  }
81
82  if (static_cast<uint32>(limit - p) < (*non_shared + *value_length)) {
83    return nullptr;
84  }
85  return p;
86}
87
88class Block::Iter : public Iterator {
89 private:
90  const char* const data_;     // underlying block contents
91  uint32 const restarts_;      // Offset of restart array (list of fixed32)
92  uint32 const num_restarts_;  // Number of uint32 entries in restart array
93
94  // current_ is offset in data_ of current entry.  >= restarts_ if !Valid
95  uint32 current_;
96  uint32 restart_index_;  // Index of restart block in which current_ falls
97  string key_;
98  StringPiece value_;
99  Status status_;
100
101  inline int Compare(const StringPiece& a, const StringPiece& b) const {
102    return a.compare(b);
103  }
104
105  // Return the offset in data_ just past the end of the current entry.
106  inline uint32 NextEntryOffset() const {
107    return (value_.data() + value_.size()) - data_;
108  }
109
110  uint32 GetRestartPoint(uint32 index) {
111    assert(index < num_restarts_);
112    return core::DecodeFixed32(data_ + restarts_ + index * sizeof(uint32));
113  }
114
115  void SeekToRestartPoint(uint32 index) {
116    key_.clear();
117    restart_index_ = index;
118    // current_ will be fixed by ParseNextKey();
119
120    // ParseNextKey() starts at the end of value_, so set value_ accordingly
121    uint32 offset = GetRestartPoint(index);
122    value_ = StringPiece(data_ + offset, 0);
123  }
124
125 public:
126  Iter(const char* data, uint32 restarts, uint32 num_restarts)
127      : data_(data),
128        restarts_(restarts),
129        num_restarts_(num_restarts),
130        current_(restarts_),
131        restart_index_(num_restarts_) {
132    assert(num_restarts_ > 0);
133  }
134
135  bool Valid() const override { return current_ < restarts_; }
136  Status status() const override { return status_; }
137  StringPiece key() const override {
138    assert(Valid());
139    return key_;
140  }
141  StringPiece value() const override {
142    assert(Valid());
143    return value_;
144  }
145
146  void Next() override {
147    assert(Valid());
148    ParseNextKey();
149  }
150
151  void Seek(const StringPiece& target) override {
152    // Binary search in restart array to find the last restart point
153    // with a key < target
154    uint32 left = 0;
155    uint32 right = num_restarts_ - 1;
156    while (left < right) {
157      uint32 mid = (left + right + 1) / 2;
158      uint32 region_offset = GetRestartPoint(mid);
159      uint32 shared, non_shared, value_length;
160      const char* key_ptr =
161          DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
162                      &non_shared, &value_length);
163      if (key_ptr == nullptr || (shared != 0)) {
164        CorruptionError();
165        return;
166      }
167      StringPiece mid_key(key_ptr, non_shared);
168      if (Compare(mid_key, target) < 0) {
169        // Key at "mid" is smaller than "target".  Therefore all
170        // blocks before "mid" are uninteresting.
171        left = mid;
172      } else {
173        // Key at "mid" is >= "target".  Therefore all blocks at or
174        // after "mid" are uninteresting.
175        right = mid - 1;
176      }
177    }
178
179    // Linear search (within restart block) for first key >= target
180    SeekToRestartPoint(left);
181    while (true) {
182      if (!ParseNextKey()) {
183        return;
184      }
185      if (Compare(key_, target) >= 0) {
186        return;
187      }
188    }
189  }
190
191  void SeekToFirst() override {
192    SeekToRestartPoint(0);
193    ParseNextKey();
194  }
195
196 private:
197  void CorruptionError() {
198    current_ = restarts_;
199    restart_index_ = num_restarts_;
200    status_ = errors::DataLoss("bad entry in block");
201    key_.clear();
202    value_ = StringPiece();
203  }
204
205  bool ParseNextKey() {
206    current_ = NextEntryOffset();
207    const char* p = data_ + current_;
208    const char* limit = data_ + restarts_;  // Restarts come right after data
209    if (p >= limit) {
210      // No more entries to return.  Mark as invalid.
211      current_ = restarts_;
212      restart_index_ = num_restarts_;
213      return false;
214    }
215
216    // Decode next entry
217    uint32 shared, non_shared, value_length;
218    p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
219    if (p == nullptr || key_.size() < shared) {
220      CorruptionError();
221      return false;
222    } else {
223      key_.resize(shared);
224      key_.append(p, non_shared);
225      value_ = StringPiece(p + non_shared, value_length);
226      while (restart_index_ + 1 < num_restarts_ &&
227             GetRestartPoint(restart_index_ + 1) < current_) {
228        ++restart_index_;
229      }
230      return true;
231    }
232  }
233};
234
235Iterator* Block::NewIterator() {
236  if (size_ < sizeof(uint32)) {
237    return NewErrorIterator(errors::DataLoss("bad block contents"));
238  }
239  const uint32 num_restarts = NumRestarts();
240  if (num_restarts == 0) {
241    return NewEmptyIterator();
242  } else {
243    return new Iter(data_, restart_offset_, num_restarts);
244  }
245}
246
247}  // namespace table
248}  // namespace tensorflow
249