1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4//
5// WriteBatch::rep_ :=
6//    sequence: fixed64
7//    count: fixed32
8//    data: record[count]
9// record :=
10//    kTypeValue varstring varstring         |
11//    kTypeDeletion varstring
12// varstring :=
13//    len: varint32
14//    data: uint8[len]
15
16#include "leveldb/write_batch.h"
17
18#include "leveldb/db.h"
19#include "db/dbformat.h"
20#include "db/memtable.h"
21#include "db/write_batch_internal.h"
22#include "util/coding.h"
23
24namespace leveldb {
25
// Size in bytes of the fixed WriteBatch header: an 8-byte sequence number
// immediately followed by a 4-byte record count.
static const size_t kHeader = 8 + 4;
28
29WriteBatch::WriteBatch() {
30  Clear();
31}
32
33WriteBatch::~WriteBatch() { }
34
35WriteBatch::Handler::~Handler() { }
36
37void WriteBatch::Clear() {
38  rep_.clear();
39  rep_.resize(kHeader);
40}
41
42Status WriteBatch::Iterate(Handler* handler) const {
43  Slice input(rep_);
44  if (input.size() < kHeader) {
45    return Status::Corruption("malformed WriteBatch (too small)");
46  }
47
48  input.remove_prefix(kHeader);
49  Slice key, value;
50  int found = 0;
51  while (!input.empty()) {
52    found++;
53    char tag = input[0];
54    input.remove_prefix(1);
55    switch (tag) {
56      case kTypeValue:
57        if (GetLengthPrefixedSlice(&input, &key) &&
58            GetLengthPrefixedSlice(&input, &value)) {
59          handler->Put(key, value);
60        } else {
61          return Status::Corruption("bad WriteBatch Put");
62        }
63        break;
64      case kTypeDeletion:
65        if (GetLengthPrefixedSlice(&input, &key)) {
66          handler->Delete(key);
67        } else {
68          return Status::Corruption("bad WriteBatch Delete");
69        }
70        break;
71      default:
72        return Status::Corruption("unknown WriteBatch tag");
73    }
74  }
75  if (found != WriteBatchInternal::Count(this)) {
76    return Status::Corruption("WriteBatch has wrong count");
77  } else {
78    return Status::OK();
79  }
80}
81
82int WriteBatchInternal::Count(const WriteBatch* b) {
83  return DecodeFixed32(b->rep_.data() + 8);
84}
85
86void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
87  EncodeFixed32(&b->rep_[8], n);
88}
89
90SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
91  return SequenceNumber(DecodeFixed64(b->rep_.data()));
92}
93
94void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
95  EncodeFixed64(&b->rep_[0], seq);
96}
97
98void WriteBatch::Put(const Slice& key, const Slice& value) {
99  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
100  rep_.push_back(static_cast<char>(kTypeValue));
101  PutLengthPrefixedSlice(&rep_, key);
102  PutLengthPrefixedSlice(&rep_, value);
103}
104
105void WriteBatch::Delete(const Slice& key) {
106  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
107  rep_.push_back(static_cast<char>(kTypeDeletion));
108  PutLengthPrefixedSlice(&rep_, key);
109}
110
111namespace {
112class MemTableInserter : public WriteBatch::Handler {
113 public:
114  SequenceNumber sequence_;
115  MemTable* mem_;
116
117  virtual void Put(const Slice& key, const Slice& value) {
118    mem_->Add(sequence_, kTypeValue, key, value);
119    sequence_++;
120  }
121  virtual void Delete(const Slice& key) {
122    mem_->Add(sequence_, kTypeDeletion, key, Slice());
123    sequence_++;
124  }
125};
126}  // namespace
127
128Status WriteBatchInternal::InsertInto(const WriteBatch* b,
129                                      MemTable* memtable) {
130  MemTableInserter inserter;
131  inserter.sequence_ = WriteBatchInternal::Sequence(b);
132  inserter.mem_ = memtable;
133  return b->Iterate(&inserter);
134}
135
136void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
137  assert(contents.size() >= kHeader);
138  b->rep_.assign(contents.data(), contents.size());
139}
140
141void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
142  SetCount(dst, Count(dst) + Count(src));
143  assert(src->rep_.size() >= kHeader);
144  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
145}
146
147}  // namespace leveldb
148