// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"
#include "db/log_writer.h"
#include "leveldb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/random.h"
#include "util/testharness.h"

namespace leveldb {
namespace log {

// Construct a string of the specified length made out of the supplied
// partial string.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  while (result.size() < n) {
    result.append(partial_string);
  }
  result.resize(n);
  return result;
}

// Construct a string from a number
static std::string NumberString(int n) {
  char buf[50];
  snprintf(buf, sizeof(buf), "%d.", n);
  return std::string(buf);
}

// Return a skewed potentially long string
static std::string RandomSkewedString(int i, Random* rnd) {
  return BigString(NumberString(i), rnd->Skewed(17));
}

class LogTest {
 private:
  class StringDest : public WritableFile {
   public:
    std::string contents_;

    virtual Status Close() { return Status::OK(); }
    virtual Status Flush() { return Status::OK(); }
    virtual Status Sync() { return Status::OK(); }
    virtual Status Append(const Slice& slice) {
      contents_.append(slice.data(), slice.size());
      return Status::OK();
    }
  };

  class StringSource : public SequentialFile {
   public:
    Slice contents_;
    bool force_error_;
    bool returned_partial_;
    StringSource() : force_error_(false), returned_partial_(false) { }

    virtual Status Read(size_t n, Slice* result, char* scratch) {
      ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";

      if (force_error_) {
        force_error_ = false;
        returned_partial_ = true;
        return Status::Corruption("read error");
      }

      if (contents_.size() < n) {
        n = contents_.size();
        returned_partial_ = true;
      }
      *result = Slice(contents_.data(), n);
      contents_.remove_prefix(n);
      return Status::OK();
    }

    virtual Status Skip(uint64_t n) {
      if (n > contents_.size()) {
        contents_.clear();
        return Status::NotFound("in-memory file skipped past end");
      }

      contents_.remove_prefix(n);

      return Status::OK();
    }
  };

  class ReportCollector : public Reader::Reporter {
   public:
    size_t dropped_bytes_;
    std::string message_;

    ReportCollector() : dropped_bytes_(0) { }
    virtual void Corruption(size_t bytes, const Status& status) {
      dropped_bytes_ += bytes;
      message_.append(status.ToString());
    }
  };

  StringDest dest_;
  StringSource source_;
  ReportCollector report_;
  bool reading_;
  Writer writer_;
  Reader reader_;

  // Record metadata for testing initial offset functionality
  static size_t initial_offset_record_sizes_[];
  static uint64_t initial_offset_last_record_offsets_[];

 public:
  LogTest() : reading_(false),
              writer_(&dest_),
              reader_(&source_, &report_, true/*checksum*/,
                      0/*initial_offset*/) {
  }

  void Write(const std::string& msg) {
    ASSERT_TRUE(!reading_) << "Write() after starting to read";
    writer_.AddRecord(Slice(msg));
  }

  size_t WrittenBytes() const {
    return dest_.contents_.size();
  }

  std::string Read() {
    if (!reading_) {
      reading_ = true;
      source_.contents_ = Slice(dest_.contents_);
    }
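    // Return the next logical record's contents, or "EOF" once the reader
    // has no further records to return.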
    std::string scratch;
    Slice record;
    if (reader_.ReadRecord(&record, &scratch)) {
      return record.ToString();
    } else {
      return "EOF";
    }
  }

  void IncrementByte(int offset, int delta) {
    dest_.contents_[offset] += delta;
  }

  void SetByte(int offset, char new_byte) {
    dest_.contents_[offset] = new_byte;
  }

  void ShrinkSize(int bytes) {
    dest_.contents_.resize(dest_.contents_.size() - bytes);
  }

  void FixChecksum(int header_offset, int len) {
    // Compute crc of the type byte and payload
    uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
    crc = crc32c::Mask(crc);
    EncodeFixed32(&dest_.contents_[header_offset], crc);
  }

  void ForceError() {
    source_.force_error_ = true;
  }

  size_t DroppedBytes() const {
    return report_.dropped_bytes_;
  }

  std::string ReportMessage() const {
    return report_.message_;
  }

  // Returns OK iff recorded error message contains "msg"
  std::string MatchError(const std::string& msg) const {
    if (report_.message_.find(msg) == std::string::npos) {
      return report_.message_;
    } else {
      return "OK";
    }
  }

  void WriteInitialOffsetLog() {
    for (int i = 0; i < 4; i++) {
      std::string record(initial_offset_record_sizes_[i],
                         static_cast<char>('a' + i));
      Write(record);
    }
  }

  void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
    WriteInitialOffsetLog();
    reading_ = true;
    source_.contents_ = Slice(dest_.contents_);
    Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
                                       WrittenBytes() + offset_past_end);
    Slice record;
    std::string scratch;
    ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
    delete offset_reader;
  }

  void CheckInitialOffsetRecord(uint64_t initial_offset,
                                int expected_record_offset) {
    WriteInitialOffsetLog();
    reading_ = true;
    source_.contents_ = Slice(dest_.contents_);
    Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
                                       initial_offset);
    Slice record;
    std::string scratch;
    ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
    ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
              record.size());
    ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
              offset_reader->LastRecordOffset());
    ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
    delete offset_reader;
  }

};

size_t LogTest::initial_offset_record_sizes_[] =
    {10000,  // Two sizable records in first block
     10000,
     2 * log::kBlockSize - 1000,  // Span three blocks
     1};

uint64_t LogTest::initial_offset_last_record_offsets_[] =
    {0,
     kHeaderSize + 10000,
     2 * (kHeaderSize + 10000),
     2 * (kHeaderSize + 10000) +
         (2 * log::kBlockSize - 1000) + 3 * kHeaderSize};


TEST(LogTest, Empty) {
  ASSERT_EQ("EOF", Read());
}

TEST(LogTest, ReadWrite) {
  Write("foo");
  Write("bar");
  Write("");
  Write("xxxx");
  ASSERT_EQ("foo", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("xxxx", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}

TEST(LogTest, ManyBlocks) {
  for (int i = 0; i < 100000; i++) {
    Write(NumberString(i));
  }
  for (int i = 0; i < 100000; i++) {
    ASSERT_EQ(NumberString(i), Read());
  }
  ASSERT_EQ("EOF", Read());
}
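// The fragmentation and trailer tests below assume the physical log format
// from db/log_format.h: records are written into kBlockSize (32KB) blocks,
// and each fragment carries a kHeaderSize (7-byte) header consisting of a
// 4-byte checksum, a 2-byte length, and a 1-byte record type.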
TEST(LogTest, Fragmentation) {
  Write("small");
  Write(BigString("medium", 50000));
  Write(BigString("large", 100000));
  ASSERT_EQ("small", Read());
  ASSERT_EQ(BigString("medium", 50000), Read());
  ASSERT_EQ(BigString("large", 100000), Read());
  ASSERT_EQ("EOF", Read());
}

TEST(LogTest, MarginalTrailer) {
  // Make a trailer that is exactly the same length as an empty record.
  const int n = kBlockSize - 2*kHeaderSize;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

TEST(LogTest, MarginalTrailer2) {
  // Make a trailer that is exactly the same length as an empty record.
  const int n = kBlockSize - 2*kHeaderSize;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

TEST(LogTest, ShortTrailer) {
  const int n = kBlockSize - 2*kHeaderSize + 4;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

TEST(LogTest, AlignedEof) {
  const int n = kBlockSize - 2*kHeaderSize + 4;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("EOF", Read());
}

TEST(LogTest, RandomRead) {
  const int N = 500;
  Random write_rnd(301);
  for (int i = 0; i < N; i++) {
    Write(RandomSkewedString(i, &write_rnd));
  }
  Random read_rnd(301);
  for (int i = 0; i < N; i++) {
    ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
  }
  ASSERT_EQ("EOF", Read());
}

// Tests of all the error paths in log_reader.cc follow:

TEST(LogTest, ReadError) {
  Write("foo");
  ForceError();
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}

TEST(LogTest, BadRecordType) {
  Write("foo");
  // Type is stored in header[6]
  IncrementByte(6, 100);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("unknown record type"));
}

TEST(LogTest, TruncatedTrailingRecord) {
  Write("foo");
  ShrinkSize(4);  // Drop all payload as well as a header byte
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(kHeaderSize - 1, DroppedBytes());
  ASSERT_EQ("OK", MatchError("truncated record at end of file"));
}

TEST(LogTest, BadLength) {
  Write("foo");
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(kHeaderSize + 2, DroppedBytes());
  ASSERT_EQ("OK", MatchError("bad record length"));
}

TEST(LogTest, ChecksumMismatch) {
  Write("foo");
  IncrementByte(0, 10);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(10, DroppedBytes());
  ASSERT_EQ("OK", MatchError("checksum mismatch"));
}

TEST(LogTest, UnexpectedMiddleType) {
  Write("foo");
  SetByte(6, kMiddleType);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

TEST(LogTest, UnexpectedLastType) {
  Write("foo");
  SetByte(6, kLastType);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

TEST(LogTest, UnexpectedFullType) {
  Write("foo");
  Write("bar");
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}

TEST(LogTest, UnexpectedFirstType) {
  Write("foo");
  Write(BigString("bar", 100000));
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ(BigString("bar", 100000), Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}

TEST(LogTest, ErrorJoinsRecords) {
  // Consider two fragmented records:
  //    first(R1) last(R1) first(R2) last(R2)
  // where the middle two fragments disappear.  We do not want
  // first(R1),last(R2) to get joined and returned as a valid record.

  // Write records that span two blocks
  Write(BigString("foo", kBlockSize));
  Write(BigString("bar", kBlockSize));
  Write("correct");

  // Wipe the middle block
  for (int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
    SetByte(offset, 'x');
  }

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("EOF", Read());
  const int dropped = DroppedBytes();
  ASSERT_LE(dropped, 2*kBlockSize + 100);
  ASSERT_GE(dropped, 2*kBlockSize);
}

TEST(LogTest, ReadStart) {
  CheckInitialOffsetRecord(0, 0);
}

TEST(LogTest, ReadSecondOneOff) {
  CheckInitialOffsetRecord(1, 1);
}

TEST(LogTest, ReadSecondTenThousand) {
  CheckInitialOffsetRecord(10000, 1);
}

TEST(LogTest, ReadSecondStart) {
  CheckInitialOffsetRecord(10007, 1);
}

TEST(LogTest, ReadThirdOneOff) {
  CheckInitialOffsetRecord(10008, 2);
}

TEST(LogTest, ReadThirdStart) {
  CheckInitialOffsetRecord(20014, 2);
}

TEST(LogTest, ReadFourthOneOff) {
  CheckInitialOffsetRecord(20015, 3);
}

TEST(LogTest, ReadFourthFirstBlockTrailer) {
  CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
}

TEST(LogTest, ReadFourthMiddleBlock) {
  CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
}

TEST(LogTest, ReadFourthLastBlock) {
  CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
}

TEST(LogTest, ReadFourthStart) {
  CheckInitialOffsetRecord(
      2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
      3);
}

TEST(LogTest, ReadEnd) {
  CheckOffsetPastEndReturnsNoRecords(0);
}

TEST(LogTest, ReadPastEnd) {
  CheckOffsetPastEndReturnsNoRecords(5);
}

}  // namespace log
}  // namespace leveldb

int main(int argc, char** argv) {
  return leveldb::test::RunAllTests();
}