1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "leveldb/table_builder.h"
6
7#include <assert.h>
8#include "leveldb/comparator.h"
9#include "leveldb/env.h"
10#include "leveldb/filter_policy.h"
11#include "leveldb/options.h"
12#include "table/block_builder.h"
13#include "table/filter_block.h"
14#include "table/format.h"
15#include "util/coding.h"
16#include "util/crc32c.h"
17
18namespace leveldb {
19
20struct TableBuilder::Rep {
21  Options options;
22  Options index_block_options;
23  WritableFile* file;
24  uint64_t offset;
25  Status status;
26  BlockBuilder data_block;
27  BlockBuilder index_block;
28  std::string last_key;
29  int64_t num_entries;
30  bool closed;          // Either Finish() or Abandon() has been called.
31  FilterBlockBuilder* filter_block;
32
33  // We do not emit the index entry for a block until we have seen the
34  // first key for the next data block.  This allows us to use shorter
35  // keys in the index block.  For example, consider a block boundary
36  // between the keys "the quick brown fox" and "the who".  We can use
37  // "the r" as the key for the index block entry since it is >= all
38  // entries in the first block and < all entries in subsequent
39  // blocks.
40  //
41  // Invariant: r->pending_index_entry is true only if data_block is empty.
42  bool pending_index_entry;
43  BlockHandle pending_handle;  // Handle to add to index block
44
45  std::string compressed_output;
46
47  Rep(const Options& opt, WritableFile* f)
48      : options(opt),
49        index_block_options(opt),
50        file(f),
51        offset(0),
52        data_block(&options),
53        index_block(&index_block_options),
54        num_entries(0),
55        closed(false),
56        filter_block(opt.filter_policy == NULL ? NULL
57                     : new FilterBlockBuilder(opt.filter_policy)),
58        pending_index_entry(false) {
59    index_block_options.block_restart_interval = 1;
60  }
61};
62
63TableBuilder::TableBuilder(const Options& options, WritableFile* file)
64    : rep_(new Rep(options, file)) {
65  if (rep_->filter_block != NULL) {
66    rep_->filter_block->StartBlock(0);
67  }
68}
69
70TableBuilder::~TableBuilder() {
71  assert(rep_->closed);  // Catch errors where caller forgot to call Finish()
72  delete rep_->filter_block;
73  delete rep_;
74}
75
76Status TableBuilder::ChangeOptions(const Options& options) {
77  // Note: if more fields are added to Options, update
78  // this function to catch changes that should not be allowed to
79  // change in the middle of building a Table.
80  if (options.comparator != rep_->options.comparator) {
81    return Status::InvalidArgument("changing comparator while building table");
82  }
83
84  // Note that any live BlockBuilders point to rep_->options and therefore
85  // will automatically pick up the updated options.
86  rep_->options = options;
87  rep_->index_block_options = options;
88  rep_->index_block_options.block_restart_interval = 1;
89  return Status::OK();
90}
91
92void TableBuilder::Add(const Slice& key, const Slice& value) {
93  Rep* r = rep_;
94  assert(!r->closed);
95  if (!ok()) return;
96  if (r->num_entries > 0) {
97    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
98  }
99
100  if (r->pending_index_entry) {
101    assert(r->data_block.empty());
102    r->options.comparator->FindShortestSeparator(&r->last_key, key);
103    std::string handle_encoding;
104    r->pending_handle.EncodeTo(&handle_encoding);
105    r->index_block.Add(r->last_key, Slice(handle_encoding));
106    r->pending_index_entry = false;
107  }
108
109  if (r->filter_block != NULL) {
110    r->filter_block->AddKey(key);
111  }
112
113  r->last_key.assign(key.data(), key.size());
114  r->num_entries++;
115  r->data_block.Add(key, value);
116
117  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
118  if (estimated_block_size >= r->options.block_size) {
119    Flush();
120  }
121}
122
123void TableBuilder::Flush() {
124  Rep* r = rep_;
125  assert(!r->closed);
126  if (!ok()) return;
127  if (r->data_block.empty()) return;
128  assert(!r->pending_index_entry);
129  WriteBlock(&r->data_block, &r->pending_handle);
130  if (ok()) {
131    r->pending_index_entry = true;
132    r->status = r->file->Flush();
133  }
134  if (r->filter_block != NULL) {
135    r->filter_block->StartBlock(r->offset);
136  }
137}
138
139void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
140  // File format contains a sequence of blocks where each block has:
141  //    block_data: uint8[n]
142  //    type: uint8
143  //    crc: uint32
144  assert(ok());
145  Rep* r = rep_;
146  Slice raw = block->Finish();
147
148  Slice block_contents;
149  CompressionType type = r->options.compression;
150  // TODO(postrelease): Support more compression options: zlib?
151  switch (type) {
152    case kNoCompression:
153      block_contents = raw;
154      break;
155
156    case kSnappyCompression: {
157      std::string* compressed = &r->compressed_output;
158      if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
159          compressed->size() < raw.size() - (raw.size() / 8u)) {
160        block_contents = *compressed;
161      } else {
162        // Snappy not supported, or compressed less than 12.5%, so just
163        // store uncompressed form
164        block_contents = raw;
165        type = kNoCompression;
166      }
167      break;
168    }
169  }
170  WriteRawBlock(block_contents, type, handle);
171  r->compressed_output.clear();
172  block->Reset();
173}
174
175void TableBuilder::WriteRawBlock(const Slice& block_contents,
176                                 CompressionType type,
177                                 BlockHandle* handle) {
178  Rep* r = rep_;
179  handle->set_offset(r->offset);
180  handle->set_size(block_contents.size());
181  r->status = r->file->Append(block_contents);
182  if (r->status.ok()) {
183    char trailer[kBlockTrailerSize];
184    trailer[0] = type;
185    uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
186    crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
187    EncodeFixed32(trailer+1, crc32c::Mask(crc));
188    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
189    if (r->status.ok()) {
190      r->offset += block_contents.size() + kBlockTrailerSize;
191    }
192  }
193}
194
195Status TableBuilder::status() const {
196  return rep_->status;
197}
198
199Status TableBuilder::Finish() {
200  Rep* r = rep_;
201  Flush();
202  assert(!r->closed);
203  r->closed = true;
204
205  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
206
207  // Write filter block
208  if (ok() && r->filter_block != NULL) {
209    WriteRawBlock(r->filter_block->Finish(), kNoCompression,
210                  &filter_block_handle);
211  }
212
213  // Write metaindex block
214  if (ok()) {
215    BlockBuilder meta_index_block(&r->options);
216    if (r->filter_block != NULL) {
217      // Add mapping from "filter.Name" to location of filter data
218      std::string key = "filter.";
219      key.append(r->options.filter_policy->Name());
220      std::string handle_encoding;
221      filter_block_handle.EncodeTo(&handle_encoding);
222      meta_index_block.Add(key, handle_encoding);
223    }
224
225    // TODO(postrelease): Add stats and other meta blocks
226    WriteBlock(&meta_index_block, &metaindex_block_handle);
227  }
228
229  // Write index block
230  if (ok()) {
231    if (r->pending_index_entry) {
232      r->options.comparator->FindShortSuccessor(&r->last_key);
233      std::string handle_encoding;
234      r->pending_handle.EncodeTo(&handle_encoding);
235      r->index_block.Add(r->last_key, Slice(handle_encoding));
236      r->pending_index_entry = false;
237    }
238    WriteBlock(&r->index_block, &index_block_handle);
239  }
240
241  // Write footer
242  if (ok()) {
243    Footer footer;
244    footer.set_metaindex_handle(metaindex_block_handle);
245    footer.set_index_handle(index_block_handle);
246    std::string footer_encoding;
247    footer.EncodeTo(&footer_encoding);
248    r->status = r->file->Append(footer_encoding);
249    if (r->status.ok()) {
250      r->offset += footer_encoding.size();
251    }
252  }
253  return r->status;
254}
255
256void TableBuilder::Abandon() {
257  Rep* r = rep_;
258  assert(!r->closed);
259  r->closed = true;
260}
261
262uint64_t TableBuilder::NumEntries() const {
263  return rep_->num_entries;
264}
265
266uint64_t TableBuilder::FileSize() const {
267  return rep_->offset;
268}
269
270}  // namespace leveldb
271