// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "leveldb/table.h"
6
7#include "leveldb/cache.h"
8#include "leveldb/comparator.h"
9#include "leveldb/env.h"
10#include "leveldb/filter_policy.h"
11#include "leveldb/options.h"
12#include "table/block.h"
13#include "table/filter_block.h"
14#include "table/format.h"
15#include "table/two_level_iterator.h"
16#include "util/coding.h"
17
18namespace leveldb {
19
20struct Table::Rep {
21  ~Rep() {
22    delete filter;
23    delete [] filter_data;
24    delete index_block;
25  }
26
27  Options options;
28  Status status;
29  RandomAccessFile* file;
30  uint64_t cache_id;
31  FilterBlockReader* filter;
32  const char* filter_data;
33
34  BlockHandle metaindex_handle;  // Handle to metaindex_block: saved from footer
35  Block* index_block;
36};
37
38Status Table::Open(const Options& options,
39                   RandomAccessFile* file,
40                   uint64_t size,
41                   Table** table) {
42  *table = NULL;
43  if (size < Footer::kEncodedLength) {
44    return Status::InvalidArgument("file is too short to be an sstable");
45  }
46
47  char footer_space[Footer::kEncodedLength];
48  Slice footer_input;
49  Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
50                        &footer_input, footer_space);
51  if (!s.ok()) return s;
52
53  Footer footer;
54  s = footer.DecodeFrom(&footer_input);
55  if (!s.ok()) return s;
56
57  // Read the index block
58  BlockContents contents;
59  Block* index_block = NULL;
60  if (s.ok()) {
61    s = ReadBlock(file, ReadOptions(), footer.index_handle(), &contents);
62    if (s.ok()) {
63      index_block = new Block(contents);
64    }
65  }
66
67  if (s.ok()) {
68    // We've successfully read the footer and the index block: we're
69    // ready to serve requests.
70    Rep* rep = new Table::Rep;
71    rep->options = options;
72    rep->file = file;
73    rep->metaindex_handle = footer.metaindex_handle();
74    rep->index_block = index_block;
75    rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
76    rep->filter_data = NULL;
77    rep->filter = NULL;
78    *table = new Table(rep);
79    (*table)->ReadMeta(footer);
80  } else {
81    if (index_block) delete index_block;
82  }
83
84  return s;
85}
86
87void Table::ReadMeta(const Footer& footer) {
88  if (rep_->options.filter_policy == NULL) {
89    return;  // Do not need any metadata
90  }
91
92  // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
93  // it is an empty block.
94  ReadOptions opt;
95  BlockContents contents;
96  if (!ReadBlock(rep_->file, opt, footer.metaindex_handle(), &contents).ok()) {
97    // Do not propagate errors since meta info is not needed for operation
98    return;
99  }
100  Block* meta = new Block(contents);
101
102  Iterator* iter = meta->NewIterator(BytewiseComparator());
103  std::string key = "filter.";
104  key.append(rep_->options.filter_policy->Name());
105  iter->Seek(key);
106  if (iter->Valid() && iter->key() == Slice(key)) {
107    ReadFilter(iter->value());
108  }
109  delete iter;
110  delete meta;
111}
112
113void Table::ReadFilter(const Slice& filter_handle_value) {
114  Slice v = filter_handle_value;
115  BlockHandle filter_handle;
116  if (!filter_handle.DecodeFrom(&v).ok()) {
117    return;
118  }
119
120  // We might want to unify with ReadBlock() if we start
121  // requiring checksum verification in Table::Open.
122  ReadOptions opt;
123  BlockContents block;
124  if (!ReadBlock(rep_->file, opt, filter_handle, &block).ok()) {
125    return;
126  }
127  if (block.heap_allocated) {
128    rep_->filter_data = block.data.data();     // Will need to delete later
129  }
130  rep_->filter = new FilterBlockReader(rep_->options.filter_policy, block.data);
131}
132
133Table::~Table() {
134  delete rep_;
135}
136
137static void DeleteBlock(void* arg, void* ignored) {
138  delete reinterpret_cast<Block*>(arg);
139}
140
141static void DeleteCachedBlock(const Slice& key, void* value) {
142  Block* block = reinterpret_cast<Block*>(value);
143  delete block;
144}
145
146static void ReleaseBlock(void* arg, void* h) {
147  Cache* cache = reinterpret_cast<Cache*>(arg);
148  Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
149  cache->Release(handle);
150}
151
152// Convert an index iterator value (i.e., an encoded BlockHandle)
153// into an iterator over the contents of the corresponding block.
154Iterator* Table::BlockReader(void* arg,
155                             const ReadOptions& options,
156                             const Slice& index_value) {
157  Table* table = reinterpret_cast<Table*>(arg);
158  Cache* block_cache = table->rep_->options.block_cache;
159  Block* block = NULL;
160  Cache::Handle* cache_handle = NULL;
161
162  BlockHandle handle;
163  Slice input = index_value;
164  Status s = handle.DecodeFrom(&input);
165  // We intentionally allow extra stuff in index_value so that we
166  // can add more features in the future.
167
168  if (s.ok()) {
169    BlockContents contents;
170    if (block_cache != NULL) {
171      char cache_key_buffer[16];
172      EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
173      EncodeFixed64(cache_key_buffer+8, handle.offset());
174      Slice key(cache_key_buffer, sizeof(cache_key_buffer));
175      cache_handle = block_cache->Lookup(key);
176      if (cache_handle != NULL) {
177        block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
178      } else {
179        s = ReadBlock(table->rep_->file, options, handle, &contents);
180        if (s.ok()) {
181          block = new Block(contents);
182          if (contents.cachable && options.fill_cache) {
183            cache_handle = block_cache->Insert(
184                key, block, block->size(), &DeleteCachedBlock);
185          }
186        }
187      }
188    } else {
189      s = ReadBlock(table->rep_->file, options, handle, &contents);
190      if (s.ok()) {
191        block = new Block(contents);
192      }
193    }
194  }
195
196  Iterator* iter;
197  if (block != NULL) {
198    iter = block->NewIterator(table->rep_->options.comparator);
199    if (cache_handle == NULL) {
200      iter->RegisterCleanup(&DeleteBlock, block, NULL);
201    } else {
202      iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
203    }
204  } else {
205    iter = NewErrorIterator(s);
206  }
207  return iter;
208}
209
210Iterator* Table::NewIterator(const ReadOptions& options) const {
211  return NewTwoLevelIterator(
212      rep_->index_block->NewIterator(rep_->options.comparator),
213      &Table::BlockReader, const_cast<Table*>(this), options);
214}
215
216Status Table::InternalGet(const ReadOptions& options, const Slice& k,
217                          void* arg,
218                          void (*saver)(void*, const Slice&, const Slice&)) {
219  Status s;
220  Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
221  iiter->Seek(k);
222  if (iiter->Valid()) {
223    Slice handle_value = iiter->value();
224    FilterBlockReader* filter = rep_->filter;
225    BlockHandle handle;
226    if (filter != NULL &&
227        handle.DecodeFrom(&handle_value).ok() &&
228        !filter->KeyMayMatch(handle.offset(), k)) {
229      // Not found
230    } else {
231      Iterator* block_iter = BlockReader(this, options, iiter->value());
232      block_iter->Seek(k);
233      if (block_iter->Valid()) {
234        (*saver)(arg, block_iter->key(), block_iter->value());
235      }
236      s = block_iter->status();
237      delete block_iter;
238    }
239  }
240  if (s.ok()) {
241    s = iiter->status();
242  }
243  delete iiter;
244  return s;
245}
246
247
248uint64_t Table::ApproximateOffsetOf(const Slice& key) const {
249  Iterator* index_iter =
250      rep_->index_block->NewIterator(rep_->options.comparator);
251  index_iter->Seek(key);
252  uint64_t result;
253  if (index_iter->Valid()) {
254    BlockHandle handle;
255    Slice input = index_iter->value();
256    Status s = handle.DecodeFrom(&input);
257    if (s.ok()) {
258      result = handle.offset();
259    } else {
260      // Strange: we can't decode the block handle in the index block.
261      // We'll just return the offset of the metaindex block, which is
262      // close to the whole file size for this case.
263      result = rep_->metaindex_handle.offset();
264    }
265  } else {
266    // key is past the last key in the file.  Approximate the offset
267    // by returning the offset of the metaindex block (which is
268    // right near the end of the file).
269    result = rep_->metaindex_handle.offset();
270  }
271  delete index_iter;
272  return result;
273}
274
275}  // namespace leveldb
276