1// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5#include "db/log_writer.h" 6 7#include <stdint.h> 8#include "leveldb/env.h" 9#include "util/coding.h" 10#include "util/crc32c.h" 11 12namespace leveldb { 13namespace log { 14 15Writer::Writer(WritableFile* dest) 16 : dest_(dest), 17 block_offset_(0) { 18 for (int i = 0; i <= kMaxRecordType; i++) { 19 char t = static_cast<char>(i); 20 type_crc_[i] = crc32c::Value(&t, 1); 21 } 22} 23 24Writer::~Writer() { 25} 26 27Status Writer::AddRecord(const Slice& slice) { 28 const char* ptr = slice.data(); 29 size_t left = slice.size(); 30 31 // Fragment the record if necessary and emit it. Note that if slice 32 // is empty, we still want to iterate once to emit a single 33 // zero-length record 34 Status s; 35 bool begin = true; 36 do { 37 const int leftover = kBlockSize - block_offset_; 38 assert(leftover >= 0); 39 if (leftover < kHeaderSize) { 40 // Switch to a new block 41 if (leftover > 0) { 42 // Fill the trailer (literal below relies on kHeaderSize being 7) 43 assert(kHeaderSize == 7); 44 dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover)); 45 } 46 block_offset_ = 0; 47 } 48 49 // Invariant: we never leave < kHeaderSize bytes in a block. 50 assert(kBlockSize - block_offset_ - kHeaderSize >= 0); 51 52 const size_t avail = kBlockSize - block_offset_ - kHeaderSize; 53 const size_t fragment_length = (left < avail) ? left : avail; 54 55 RecordType type; 56 const bool end = (left == fragment_length); 57 if (begin && end) { 58 type = kFullType; 59 } else if (begin) { 60 type = kFirstType; 61 } else if (end) { 62 type = kLastType; 63 } else { 64 type = kMiddleType; 65 } 66 67 s = EmitPhysicalRecord(type, ptr, fragment_length); 68 ptr += fragment_length; 69 left -= fragment_length; 70 begin = false; 71 } while (s.ok() && left > 0); 72 return s; 73} 74 75Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { 76 assert(n <= 0xffff); // Must fit in two bytes 77 assert(block_offset_ + kHeaderSize + n <= kBlockSize); 78 79 // Format the header 80 char buf[kHeaderSize]; 81 buf[4] = static_cast<char>(n & 0xff); 82 buf[5] = static_cast<char>(n >> 8); 83 buf[6] = static_cast<char>(t); 84 85 // Compute the crc of the record type and the payload. 86 uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n); 87 crc = crc32c::Mask(crc); // Adjust for storage 88 EncodeFixed32(buf, crc); 89 90 // Write the header and the payload 91 Status s = dest_->Append(Slice(buf, kHeaderSize)); 92 if (s.ok()) { 93 s = dest_->Append(Slice(ptr, n)); 94 if (s.ok()) { 95 s = dest_->Flush(); 96 } 97 } 98 block_offset_ += kHeaderSize + n; 99 return s; 100} 101 102} // namespace log 103} // namespace leveldb 104