1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "db/log_reader.h"
6#include "db/log_writer.h"
7#include "leveldb/env.h"
8#include "util/coding.h"
9#include "util/crc32c.h"
10#include "util/random.h"
11#include "util/testharness.h"
12
13namespace leveldb {
14namespace log {
15
// Construct a string of exactly n bytes by repeating the supplied partial
// string and truncating the last repetition.
//
// An empty partial_string can never make the result grow, so it is handled
// separately instead of looping forever: the result is then n zero bytes
// (whatever resize() pads with).
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    // The loop overshoots n by less than one copy of partial_string, so
    // reserving this much avoids reallocation while appending.
    result.reserve(n + partial_string.size());
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  result.resize(n);
  return result;
}
26
// Format n in decimal followed by a trailing '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  char text[50];
  // snprintf reports how many characters it produced; "%d." for an int is
  // always far shorter than the buffer, so that count is the string length.
  const int length = snprintf(text, sizeof(text), "%d.", n);
  return std::string(text, static_cast<size_t>(length));
}
33
34// Return a skewed potentially long string
35static std::string RandomSkewedString(int i, Random* rnd) {
36  return BigString(NumberString(i), rnd->Skewed(17));
37}
38
39class LogTest {
40 private:
41  class StringDest : public WritableFile {
42   public:
43    std::string contents_;
44
45    virtual Status Close() { return Status::OK(); }
46    virtual Status Flush() { return Status::OK(); }
47    virtual Status Sync() { return Status::OK(); }
48    virtual Status Append(const Slice& slice) {
49      contents_.append(slice.data(), slice.size());
50      return Status::OK();
51    }
52  };
53
54  class StringSource : public SequentialFile {
55   public:
56    Slice contents_;
57    bool force_error_;
58    bool returned_partial_;
59    StringSource() : force_error_(false), returned_partial_(false) { }
60
61    virtual Status Read(size_t n, Slice* result, char* scratch) {
62      ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
63
64      if (force_error_) {
65        force_error_ = false;
66        returned_partial_ = true;
67        return Status::Corruption("read error");
68      }
69
70      if (contents_.size() < n) {
71        n = contents_.size();
72        returned_partial_ = true;
73      }
74      *result = Slice(contents_.data(), n);
75      contents_.remove_prefix(n);
76      return Status::OK();
77    }
78
79    virtual Status Skip(uint64_t n) {
80      if (n > contents_.size()) {
81        contents_.clear();
82        return Status::NotFound("in-memory file skipepd past end");
83      }
84
85      contents_.remove_prefix(n);
86
87      return Status::OK();
88    }
89  };
90
91  class ReportCollector : public Reader::Reporter {
92   public:
93    size_t dropped_bytes_;
94    std::string message_;
95
96    ReportCollector() : dropped_bytes_(0) { }
97    virtual void Corruption(size_t bytes, const Status& status) {
98      dropped_bytes_ += bytes;
99      message_.append(status.ToString());
100    }
101  };
102
103  StringDest dest_;
104  StringSource source_;
105  ReportCollector report_;
106  bool reading_;
107  Writer writer_;
108  Reader reader_;
109
110  // Record metadata for testing initial offset functionality
111  static size_t initial_offset_record_sizes_[];
112  static uint64_t initial_offset_last_record_offsets_[];
113
114 public:
115  LogTest() : reading_(false),
116              writer_(&dest_),
117              reader_(&source_, &report_, true/*checksum*/,
118                      0/*initial_offset*/) {
119  }
120
121  void Write(const std::string& msg) {
122    ASSERT_TRUE(!reading_) << "Write() after starting to read";
123    writer_.AddRecord(Slice(msg));
124  }
125
126  size_t WrittenBytes() const {
127    return dest_.contents_.size();
128  }
129
130  std::string Read() {
131    if (!reading_) {
132      reading_ = true;
133      source_.contents_ = Slice(dest_.contents_);
134    }
135    std::string scratch;
136    Slice record;
137    if (reader_.ReadRecord(&record, &scratch)) {
138      return record.ToString();
139    } else {
140      return "EOF";
141    }
142  }
143
144  void IncrementByte(int offset, int delta) {
145    dest_.contents_[offset] += delta;
146  }
147
148  void SetByte(int offset, char new_byte) {
149    dest_.contents_[offset] = new_byte;
150  }
151
152  void ShrinkSize(int bytes) {
153    dest_.contents_.resize(dest_.contents_.size() - bytes);
154  }
155
156  void FixChecksum(int header_offset, int len) {
157    // Compute crc of type/len/data
158    uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
159    crc = crc32c::Mask(crc);
160    EncodeFixed32(&dest_.contents_[header_offset], crc);
161  }
162
163  void ForceError() {
164    source_.force_error_ = true;
165  }
166
167  size_t DroppedBytes() const {
168    return report_.dropped_bytes_;
169  }
170
171  std::string ReportMessage() const {
172    return report_.message_;
173  }
174
175  // Returns OK iff recorded error message contains "msg"
176  std::string MatchError(const std::string& msg) const {
177    if (report_.message_.find(msg) == std::string::npos) {
178      return report_.message_;
179    } else {
180      return "OK";
181    }
182  }
183
184  void WriteInitialOffsetLog() {
185    for (int i = 0; i < 4; i++) {
186      std::string record(initial_offset_record_sizes_[i],
187                         static_cast<char>('a' + i));
188      Write(record);
189    }
190  }
191
192  void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
193    WriteInitialOffsetLog();
194    reading_ = true;
195    source_.contents_ = Slice(dest_.contents_);
196    Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
197                                       WrittenBytes() + offset_past_end);
198    Slice record;
199    std::string scratch;
200    ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
201    delete offset_reader;
202  }
203
204  void CheckInitialOffsetRecord(uint64_t initial_offset,
205                                int expected_record_offset) {
206    WriteInitialOffsetLog();
207    reading_ = true;
208    source_.contents_ = Slice(dest_.contents_);
209    Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
210                                       initial_offset);
211    Slice record;
212    std::string scratch;
213    ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
214    ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
215              record.size());
216    ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
217              offset_reader->LastRecordOffset());
218    ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
219    delete offset_reader;
220  }
221
222};
223
224size_t LogTest::initial_offset_record_sizes_[] =
225    {10000,  // Two sizable records in first block
226     10000,
227     2 * log::kBlockSize - 1000,  // Span three blocks
228     1};
229
230uint64_t LogTest::initial_offset_last_record_offsets_[] =
231    {0,
232     kHeaderSize + 10000,
233     2 * (kHeaderSize + 10000),
234     2 * (kHeaderSize + 10000) +
235         (2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
236
237
238TEST(LogTest, Empty) {
239  ASSERT_EQ("EOF", Read());
240}
241
242TEST(LogTest, ReadWrite) {
243  Write("foo");
244  Write("bar");
245  Write("");
246  Write("xxxx");
247  ASSERT_EQ("foo", Read());
248  ASSERT_EQ("bar", Read());
249  ASSERT_EQ("", Read());
250  ASSERT_EQ("xxxx", Read());
251  ASSERT_EQ("EOF", Read());
252  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
253}
254
255TEST(LogTest, ManyBlocks) {
256  for (int i = 0; i < 100000; i++) {
257    Write(NumberString(i));
258  }
259  for (int i = 0; i < 100000; i++) {
260    ASSERT_EQ(NumberString(i), Read());
261  }
262  ASSERT_EQ("EOF", Read());
263}
264
265TEST(LogTest, Fragmentation) {
266  Write("small");
267  Write(BigString("medium", 50000));
268  Write(BigString("large", 100000));
269  ASSERT_EQ("small", Read());
270  ASSERT_EQ(BigString("medium", 50000), Read());
271  ASSERT_EQ(BigString("large", 100000), Read());
272  ASSERT_EQ("EOF", Read());
273}
274
275TEST(LogTest, MarginalTrailer) {
276  // Make a trailer that is exactly the same length as an empty record.
277  const int n = kBlockSize - 2*kHeaderSize;
278  Write(BigString("foo", n));
279  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
280  Write("");
281  Write("bar");
282  ASSERT_EQ(BigString("foo", n), Read());
283  ASSERT_EQ("", Read());
284  ASSERT_EQ("bar", Read());
285  ASSERT_EQ("EOF", Read());
286}
287
288TEST(LogTest, MarginalTrailer2) {
289  // Make a trailer that is exactly the same length as an empty record.
290  const int n = kBlockSize - 2*kHeaderSize;
291  Write(BigString("foo", n));
292  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
293  Write("bar");
294  ASSERT_EQ(BigString("foo", n), Read());
295  ASSERT_EQ("bar", Read());
296  ASSERT_EQ("EOF", Read());
297  ASSERT_EQ(0, DroppedBytes());
298  ASSERT_EQ("", ReportMessage());
299}
300
301TEST(LogTest, ShortTrailer) {
302  const int n = kBlockSize - 2*kHeaderSize + 4;
303  Write(BigString("foo", n));
304  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
305  Write("");
306  Write("bar");
307  ASSERT_EQ(BigString("foo", n), Read());
308  ASSERT_EQ("", Read());
309  ASSERT_EQ("bar", Read());
310  ASSERT_EQ("EOF", Read());
311}
312
313TEST(LogTest, AlignedEof) {
314  const int n = kBlockSize - 2*kHeaderSize + 4;
315  Write(BigString("foo", n));
316  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
317  ASSERT_EQ(BigString("foo", n), Read());
318  ASSERT_EQ("EOF", Read());
319}
320
321TEST(LogTest, RandomRead) {
322  const int N = 500;
323  Random write_rnd(301);
324  for (int i = 0; i < N; i++) {
325    Write(RandomSkewedString(i, &write_rnd));
326  }
327  Random read_rnd(301);
328  for (int i = 0; i < N; i++) {
329    ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
330  }
331  ASSERT_EQ("EOF", Read());
332}
333
334// Tests of all the error paths in log_reader.cc follow:
335
336TEST(LogTest, ReadError) {
337  Write("foo");
338  ForceError();
339  ASSERT_EQ("EOF", Read());
340  ASSERT_EQ(kBlockSize, DroppedBytes());
341  ASSERT_EQ("OK", MatchError("read error"));
342}
343
344TEST(LogTest, BadRecordType) {
345  Write("foo");
346  // Type is stored in header[6]
347  IncrementByte(6, 100);
348  FixChecksum(0, 3);
349  ASSERT_EQ("EOF", Read());
350  ASSERT_EQ(3, DroppedBytes());
351  ASSERT_EQ("OK", MatchError("unknown record type"));
352}
353
354TEST(LogTest, TruncatedTrailingRecord) {
355  Write("foo");
356  ShrinkSize(4);   // Drop all payload as well as a header byte
357  ASSERT_EQ("EOF", Read());
358  ASSERT_EQ(kHeaderSize - 1, DroppedBytes());
359  ASSERT_EQ("OK", MatchError("truncated record at end of file"));
360}
361
362TEST(LogTest, BadLength) {
363  Write("foo");
364  ShrinkSize(1);
365  ASSERT_EQ("EOF", Read());
366  ASSERT_EQ(kHeaderSize + 2, DroppedBytes());
367  ASSERT_EQ("OK", MatchError("bad record length"));
368}
369
370TEST(LogTest, ChecksumMismatch) {
371  Write("foo");
372  IncrementByte(0, 10);
373  ASSERT_EQ("EOF", Read());
374  ASSERT_EQ(10, DroppedBytes());
375  ASSERT_EQ("OK", MatchError("checksum mismatch"));
376}
377
378TEST(LogTest, UnexpectedMiddleType) {
379  Write("foo");
380  SetByte(6, kMiddleType);
381  FixChecksum(0, 3);
382  ASSERT_EQ("EOF", Read());
383  ASSERT_EQ(3, DroppedBytes());
384  ASSERT_EQ("OK", MatchError("missing start"));
385}
386
387TEST(LogTest, UnexpectedLastType) {
388  Write("foo");
389  SetByte(6, kLastType);
390  FixChecksum(0, 3);
391  ASSERT_EQ("EOF", Read());
392  ASSERT_EQ(3, DroppedBytes());
393  ASSERT_EQ("OK", MatchError("missing start"));
394}
395
396TEST(LogTest, UnexpectedFullType) {
397  Write("foo");
398  Write("bar");
399  SetByte(6, kFirstType);
400  FixChecksum(0, 3);
401  ASSERT_EQ("bar", Read());
402  ASSERT_EQ("EOF", Read());
403  ASSERT_EQ(3, DroppedBytes());
404  ASSERT_EQ("OK", MatchError("partial record without end"));
405}
406
407TEST(LogTest, UnexpectedFirstType) {
408  Write("foo");
409  Write(BigString("bar", 100000));
410  SetByte(6, kFirstType);
411  FixChecksum(0, 3);
412  ASSERT_EQ(BigString("bar", 100000), Read());
413  ASSERT_EQ("EOF", Read());
414  ASSERT_EQ(3, DroppedBytes());
415  ASSERT_EQ("OK", MatchError("partial record without end"));
416}
417
418TEST(LogTest, ErrorJoinsRecords) {
419  // Consider two fragmented records:
420  //    first(R1) last(R1) first(R2) last(R2)
421  // where the middle two fragments disappear.  We do not want
422  // first(R1),last(R2) to get joined and returned as a valid record.
423
424  // Write records that span two blocks
425  Write(BigString("foo", kBlockSize));
426  Write(BigString("bar", kBlockSize));
427  Write("correct");
428
429  // Wipe the middle block
430  for (int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
431    SetByte(offset, 'x');
432  }
433
434  ASSERT_EQ("correct", Read());
435  ASSERT_EQ("EOF", Read());
436  const int dropped = DroppedBytes();
437  ASSERT_LE(dropped, 2*kBlockSize + 100);
438  ASSERT_GE(dropped, 2*kBlockSize);
439}
440
441TEST(LogTest, ReadStart) {
442  CheckInitialOffsetRecord(0, 0);
443}
444
445TEST(LogTest, ReadSecondOneOff) {
446  CheckInitialOffsetRecord(1, 1);
447}
448
449TEST(LogTest, ReadSecondTenThousand) {
450  CheckInitialOffsetRecord(10000, 1);
451}
452
453TEST(LogTest, ReadSecondStart) {
454  CheckInitialOffsetRecord(10007, 1);
455}
456
457TEST(LogTest, ReadThirdOneOff) {
458  CheckInitialOffsetRecord(10008, 2);
459}
460
461TEST(LogTest, ReadThirdStart) {
462  CheckInitialOffsetRecord(20014, 2);
463}
464
465TEST(LogTest, ReadFourthOneOff) {
466  CheckInitialOffsetRecord(20015, 3);
467}
468
469TEST(LogTest, ReadFourthFirstBlockTrailer) {
470  CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
471}
472
473TEST(LogTest, ReadFourthMiddleBlock) {
474  CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
475}
476
477TEST(LogTest, ReadFourthLastBlock) {
478  CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
479}
480
481TEST(LogTest, ReadFourthStart) {
482  CheckInitialOffsetRecord(
483      2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
484      3);
485}
486
487TEST(LogTest, ReadEnd) {
488  CheckOffsetPastEndReturnsNoRecords(0);
489}
490
491TEST(LogTest, ReadPastEnd) {
492  CheckOffsetPastEndReturnsNoRecords(5);
493}
494
495}  // namespace log
496}  // namespace leveldb
497
498int main(int argc, char** argv) {
499  return leveldb::test::RunAllTests();
500}
501