1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "db/log_reader.h"
6#include "db/log_writer.h"
7#include "leveldb/env.h"
8#include "util/coding.h"
9#include "util/crc32c.h"
10#include "util/random.h"
11#include "util/testharness.h"
12
13namespace leveldb {
14namespace log {
15
// Construct a string of exactly n bytes by repeating partial_string and
// truncating the final repetition as needed.
//
// If partial_string is empty there is nothing to repeat; the result is then
// n NUL bytes (what std::string::resize pads with) rather than looping
// forever trying to grow the string with empty appends.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    // Over-fill in whole repetitions; the resize below trims the excess.
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  result.resize(n);
  return result;
}
26
// Format n in decimal followed by a trailing '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  char text[50];
  snprintf(text, sizeof(text), "%d.", n);
  return text;
}
33
34// Return a skewed potentially long string
35static std::string RandomSkewedString(int i, Random* rnd) {
36  return BigString(NumberString(i), rnd->Skewed(17));
37}
38
39class LogTest {
40 private:
41  class StringDest : public WritableFile {
42   public:
43    std::string contents_;
44
45    virtual Status Close() { return Status::OK(); }
46    virtual Status Flush() { return Status::OK(); }
47    virtual Status Sync() { return Status::OK(); }
48    virtual Status Append(const Slice& slice) {
49      contents_.append(slice.data(), slice.size());
50      return Status::OK();
51    }
52  };
53
54  class StringSource : public SequentialFile {
55   public:
56    Slice contents_;
57    bool force_error_;
58    bool returned_partial_;
59    StringSource() : force_error_(false), returned_partial_(false) { }
60
61    virtual Status Read(size_t n, Slice* result, char* scratch) {
62      ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
63
64      if (force_error_) {
65        force_error_ = false;
66        returned_partial_ = true;
67        return Status::Corruption("read error");
68      }
69
70      if (contents_.size() < n) {
71        n = contents_.size();
72        returned_partial_ = true;
73      }
74      *result = Slice(contents_.data(), n);
75      contents_.remove_prefix(n);
76      return Status::OK();
77    }
78
79    virtual Status Skip(uint64_t n) {
80      if (n > contents_.size()) {
81        contents_.clear();
82        return Status::NotFound("in-memory file skipepd past end");
83      }
84
85      contents_.remove_prefix(n);
86
87      return Status::OK();
88    }
89  };
90
91  class ReportCollector : public Reader::Reporter {
92   public:
93    size_t dropped_bytes_;
94    std::string message_;
95
96    ReportCollector() : dropped_bytes_(0) { }
97    virtual void Corruption(size_t bytes, const Status& status) {
98      dropped_bytes_ += bytes;
99      message_.append(status.ToString());
100    }
101  };
102
103  StringDest dest_;
104  StringSource source_;
105  ReportCollector report_;
106  bool reading_;
107  Writer writer_;
108  Reader reader_;
109
110  // Record metadata for testing initial offset functionality
111  static size_t initial_offset_record_sizes_[];
112  static uint64_t initial_offset_last_record_offsets_[];
113
114 public:
115  LogTest() : reading_(false),
116              writer_(&dest_),
117              reader_(&source_, &report_, true/*checksum*/,
118                      0/*initial_offset*/) {
119  }
120
121  void Write(const std::string& msg) {
122    ASSERT_TRUE(!reading_) << "Write() after starting to read";
123    writer_.AddRecord(Slice(msg));
124  }
125
126  size_t WrittenBytes() const {
127    return dest_.contents_.size();
128  }
129
130  std::string Read() {
131    if (!reading_) {
132      reading_ = true;
133      source_.contents_ = Slice(dest_.contents_);
134    }
135    std::string scratch;
136    Slice record;
137    if (reader_.ReadRecord(&record, &scratch)) {
138      return record.ToString();
139    } else {
140      return "EOF";
141    }
142  }
143
144  void IncrementByte(int offset, int delta) {
145    dest_.contents_[offset] += delta;
146  }
147
148  void SetByte(int offset, char new_byte) {
149    dest_.contents_[offset] = new_byte;
150  }
151
152  void ShrinkSize(int bytes) {
153    dest_.contents_.resize(dest_.contents_.size() - bytes);
154  }
155
156  void FixChecksum(int header_offset, int len) {
157    // Compute crc of type/len/data
158    uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
159    crc = crc32c::Mask(crc);
160    EncodeFixed32(&dest_.contents_[header_offset], crc);
161  }
162
163  void ForceError() {
164    source_.force_error_ = true;
165  }
166
167  size_t DroppedBytes() const {
168    return report_.dropped_bytes_;
169  }
170
171  std::string ReportMessage() const {
172    return report_.message_;
173  }
174
175  // Returns OK iff recorded error message contains "msg"
176  std::string MatchError(const std::string& msg) const {
177    if (report_.message_.find(msg) == std::string::npos) {
178      return report_.message_;
179    } else {
180      return "OK";
181    }
182  }
183
184  void WriteInitialOffsetLog() {
185    for (int i = 0; i < 4; i++) {
186      std::string record(initial_offset_record_sizes_[i],
187                         static_cast<char>('a' + i));
188      Write(record);
189    }
190  }
191
192  void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
193    WriteInitialOffsetLog();
194    reading_ = true;
195    source_.contents_ = Slice(dest_.contents_);
196    Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
197                                       WrittenBytes() + offset_past_end);
198    Slice record;
199    std::string scratch;
200    ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
201    delete offset_reader;
202  }
203
204  void CheckInitialOffsetRecord(uint64_t initial_offset,
205                                int expected_record_offset) {
206    WriteInitialOffsetLog();
207    reading_ = true;
208    source_.contents_ = Slice(dest_.contents_);
209    Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
210                                       initial_offset);
211    Slice record;
212    std::string scratch;
213    ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
214    ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
215              record.size());
216    ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
217              offset_reader->LastRecordOffset());
218    ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
219    delete offset_reader;
220  }
221
222};
223
// Payload sizes, in bytes, of the four records that WriteInitialOffsetLog()
// writes, in write order.
size_t LogTest::initial_offset_record_sizes_[] =
    {10000,  // Two sizable records in first block
     10000,
     2 * log::kBlockSize - 1000,  // Span three blocks
     1};
229
// Value CheckInitialOffsetRecord() expects from Reader::LastRecordOffset()
// for each entry of initial_offset_record_sizes_, in the same order.
uint64_t LogTest::initial_offset_last_record_offsets_[] =
    // Record 0: expected at offset 0.
    {0,
     // Record 1: one header plus record 0's 10000-byte payload.
     kHeaderSize + 10000,
     // Record 2: two headers plus two 10000-byte payloads.
     2 * (kHeaderSize + 10000),
     // Record 3: the above, plus record 2's payload and 3 * kHeaderSize
     // (presumably one header per fragment, since record 2 is noted as
     // spanning three blocks).
     2 * (kHeaderSize + 10000) +
         (2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
236
237
238TEST(LogTest, Empty) {
239  ASSERT_EQ("EOF", Read());
240}
241
242TEST(LogTest, ReadWrite) {
243  Write("foo");
244  Write("bar");
245  Write("");
246  Write("xxxx");
247  ASSERT_EQ("foo", Read());
248  ASSERT_EQ("bar", Read());
249  ASSERT_EQ("", Read());
250  ASSERT_EQ("xxxx", Read());
251  ASSERT_EQ("EOF", Read());
252  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
253}
254
255TEST(LogTest, ManyBlocks) {
256  for (int i = 0; i < 100000; i++) {
257    Write(NumberString(i));
258  }
259  for (int i = 0; i < 100000; i++) {
260    ASSERT_EQ(NumberString(i), Read());
261  }
262  ASSERT_EQ("EOF", Read());
263}
264
265TEST(LogTest, Fragmentation) {
266  Write("small");
267  Write(BigString("medium", 50000));
268  Write(BigString("large", 100000));
269  ASSERT_EQ("small", Read());
270  ASSERT_EQ(BigString("medium", 50000), Read());
271  ASSERT_EQ(BigString("large", 100000), Read());
272  ASSERT_EQ("EOF", Read());
273}
274
275TEST(LogTest, MarginalTrailer) {
276  // Make a trailer that is exactly the same length as an empty record.
277  const int n = kBlockSize - 2*kHeaderSize;
278  Write(BigString("foo", n));
279  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
280  Write("");
281  Write("bar");
282  ASSERT_EQ(BigString("foo", n), Read());
283  ASSERT_EQ("", Read());
284  ASSERT_EQ("bar", Read());
285  ASSERT_EQ("EOF", Read());
286}
287
288TEST(LogTest, MarginalTrailer2) {
289  // Make a trailer that is exactly the same length as an empty record.
290  const int n = kBlockSize - 2*kHeaderSize;
291  Write(BigString("foo", n));
292  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
293  Write("bar");
294  ASSERT_EQ(BigString("foo", n), Read());
295  ASSERT_EQ("bar", Read());
296  ASSERT_EQ("EOF", Read());
297  ASSERT_EQ(0, DroppedBytes());
298  ASSERT_EQ("", ReportMessage());
299}
300
301TEST(LogTest, ShortTrailer) {
302  const int n = kBlockSize - 2*kHeaderSize + 4;
303  Write(BigString("foo", n));
304  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
305  Write("");
306  Write("bar");
307  ASSERT_EQ(BigString("foo", n), Read());
308  ASSERT_EQ("", Read());
309  ASSERT_EQ("bar", Read());
310  ASSERT_EQ("EOF", Read());
311}
312
313TEST(LogTest, AlignedEof) {
314  const int n = kBlockSize - 2*kHeaderSize + 4;
315  Write(BigString("foo", n));
316  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
317  ASSERT_EQ(BigString("foo", n), Read());
318  ASSERT_EQ("EOF", Read());
319}
320
321TEST(LogTest, RandomRead) {
322  const int N = 500;
323  Random write_rnd(301);
324  for (int i = 0; i < N; i++) {
325    Write(RandomSkewedString(i, &write_rnd));
326  }
327  Random read_rnd(301);
328  for (int i = 0; i < N; i++) {
329    ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
330  }
331  ASSERT_EQ("EOF", Read());
332}
333
334// Tests of all the error paths in log_reader.cc follow:
335
336TEST(LogTest, ReadError) {
337  Write("foo");
338  ForceError();
339  ASSERT_EQ("EOF", Read());
340  ASSERT_EQ(kBlockSize, DroppedBytes());
341  ASSERT_EQ("OK", MatchError("read error"));
342}
343
344TEST(LogTest, BadRecordType) {
345  Write("foo");
346  // Type is stored in header[6]
347  IncrementByte(6, 100);
348  FixChecksum(0, 3);
349  ASSERT_EQ("EOF", Read());
350  ASSERT_EQ(3, DroppedBytes());
351  ASSERT_EQ("OK", MatchError("unknown record type"));
352}
353
354TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
355  Write("foo");
356  ShrinkSize(4);   // Drop all payload as well as a header byte
357  ASSERT_EQ("EOF", Read());
358  // Truncated last record is ignored, not treated as an error.
359  ASSERT_EQ(0, DroppedBytes());
360  ASSERT_EQ("", ReportMessage());
361}
362
363TEST(LogTest, BadLength) {
364  const int kPayloadSize = kBlockSize - kHeaderSize;
365  Write(BigString("bar", kPayloadSize));
366  Write("foo");
367  // Least significant size byte is stored in header[4].
368  IncrementByte(4, 1);
369  ASSERT_EQ("foo", Read());
370  ASSERT_EQ(kBlockSize, DroppedBytes());
371  ASSERT_EQ("OK", MatchError("bad record length"));
372}
373
374TEST(LogTest, BadLengthAtEndIsIgnored) {
375  Write("foo");
376  ShrinkSize(1);
377  ASSERT_EQ("EOF", Read());
378  ASSERT_EQ(0, DroppedBytes());
379  ASSERT_EQ("", ReportMessage());
380}
381
382TEST(LogTest, ChecksumMismatch) {
383  Write("foo");
384  IncrementByte(0, 10);
385  ASSERT_EQ("EOF", Read());
386  ASSERT_EQ(10, DroppedBytes());
387  ASSERT_EQ("OK", MatchError("checksum mismatch"));
388}
389
390TEST(LogTest, UnexpectedMiddleType) {
391  Write("foo");
392  SetByte(6, kMiddleType);
393  FixChecksum(0, 3);
394  ASSERT_EQ("EOF", Read());
395  ASSERT_EQ(3, DroppedBytes());
396  ASSERT_EQ("OK", MatchError("missing start"));
397}
398
399TEST(LogTest, UnexpectedLastType) {
400  Write("foo");
401  SetByte(6, kLastType);
402  FixChecksum(0, 3);
403  ASSERT_EQ("EOF", Read());
404  ASSERT_EQ(3, DroppedBytes());
405  ASSERT_EQ("OK", MatchError("missing start"));
406}
407
408TEST(LogTest, UnexpectedFullType) {
409  Write("foo");
410  Write("bar");
411  SetByte(6, kFirstType);
412  FixChecksum(0, 3);
413  ASSERT_EQ("bar", Read());
414  ASSERT_EQ("EOF", Read());
415  ASSERT_EQ(3, DroppedBytes());
416  ASSERT_EQ("OK", MatchError("partial record without end"));
417}
418
419TEST(LogTest, UnexpectedFirstType) {
420  Write("foo");
421  Write(BigString("bar", 100000));
422  SetByte(6, kFirstType);
423  FixChecksum(0, 3);
424  ASSERT_EQ(BigString("bar", 100000), Read());
425  ASSERT_EQ("EOF", Read());
426  ASSERT_EQ(3, DroppedBytes());
427  ASSERT_EQ("OK", MatchError("partial record without end"));
428}
429
430TEST(LogTest, MissingLastIsIgnored) {
431  Write(BigString("bar", kBlockSize));
432  // Remove the LAST block, including header.
433  ShrinkSize(14);
434  ASSERT_EQ("EOF", Read());
435  ASSERT_EQ("", ReportMessage());
436  ASSERT_EQ(0, DroppedBytes());
437}
438
439TEST(LogTest, PartialLastIsIgnored) {
440  Write(BigString("bar", kBlockSize));
441  // Cause a bad record length in the LAST block.
442  ShrinkSize(1);
443  ASSERT_EQ("EOF", Read());
444  ASSERT_EQ("", ReportMessage());
445  ASSERT_EQ(0, DroppedBytes());
446}
447
448TEST(LogTest, ErrorJoinsRecords) {
449  // Consider two fragmented records:
450  //    first(R1) last(R1) first(R2) last(R2)
451  // where the middle two fragments disappear.  We do not want
452  // first(R1),last(R2) to get joined and returned as a valid record.
453
454  // Write records that span two blocks
455  Write(BigString("foo", kBlockSize));
456  Write(BigString("bar", kBlockSize));
457  Write("correct");
458
459  // Wipe the middle block
460  for (int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
461    SetByte(offset, 'x');
462  }
463
464  ASSERT_EQ("correct", Read());
465  ASSERT_EQ("EOF", Read());
466  const int dropped = DroppedBytes();
467  ASSERT_LE(dropped, 2*kBlockSize + 100);
468  ASSERT_GE(dropped, 2*kBlockSize);
469}
470
471TEST(LogTest, ReadStart) {
472  CheckInitialOffsetRecord(0, 0);
473}
474
475TEST(LogTest, ReadSecondOneOff) {
476  CheckInitialOffsetRecord(1, 1);
477}
478
479TEST(LogTest, ReadSecondTenThousand) {
480  CheckInitialOffsetRecord(10000, 1);
481}
482
483TEST(LogTest, ReadSecondStart) {
484  CheckInitialOffsetRecord(10007, 1);
485}
486
487TEST(LogTest, ReadThirdOneOff) {
488  CheckInitialOffsetRecord(10008, 2);
489}
490
491TEST(LogTest, ReadThirdStart) {
492  CheckInitialOffsetRecord(20014, 2);
493}
494
495TEST(LogTest, ReadFourthOneOff) {
496  CheckInitialOffsetRecord(20015, 3);
497}
498
499TEST(LogTest, ReadFourthFirstBlockTrailer) {
500  CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
501}
502
503TEST(LogTest, ReadFourthMiddleBlock) {
504  CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
505}
506
507TEST(LogTest, ReadFourthLastBlock) {
508  CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
509}
510
511TEST(LogTest, ReadFourthStart) {
512  CheckInitialOffsetRecord(
513      2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
514      3);
515}
516
517TEST(LogTest, ReadEnd) {
518  CheckOffsetPastEndReturnsNoRecords(0);
519}
520
521TEST(LogTest, ReadPastEnd) {
522  CheckOffsetPastEndReturnsNoRecords(5);
523}
524
525}  // namespace log
526}  // namespace leveldb
527
528int main(int argc, char** argv) {
529  return leveldb::test::RunAllTests();
530}
531