1// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. See the AUTHORS file for names of contributors. 4// 5// WriteBatch::rep_ := 6// sequence: fixed64 7// count: fixed32 8// data: record[count] 9// record := 10// kTypeValue varstring varstring | 11// kTypeDeletion varstring 12// varstring := 13// len: varint32 14// data: uint8[len] 15 16#include "leveldb/write_batch.h" 17 18#include "leveldb/db.h" 19#include "db/dbformat.h" 20#include "db/memtable.h" 21#include "db/write_batch_internal.h" 22#include "util/coding.h" 23 24namespace leveldb { 25 26// WriteBatch header has an 8-byte sequence number followed by a 4-byte count. 27static const size_t kHeader = 12; 28 29WriteBatch::WriteBatch() { 30 Clear(); 31} 32 33WriteBatch::~WriteBatch() { } 34 35WriteBatch::Handler::~Handler() { } 36 37void WriteBatch::Clear() { 38 rep_.clear(); 39 rep_.resize(kHeader); 40} 41 42Status WriteBatch::Iterate(Handler* handler) const { 43 Slice input(rep_); 44 if (input.size() < kHeader) { 45 return Status::Corruption("malformed WriteBatch (too small)"); 46 } 47 48 input.remove_prefix(kHeader); 49 Slice key, value; 50 int found = 0; 51 while (!input.empty()) { 52 found++; 53 char tag = input[0]; 54 input.remove_prefix(1); 55 switch (tag) { 56 case kTypeValue: 57 if (GetLengthPrefixedSlice(&input, &key) && 58 GetLengthPrefixedSlice(&input, &value)) { 59 handler->Put(key, value); 60 } else { 61 return Status::Corruption("bad WriteBatch Put"); 62 } 63 break; 64 case kTypeDeletion: 65 if (GetLengthPrefixedSlice(&input, &key)) { 66 handler->Delete(key); 67 } else { 68 return Status::Corruption("bad WriteBatch Delete"); 69 } 70 break; 71 default: 72 return Status::Corruption("unknown WriteBatch tag"); 73 } 74 } 75 if (found != WriteBatchInternal::Count(this)) { 76 return Status::Corruption("WriteBatch has wrong count"); 77 } else { 78 return Status::OK(); 79 } 80} 81 82int WriteBatchInternal::Count(const WriteBatch* b) { 83 return DecodeFixed32(b->rep_.data() + 8); 84} 85 86void WriteBatchInternal::SetCount(WriteBatch* b, int n) { 87 EncodeFixed32(&b->rep_[8], n); 88} 89 90SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) { 91 return SequenceNumber(DecodeFixed64(b->rep_.data())); 92} 93 94void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { 95 EncodeFixed64(&b->rep_[0], seq); 96} 97 98void WriteBatch::Put(const Slice& key, const Slice& value) { 99 WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); 100 rep_.push_back(static_cast<char>(kTypeValue)); 101 PutLengthPrefixedSlice(&rep_, key); 102 PutLengthPrefixedSlice(&rep_, value); 103} 104 105void WriteBatch::Delete(const Slice& key) { 106 WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); 107 rep_.push_back(static_cast<char>(kTypeDeletion)); 108 PutLengthPrefixedSlice(&rep_, key); 109} 110 111namespace { 112class MemTableInserter : public WriteBatch::Handler { 113 public: 114 SequenceNumber sequence_; 115 MemTable* mem_; 116 117 virtual void Put(const Slice& key, const Slice& value) { 118 mem_->Add(sequence_, kTypeValue, key, value); 119 sequence_++; 120 } 121 virtual void Delete(const Slice& key) { 122 mem_->Add(sequence_, kTypeDeletion, key, Slice()); 123 sequence_++; 124 } 125}; 126} // namespace 127 128Status WriteBatchInternal::InsertInto(const WriteBatch* b, 129 MemTable* memtable) { 130 MemTableInserter inserter; 131 inserter.sequence_ = WriteBatchInternal::Sequence(b); 132 inserter.mem_ = memtable; 133 return b->Iterate(&inserter); 134} 135 136void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { 137 assert(contents.size() >= kHeader); 138 b->rep_.assign(contents.data(), contents.size()); 139} 140 141void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { 142 SetCount(dst, Count(dst) + Count(src)); 143 assert(src->rep_.size() >= kHeader); 144 dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); 145} 146 147} // namespace leveldb 148