1// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5#include "db/db_impl.h" 6 7#include <algorithm> 8#include <set> 9#include <string> 10#include <stdint.h> 11#include <stdio.h> 12#include <vector> 13#include "db/builder.h" 14#include "db/db_iter.h" 15#include "db/dbformat.h" 16#include "db/filename.h" 17#include "db/log_reader.h" 18#include "db/log_writer.h" 19#include "db/memtable.h" 20#include "db/table_cache.h" 21#include "db/version_set.h" 22#include "db/write_batch_internal.h" 23#include "leveldb/db.h" 24#include "leveldb/env.h" 25#include "leveldb/status.h" 26#include "leveldb/table.h" 27#include "leveldb/table_builder.h" 28#include "port/port.h" 29#include "table/block.h" 30#include "table/merger.h" 31#include "table/two_level_iterator.h" 32#include "util/coding.h" 33#include "util/logging.h" 34#include "util/mutexlock.h" 35 36namespace leveldb { 37 38const int kNumNonTableCacheFiles = 10; 39 40// Information kept for every waiting writer 41struct DBImpl::Writer { 42 Status status; 43 WriteBatch* batch; 44 bool sync; 45 bool done; 46 port::CondVar cv; 47 48 explicit Writer(port::Mutex* mu) : cv(mu) { } 49}; 50 51struct DBImpl::CompactionState { 52 Compaction* const compaction; 53 54 // Sequence numbers < smallest_snapshot are not significant since we 55 // will never have to service a snapshot below smallest_snapshot. 56 // Therefore if we have seen a sequence number S <= smallest_snapshot, 57 // we can drop all entries for the same key with sequence numbers < S. 58 SequenceNumber smallest_snapshot; 59 60 // Files produced by compaction 61 struct Output { 62 uint64_t number; 63 uint64_t file_size; 64 InternalKey smallest, largest; 65 }; 66 std::vector<Output> outputs; 67 68 // State kept for output being generated 69 WritableFile* outfile; 70 TableBuilder* builder; 71 72 uint64_t total_bytes; 73 74 Output* current_output() { return &outputs[outputs.size()-1]; } 75 76 explicit CompactionState(Compaction* c) 77 : compaction(c), 78 outfile(NULL), 79 builder(NULL), 80 total_bytes(0) { 81 } 82}; 83 84// Fix user-supplied options to be reasonable 85template <class T,class V> 86static void ClipToRange(T* ptr, V minvalue, V maxvalue) { 87 if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue; 88 if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue; 89} 90Options SanitizeOptions(const std::string& dbname, 91 const InternalKeyComparator* icmp, 92 const InternalFilterPolicy* ipolicy, 93 const Options& src) { 94 Options result = src; 95 result.comparator = icmp; 96 result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL; 97 ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000); 98 ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); 99 ClipToRange(&result.block_size, 1<<10, 4<<20); 100 if (result.info_log == NULL) { 101 // Open a log file in the same directory as the db 102 src.env->CreateDir(dbname); // In case it does not exist 103 src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); 104 Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); 105 if (!s.ok()) { 106 // No place suitable for logging 107 result.info_log = NULL; 108 } 109 } 110 if (result.block_cache == NULL) { 111 result.block_cache = NewLRUCache(8 << 20); 112 } 113 return result; 114} 115 116DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) 117 : env_(raw_options.env), 118 internal_comparator_(raw_options.comparator), 119 internal_filter_policy_(raw_options.filter_policy), 120 options_(SanitizeOptions(dbname, &internal_comparator_, 121 &internal_filter_policy_, raw_options)), 122 owns_info_log_(options_.info_log != raw_options.info_log), 123 owns_cache_(options_.block_cache != raw_options.block_cache), 124 dbname_(dbname), 125 db_lock_(NULL), 126 shutting_down_(NULL), 127 bg_cv_(&mutex_), 128 mem_(new MemTable(internal_comparator_)), 129 imm_(NULL), 130 logfile_(NULL), 131 logfile_number_(0), 132 log_(NULL), 133 seed_(0), 134 tmp_batch_(new WriteBatch), 135 bg_compaction_scheduled_(false), 136 manual_compaction_(NULL) { 137 mem_->Ref(); 138 has_imm_.Release_Store(NULL); 139 140 // Reserve ten files or so for other uses and give the rest to TableCache. 141 const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles; 142 table_cache_ = new TableCache(dbname_, &options_, table_cache_size); 143 144 versions_ = new VersionSet(dbname_, &options_, table_cache_, 145 &internal_comparator_); 146} 147 148DBImpl::~DBImpl() { 149 // Wait for background work to finish 150 mutex_.Lock(); 151 shutting_down_.Release_Store(this); // Any non-NULL value is ok 152 while (bg_compaction_scheduled_) { 153 bg_cv_.Wait(); 154 } 155 mutex_.Unlock(); 156 157 if (db_lock_ != NULL) { 158 env_->UnlockFile(db_lock_); 159 } 160 161 delete versions_; 162 if (mem_ != NULL) mem_->Unref(); 163 if (imm_ != NULL) imm_->Unref(); 164 delete tmp_batch_; 165 delete log_; 166 delete logfile_; 167 delete table_cache_; 168 169 if (owns_info_log_) { 170 delete options_.info_log; 171 } 172 if (owns_cache_) { 173 delete options_.block_cache; 174 } 175} 176 177Status DBImpl::NewDB() { 178 VersionEdit new_db; 179 new_db.SetComparatorName(user_comparator()->Name()); 180 new_db.SetLogNumber(0); 181 new_db.SetNextFile(2); 182 new_db.SetLastSequence(0); 183 184 const std::string manifest = DescriptorFileName(dbname_, 1); 185 WritableFile* file; 186 Status s = env_->NewWritableFile(manifest, &file); 187 if (!s.ok()) { 188 return s; 189 } 190 { 191 log::Writer log(file); 192 std::string record; 193 new_db.EncodeTo(&record); 194 s = log.AddRecord(record); 195 if (s.ok()) { 196 s = file->Close(); 197 } 198 } 199 delete file; 200 if (s.ok()) { 201 // Make "CURRENT" file that points to the new manifest file. 202 s = SetCurrentFile(env_, dbname_, 1); 203 } else { 204 env_->DeleteFile(manifest); 205 } 206 return s; 207} 208 209void DBImpl::MaybeIgnoreError(Status* s) const { 210 if (s->ok() || options_.paranoid_checks) { 211 // No change needed 212 } else { 213 Log(options_.info_log, "Ignoring error %s", s->ToString().c_str()); 214 *s = Status::OK(); 215 } 216} 217 218void DBImpl::DeleteObsoleteFiles() { 219 if (!bg_error_.ok()) { 220 // After a background error, we don't know whether a new version may 221 // or may not have been committed, so we cannot safely garbage collect. 222 return; 223 } 224 225 // Make a set of all of the live files 226 std::set<uint64_t> live = pending_outputs_; 227 versions_->AddLiveFiles(&live); 228 229 std::vector<std::string> filenames; 230 env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose 231 uint64_t number; 232 FileType type; 233 for (size_t i = 0; i < filenames.size(); i++) { 234 if (ParseFileName(filenames[i], &number, &type)) { 235 bool keep = true; 236 switch (type) { 237 case kLogFile: 238 keep = ((number >= versions_->LogNumber()) || 239 (number == versions_->PrevLogNumber())); 240 break; 241 case kDescriptorFile: 242 // Keep my manifest file, and any newer incarnations' 243 // (in case there is a race that allows other incarnations) 244 keep = (number >= versions_->ManifestFileNumber()); 245 break; 246 case kTableFile: 247 keep = (live.find(number) != live.end()); 248 break; 249 case kTempFile: 250 // Any temp files that are currently being written to must 251 // be recorded in pending_outputs_, which is inserted into "live" 252 keep = (live.find(number) != live.end()); 253 break; 254 case kCurrentFile: 255 case kDBLockFile: 256 case kInfoLogFile: 257 keep = true; 258 break; 259 } 260 261 if (!keep) { 262 if (type == kTableFile) { 263 table_cache_->Evict(number); 264 } 265 Log(options_.info_log, "Delete type=%d #%lld\n", 266 int(type), 267 static_cast<unsigned long long>(number)); 268 env_->DeleteFile(dbname_ + "/" + filenames[i]); 269 } 270 } 271 } 272} 273 274Status DBImpl::Recover(VersionEdit* edit) { 275 mutex_.AssertHeld(); 276 277 // Ignore error from CreateDir since the creation of the DB is 278 // committed only when the descriptor is created, and this directory 279 // may already exist from a previous failed creation attempt. 280 env_->CreateDir(dbname_); 281 assert(db_lock_ == NULL); 282 Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); 283 if (!s.ok()) { 284 return s; 285 } 286 287 if (!env_->FileExists(CurrentFileName(dbname_))) { 288 if (options_.create_if_missing) { 289 s = NewDB(); 290 if (!s.ok()) { 291 return s; 292 } 293 } else { 294 return Status::InvalidArgument( 295 dbname_, "does not exist (create_if_missing is false)"); 296 } 297 } else { 298 if (options_.error_if_exists) { 299 return Status::InvalidArgument( 300 dbname_, "exists (error_if_exists is true)"); 301 } 302 } 303 304 s = versions_->Recover(); 305 if (s.ok()) { 306 SequenceNumber max_sequence(0); 307 308 // Recover from all newer log files than the ones named in the 309 // descriptor (new log files may have been added by the previous 310 // incarnation without registering them in the descriptor). 311 // 312 // Note that PrevLogNumber() is no longer used, but we pay 313 // attention to it in case we are recovering a database 314 // produced by an older version of leveldb. 315 const uint64_t min_log = versions_->LogNumber(); 316 const uint64_t prev_log = versions_->PrevLogNumber(); 317 std::vector<std::string> filenames; 318 s = env_->GetChildren(dbname_, &filenames); 319 if (!s.ok()) { 320 return s; 321 } 322 std::set<uint64_t> expected; 323 versions_->AddLiveFiles(&expected); 324 uint64_t number; 325 FileType type; 326 std::vector<uint64_t> logs; 327 for (size_t i = 0; i < filenames.size(); i++) { 328 if (ParseFileName(filenames[i], &number, &type)) { 329 expected.erase(number); 330 if (type == kLogFile && ((number >= min_log) || (number == prev_log))) 331 logs.push_back(number); 332 } 333 } 334 if (!expected.empty()) { 335 char buf[50]; 336 snprintf(buf, sizeof(buf), "%d missing files; e.g.", 337 static_cast<int>(expected.size())); 338 return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin()))); 339 } 340 341 // Recover in the order in which the logs were generated 342 std::sort(logs.begin(), logs.end()); 343 for (size_t i = 0; i < logs.size(); i++) { 344 s = RecoverLogFile(logs[i], edit, &max_sequence); 345 346 // The previous incarnation may not have written any MANIFEST 347 // records after allocating this log number. So we manually 348 // update the file number allocation counter in VersionSet. 349 versions_->MarkFileNumberUsed(logs[i]); 350 } 351 352 if (s.ok()) { 353 if (versions_->LastSequence() < max_sequence) { 354 versions_->SetLastSequence(max_sequence); 355 } 356 } 357 } 358 359 return s; 360} 361 362Status DBImpl::RecoverLogFile(uint64_t log_number, 363 VersionEdit* edit, 364 SequenceNumber* max_sequence) { 365 struct LogReporter : public log::Reader::Reporter { 366 Env* env; 367 Logger* info_log; 368 const char* fname; 369 Status* status; // NULL if options_.paranoid_checks==false 370 virtual void Corruption(size_t bytes, const Status& s) { 371 Log(info_log, "%s%s: dropping %d bytes; %s", 372 (this->status == NULL ? "(ignoring error) " : ""), 373 fname, static_cast<int>(bytes), s.ToString().c_str()); 374 if (this->status != NULL && this->status->ok()) *this->status = s; 375 } 376 }; 377 378 mutex_.AssertHeld(); 379 380 // Open the log file 381 std::string fname = LogFileName(dbname_, log_number); 382 SequentialFile* file; 383 Status status = env_->NewSequentialFile(fname, &file); 384 if (!status.ok()) { 385 MaybeIgnoreError(&status); 386 return status; 387 } 388 389 // Create the log reader. 390 LogReporter reporter; 391 reporter.env = env_; 392 reporter.info_log = options_.info_log; 393 reporter.fname = fname.c_str(); 394 reporter.status = (options_.paranoid_checks ? &status : NULL); 395 // We intentially make log::Reader do checksumming even if 396 // paranoid_checks==false so that corruptions cause entire commits 397 // to be skipped instead of propagating bad information (like overly 398 // large sequence numbers). 399 log::Reader reader(file, &reporter, true/*checksum*/, 400 0/*initial_offset*/); 401 Log(options_.info_log, "Recovering log #%llu", 402 (unsigned long long) log_number); 403 404 // Read all the records and add to a memtable 405 std::string scratch; 406 Slice record; 407 WriteBatch batch; 408 MemTable* mem = NULL; 409 while (reader.ReadRecord(&record, &scratch) && 410 status.ok()) { 411 if (record.size() < 12) { 412 reporter.Corruption( 413 record.size(), Status::Corruption("log record too small")); 414 continue; 415 } 416 WriteBatchInternal::SetContents(&batch, record); 417 418 if (mem == NULL) { 419 mem = new MemTable(internal_comparator_); 420 mem->Ref(); 421 } 422 status = WriteBatchInternal::InsertInto(&batch, mem); 423 MaybeIgnoreError(&status); 424 if (!status.ok()) { 425 break; 426 } 427 const SequenceNumber last_seq = 428 WriteBatchInternal::Sequence(&batch) + 429 WriteBatchInternal::Count(&batch) - 1; 430 if (last_seq > *max_sequence) { 431 *max_sequence = last_seq; 432 } 433 434 if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { 435 status = WriteLevel0Table(mem, edit, NULL); 436 if (!status.ok()) { 437 // Reflect errors immediately so that conditions like full 438 // file-systems cause the DB::Open() to fail. 439 break; 440 } 441 mem->Unref(); 442 mem = NULL; 443 } 444 } 445 446 if (status.ok() && mem != NULL) { 447 status = WriteLevel0Table(mem, edit, NULL); 448 // Reflect errors immediately so that conditions like full 449 // file-systems cause the DB::Open() to fail. 450 } 451 452 if (mem != NULL) mem->Unref(); 453 delete file; 454 return status; 455} 456 457Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, 458 Version* base) { 459 mutex_.AssertHeld(); 460 const uint64_t start_micros = env_->NowMicros(); 461 FileMetaData meta; 462 meta.number = versions_->NewFileNumber(); 463 pending_outputs_.insert(meta.number); 464 Iterator* iter = mem->NewIterator(); 465 Log(options_.info_log, "Level-0 table #%llu: started", 466 (unsigned long long) meta.number); 467 468 Status s; 469 { 470 mutex_.Unlock(); 471 s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); 472 mutex_.Lock(); 473 } 474 475 Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", 476 (unsigned long long) meta.number, 477 (unsigned long long) meta.file_size, 478 s.ToString().c_str()); 479 delete iter; 480 pending_outputs_.erase(meta.number); 481 482 483 // Note that if file_size is zero, the file has been deleted and 484 // should not be added to the manifest. 485 int level = 0; 486 if (s.ok() && meta.file_size > 0) { 487 const Slice min_user_key = meta.smallest.user_key(); 488 const Slice max_user_key = meta.largest.user_key(); 489 if (base != NULL) { 490 level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); 491 } 492 edit->AddFile(level, meta.number, meta.file_size, 493 meta.smallest, meta.largest); 494 } 495 496 CompactionStats stats; 497 stats.micros = env_->NowMicros() - start_micros; 498 stats.bytes_written = meta.file_size; 499 stats_[level].Add(stats); 500 return s; 501} 502 503void DBImpl::CompactMemTable() { 504 mutex_.AssertHeld(); 505 assert(imm_ != NULL); 506 507 // Save the contents of the memtable as a new Table 508 VersionEdit edit; 509 Version* base = versions_->current(); 510 base->Ref(); 511 Status s = WriteLevel0Table(imm_, &edit, base); 512 base->Unref(); 513 514 if (s.ok() && shutting_down_.Acquire_Load()) { 515 s = Status::IOError("Deleting DB during memtable compaction"); 516 } 517 518 // Replace immutable memtable with the generated Table 519 if (s.ok()) { 520 edit.SetPrevLogNumber(0); 521 edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed 522 s = versions_->LogAndApply(&edit, &mutex_); 523 } 524 525 if (s.ok()) { 526 // Commit to the new state 527 imm_->Unref(); 528 imm_ = NULL; 529 has_imm_.Release_Store(NULL); 530 DeleteObsoleteFiles(); 531 } else { 532 RecordBackgroundError(s); 533 } 534} 535 536void DBImpl::CompactRange(const Slice* begin, const Slice* end) { 537 int max_level_with_files = 1; 538 { 539 MutexLock l(&mutex_); 540 Version* base = versions_->current(); 541 for (int level = 1; level < config::kNumLevels; level++) { 542 if (base->OverlapInLevel(level, begin, end)) { 543 max_level_with_files = level; 544 } 545 } 546 } 547 TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap 548 for (int level = 0; level < max_level_with_files; level++) { 549 TEST_CompactRange(level, begin, end); 550 } 551} 552 553void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { 554 assert(level >= 0); 555 assert(level + 1 < config::kNumLevels); 556 557 InternalKey begin_storage, end_storage; 558 559 ManualCompaction manual; 560 manual.level = level; 561 manual.done = false; 562 if (begin == NULL) { 563 manual.begin = NULL; 564 } else { 565 begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); 566 manual.begin = &begin_storage; 567 } 568 if (end == NULL) { 569 manual.end = NULL; 570 } else { 571 end_storage = InternalKey(*end, 0, static_cast<ValueType>(0)); 572 manual.end = &end_storage; 573 } 574 575 MutexLock l(&mutex_); 576 while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) { 577 if (manual_compaction_ == NULL) { // Idle 578 manual_compaction_ = &manual; 579 MaybeScheduleCompaction(); 580 } else { // Running either my compaction or another compaction. 581 bg_cv_.Wait(); 582 } 583 } 584 if (manual_compaction_ == &manual) { 585 // Cancel my manual compaction since we aborted early for some reason. 586 manual_compaction_ = NULL; 587 } 588} 589 590Status DBImpl::TEST_CompactMemTable() { 591 // NULL batch means just wait for earlier writes to be done 592 Status s = Write(WriteOptions(), NULL); 593 if (s.ok()) { 594 // Wait until the compaction completes 595 MutexLock l(&mutex_); 596 while (imm_ != NULL && bg_error_.ok()) { 597 bg_cv_.Wait(); 598 } 599 if (imm_ != NULL) { 600 s = bg_error_; 601 } 602 } 603 return s; 604} 605 606void DBImpl::RecordBackgroundError(const Status& s) { 607 mutex_.AssertHeld(); 608 if (bg_error_.ok()) { 609 bg_error_ = s; 610 bg_cv_.SignalAll(); 611 } 612} 613 614void DBImpl::MaybeScheduleCompaction() { 615 mutex_.AssertHeld(); 616 if (bg_compaction_scheduled_) { 617 // Already scheduled 618 } else if (shutting_down_.Acquire_Load()) { 619 // DB is being deleted; no more background compactions 620 } else if (!bg_error_.ok()) { 621 // Already got an error; no more changes 622 } else if (imm_ == NULL && 623 manual_compaction_ == NULL && 624 !versions_->NeedsCompaction()) { 625 // No work to be done 626 } else { 627 bg_compaction_scheduled_ = true; 628 env_->Schedule(&DBImpl::BGWork, this); 629 } 630} 631 632void DBImpl::BGWork(void* db) { 633 reinterpret_cast<DBImpl*>(db)->BackgroundCall(); 634} 635 636void DBImpl::BackgroundCall() { 637 MutexLock l(&mutex_); 638 assert(bg_compaction_scheduled_); 639 if (shutting_down_.Acquire_Load()) { 640 // No more background work when shutting down. 641 } else if (!bg_error_.ok()) { 642 // No more background work after a background error. 643 } else { 644 BackgroundCompaction(); 645 } 646 647 bg_compaction_scheduled_ = false; 648 649 // Previous compaction may have produced too many files in a level, 650 // so reschedule another compaction if needed. 651 MaybeScheduleCompaction(); 652 bg_cv_.SignalAll(); 653} 654 655void DBImpl::BackgroundCompaction() { 656 mutex_.AssertHeld(); 657 658 if (imm_ != NULL) { 659 CompactMemTable(); 660 return; 661 } 662 663 Compaction* c; 664 bool is_manual = (manual_compaction_ != NULL); 665 InternalKey manual_end; 666 if (is_manual) { 667 ManualCompaction* m = manual_compaction_; 668 c = versions_->CompactRange(m->level, m->begin, m->end); 669 m->done = (c == NULL); 670 if (c != NULL) { 671 manual_end = c->input(0, c->num_input_files(0) - 1)->largest; 672 } 673 Log(options_.info_log, 674 "Manual compaction at level-%d from %s .. %s; will stop at %s\n", 675 m->level, 676 (m->begin ? m->begin->DebugString().c_str() : "(begin)"), 677 (m->end ? m->end->DebugString().c_str() : "(end)"), 678 (m->done ? "(end)" : manual_end.DebugString().c_str())); 679 } else { 680 c = versions_->PickCompaction(); 681 } 682 683 Status status; 684 if (c == NULL) { 685 // Nothing to do 686 } else if (!is_manual && c->IsTrivialMove()) { 687 // Move file to next level 688 assert(c->num_input_files(0) == 1); 689 FileMetaData* f = c->input(0, 0); 690 c->edit()->DeleteFile(c->level(), f->number); 691 c->edit()->AddFile(c->level() + 1, f->number, f->file_size, 692 f->smallest, f->largest); 693 status = versions_->LogAndApply(c->edit(), &mutex_); 694 if (!status.ok()) { 695 RecordBackgroundError(status); 696 } 697 VersionSet::LevelSummaryStorage tmp; 698 Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", 699 static_cast<unsigned long long>(f->number), 700 c->level() + 1, 701 static_cast<unsigned long long>(f->file_size), 702 status.ToString().c_str(), 703 versions_->LevelSummary(&tmp)); 704 } else { 705 CompactionState* compact = new CompactionState(c); 706 status = DoCompactionWork(compact); 707 if (!status.ok()) { 708 RecordBackgroundError(status); 709 } 710 CleanupCompaction(compact); 711 c->ReleaseInputs(); 712 DeleteObsoleteFiles(); 713 } 714 delete c; 715 716 if (status.ok()) { 717 // Done 718 } else if (shutting_down_.Acquire_Load()) { 719 // Ignore compaction errors found during shutting down 720 } else { 721 Log(options_.info_log, 722 "Compaction error: %s", status.ToString().c_str()); 723 } 724 725 if (is_manual) { 726 ManualCompaction* m = manual_compaction_; 727 if (!status.ok()) { 728 m->done = true; 729 } 730 if (!m->done) { 731 // We only compacted part of the requested range. Update *m 732 // to the range that is left to be compacted. 733 m->tmp_storage = manual_end; 734 m->begin = &m->tmp_storage; 735 } 736 manual_compaction_ = NULL; 737 } 738} 739 740void DBImpl::CleanupCompaction(CompactionState* compact) { 741 mutex_.AssertHeld(); 742 if (compact->builder != NULL) { 743 // May happen if we get a shutdown call in the middle of compaction 744 compact->builder->Abandon(); 745 delete compact->builder; 746 } else { 747 assert(compact->outfile == NULL); 748 } 749 delete compact->outfile; 750 for (size_t i = 0; i < compact->outputs.size(); i++) { 751 const CompactionState::Output& out = compact->outputs[i]; 752 pending_outputs_.erase(out.number); 753 } 754 delete compact; 755} 756 757Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { 758 assert(compact != NULL); 759 assert(compact->builder == NULL); 760 uint64_t file_number; 761 { 762 mutex_.Lock(); 763 file_number = versions_->NewFileNumber(); 764 pending_outputs_.insert(file_number); 765 CompactionState::Output out; 766 out.number = file_number; 767 out.smallest.Clear(); 768 out.largest.Clear(); 769 compact->outputs.push_back(out); 770 mutex_.Unlock(); 771 } 772 773 // Make the output file 774 std::string fname = TableFileName(dbname_, file_number); 775 Status s = env_->NewWritableFile(fname, &compact->outfile); 776 if (s.ok()) { 777 compact->builder = new TableBuilder(options_, compact->outfile); 778 } 779 return s; 780} 781 782Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, 783 Iterator* input) { 784 assert(compact != NULL); 785 assert(compact->outfile != NULL); 786 assert(compact->builder != NULL); 787 788 const uint64_t output_number = compact->current_output()->number; 789 assert(output_number != 0); 790 791 // Check for iterator errors 792 Status s = input->status(); 793 const uint64_t current_entries = compact->builder->NumEntries(); 794 if (s.ok()) { 795 s = compact->builder->Finish(); 796 } else { 797 compact->builder->Abandon(); 798 } 799 const uint64_t current_bytes = compact->builder->FileSize(); 800 compact->current_output()->file_size = current_bytes; 801 compact->total_bytes += current_bytes; 802 delete compact->builder; 803 compact->builder = NULL; 804 805 // Finish and check for file errors 806 if (s.ok()) { 807 s = compact->outfile->Sync(); 808 } 809 if (s.ok()) { 810 s = compact->outfile->Close(); 811 } 812 delete compact->outfile; 813 compact->outfile = NULL; 814 815 if (s.ok() && current_entries > 0) { 816 // Verify that the table is usable 817 Iterator* iter = table_cache_->NewIterator(ReadOptions(), 818 output_number, 819 current_bytes); 820 s = iter->status(); 821 delete iter; 822 if (s.ok()) { 823 Log(options_.info_log, 824 "Generated table #%llu: %lld keys, %lld bytes", 825 (unsigned long long) output_number, 826 (unsigned long long) current_entries, 827 (unsigned long long) current_bytes); 828 } 829 } 830 return s; 831} 832 833 834Status DBImpl::InstallCompactionResults(CompactionState* compact) { 835 mutex_.AssertHeld(); 836 Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", 837 compact->compaction->num_input_files(0), 838 compact->compaction->level(), 839 compact->compaction->num_input_files(1), 840 compact->compaction->level() + 1, 841 static_cast<long long>(compact->total_bytes)); 842 843 // Add compaction outputs 844 compact->compaction->AddInputDeletions(compact->compaction->edit()); 845 const int level = compact->compaction->level(); 846 for (size_t i = 0; i < compact->outputs.size(); i++) { 847 const CompactionState::Output& out = compact->outputs[i]; 848 compact->compaction->edit()->AddFile( 849 level + 1, 850 out.number, out.file_size, out.smallest, out.largest); 851 } 852 return versions_->LogAndApply(compact->compaction->edit(), &mutex_); 853} 854 855Status DBImpl::DoCompactionWork(CompactionState* compact) { 856 const uint64_t start_micros = env_->NowMicros(); 857 int64_t imm_micros = 0; // Micros spent doing imm_ compactions 858 859 Log(options_.info_log, "Compacting %d@%d + %d@%d files", 860 compact->compaction->num_input_files(0), 861 compact->compaction->level(), 862 compact->compaction->num_input_files(1), 863 compact->compaction->level() + 1); 864 865 assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); 866 assert(compact->builder == NULL); 867 assert(compact->outfile == NULL); 868 if (snapshots_.empty()) { 869 compact->smallest_snapshot = versions_->LastSequence(); 870 } else { 871 compact->smallest_snapshot = snapshots_.oldest()->number_; 872 } 873 874 // Release mutex while we're actually doing the compaction work 875 mutex_.Unlock(); 876 877 Iterator* input = versions_->MakeInputIterator(compact->compaction); 878 input->SeekToFirst(); 879 Status status; 880 ParsedInternalKey ikey; 881 std::string current_user_key; 882 bool has_current_user_key = false; 883 SequenceNumber last_sequence_for_key = kMaxSequenceNumber; 884 for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { 885 // Prioritize immutable compaction work 886 if (has_imm_.NoBarrier_Load() != NULL) { 887 const uint64_t imm_start = env_->NowMicros(); 888 mutex_.Lock(); 889 if (imm_ != NULL) { 890 CompactMemTable(); 891 bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary 892 } 893 mutex_.Unlock(); 894 imm_micros += (env_->NowMicros() - imm_start); 895 } 896 897 Slice key = input->key(); 898 if (compact->compaction->ShouldStopBefore(key) && 899 compact->builder != NULL) { 900 status = FinishCompactionOutputFile(compact, input); 901 if (!status.ok()) { 902 break; 903 } 904 } 905 906 // Handle key/value, add to state, etc. 907 bool drop = false; 908 if (!ParseInternalKey(key, &ikey)) { 909 // Do not hide error keys 910 current_user_key.clear(); 911 has_current_user_key = false; 912 last_sequence_for_key = kMaxSequenceNumber; 913 } else { 914 if (!has_current_user_key || 915 user_comparator()->Compare(ikey.user_key, 916 Slice(current_user_key)) != 0) { 917 // First occurrence of this user key 918 current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); 919 has_current_user_key = true; 920 last_sequence_for_key = kMaxSequenceNumber; 921 } 922 923 if (last_sequence_for_key <= compact->smallest_snapshot) { 924 // Hidden by an newer entry for same user key 925 drop = true; // (A) 926 } else if (ikey.type == kTypeDeletion && 927 ikey.sequence <= compact->smallest_snapshot && 928 compact->compaction->IsBaseLevelForKey(ikey.user_key)) { 929 // For this user key: 930 // (1) there is no data in higher levels 931 // (2) data in lower levels will have larger sequence numbers 932 // (3) data in layers that are being compacted here and have 933 // smaller sequence numbers will be dropped in the next 934 // few iterations of this loop (by rule (A) above). 935 // Therefore this deletion marker is obsolete and can be dropped. 936 drop = true; 937 } 938 939 last_sequence_for_key = ikey.sequence; 940 } 941#if 0 942 Log(options_.info_log, 943 " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " 944 "%d smallest_snapshot: %d", 945 ikey.user_key.ToString().c_str(), 946 (int)ikey.sequence, ikey.type, kTypeValue, drop, 947 compact->compaction->IsBaseLevelForKey(ikey.user_key), 948 (int)last_sequence_for_key, (int)compact->smallest_snapshot); 949#endif 950 951 if (!drop) { 952 // Open output file if necessary 953 if (compact->builder == NULL) { 954 status = OpenCompactionOutputFile(compact); 955 if (!status.ok()) { 956 break; 957 } 958 } 959 if (compact->builder->NumEntries() == 0) { 960 compact->current_output()->smallest.DecodeFrom(key); 961 } 962 compact->current_output()->largest.DecodeFrom(key); 963 compact->builder->Add(key, input->value()); 964 965 // Close output file if it is big enough 966 if (compact->builder->FileSize() >= 967 compact->compaction->MaxOutputFileSize()) { 968 status = FinishCompactionOutputFile(compact, input); 969 if (!status.ok()) { 970 break; 971 } 972 } 973 } 974 975 input->Next(); 976 } 977 978 if (status.ok() && shutting_down_.Acquire_Load()) { 979 status = Status::IOError("Deleting DB during compaction"); 980 } 981 if (status.ok() && compact->builder != NULL) { 982 status = FinishCompactionOutputFile(compact, input); 983 } 984 if (status.ok()) { 985 status = input->status(); 986 } 987 delete input; 988 input = NULL; 989 990 CompactionStats stats; 991 stats.micros = env_->NowMicros() - start_micros - imm_micros; 992 for (int which = 0; which < 2; which++) { 993 for (int i = 0; i < compact->compaction->num_input_files(which); i++) { 994 stats.bytes_read += compact->compaction->input(which, i)->file_size; 995 } 996 } 997 for (size_t i = 0; i < compact->outputs.size(); i++) { 998 stats.bytes_written += compact->outputs[i].file_size; 999 } 1000 1001 mutex_.Lock(); 1002 stats_[compact->compaction->level() + 1].Add(stats); 1003 1004 if (status.ok()) { 1005 status = InstallCompactionResults(compact); 1006 } 1007 if (!status.ok()) { 1008 RecordBackgroundError(status); 1009 } 1010 VersionSet::LevelSummaryStorage tmp; 1011 Log(options_.info_log, 1012 "compacted to: %s", versions_->LevelSummary(&tmp)); 1013 return status; 1014} 1015 1016namespace { 1017struct IterState { 1018 port::Mutex* mu; 1019 Version* version; 1020 MemTable* mem; 1021 MemTable* imm; 1022}; 1023 1024static void CleanupIteratorState(void* arg1, void* arg2) { 1025 IterState* state = reinterpret_cast<IterState*>(arg1); 1026 state->mu->Lock(); 1027 state->mem->Unref(); 1028 if (state->imm != NULL) state->imm->Unref(); 1029 state->version->Unref(); 1030 state->mu->Unlock(); 1031 delete state; 1032} 1033} // namespace 1034 1035Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, 1036 SequenceNumber* latest_snapshot, 1037 uint32_t* seed) { 1038 IterState* cleanup = new IterState; 1039 mutex_.Lock(); 1040 *latest_snapshot = versions_->LastSequence(); 1041 1042 // Collect together all needed child iterators 1043 std::vector<Iterator*> list; 1044 list.push_back(mem_->NewIterator()); 1045 mem_->Ref(); 1046 if (imm_ != NULL) { 1047 list.push_back(imm_->NewIterator()); 1048 imm_->Ref(); 1049 } 1050 versions_->current()->AddIterators(options, &list); 1051 Iterator* internal_iter = 1052 NewMergingIterator(&internal_comparator_, &list[0], list.size()); 1053 versions_->current()->Ref(); 1054 1055 cleanup->mu = &mutex_; 1056 cleanup->mem = mem_; 1057 cleanup->imm = imm_; 1058 cleanup->version = versions_->current(); 1059 internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); 1060 1061 *seed = ++seed_; 1062 mutex_.Unlock(); 1063 return internal_iter; 1064} 1065 1066Iterator* DBImpl::TEST_NewInternalIterator() { 1067 SequenceNumber ignored; 1068 uint32_t ignored_seed; 1069 return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed); 1070} 1071 1072int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { 1073 MutexLock l(&mutex_); 1074 return versions_->MaxNextLevelOverlappingBytes(); 1075} 1076 1077Status DBImpl::Get(const ReadOptions& options, 1078 const Slice& key, 1079 std::string* value) { 1080 Status s; 1081 MutexLock l(&mutex_); 1082 SequenceNumber snapshot; 1083 if (options.snapshot != NULL) { 1084 snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_; 1085 } else { 1086 snapshot = versions_->LastSequence(); 1087 } 1088 1089 MemTable* mem = mem_; 1090 MemTable* imm = imm_; 1091 Version* current = versions_->current(); 1092 mem->Ref(); 1093 if (imm != NULL) imm->Ref(); 1094 current->Ref(); 1095 1096 bool have_stat_update = false; 1097 Version::GetStats stats; 1098 1099 // Unlock while reading from files and memtables 1100 { 1101 mutex_.Unlock(); 1102 // First look in the memtable, then in the immutable memtable (if any). 1103 LookupKey lkey(key, snapshot); 1104 if (mem->Get(lkey, value, &s)) { 1105 // Done 1106 } else if (imm != NULL && imm->Get(lkey, value, &s)) { 1107 // Done 1108 } else { 1109 s = current->Get(options, lkey, value, &stats); 1110 have_stat_update = true; 1111 } 1112 mutex_.Lock(); 1113 } 1114 1115 if (have_stat_update && current->UpdateStats(stats)) { 1116 MaybeScheduleCompaction(); 1117 } 1118 mem->Unref(); 1119 if (imm != NULL) imm->Unref(); 1120 current->Unref(); 1121 return s; 1122} 1123 1124Iterator* DBImpl::NewIterator(const ReadOptions& options) { 1125 SequenceNumber latest_snapshot; 1126 uint32_t seed; 1127 Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); 1128 return NewDBIterator( 1129 this, user_comparator(), iter, 1130 (options.snapshot != NULL 1131 ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_ 1132 : latest_snapshot), 1133 seed); 1134} 1135 1136void DBImpl::RecordReadSample(Slice key) { 1137 MutexLock l(&mutex_); 1138 if (versions_->current()->RecordReadSample(key)) { 1139 MaybeScheduleCompaction(); 1140 } 1141} 1142 1143const Snapshot* DBImpl::GetSnapshot() { 1144 MutexLock l(&mutex_); 1145 return snapshots_.New(versions_->LastSequence()); 1146} 1147 1148void DBImpl::ReleaseSnapshot(const Snapshot* s) { 1149 MutexLock l(&mutex_); 1150 snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s)); 1151} 1152 1153// Convenience methods 1154Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { 1155 return DB::Put(o, key, val); 1156} 1157 1158Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { 1159 return DB::Delete(options, key); 1160} 1161 1162Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { 1163 Writer w(&mutex_); 1164 w.batch = my_batch; 1165 w.sync = options.sync; 1166 w.done = false; 1167 1168 MutexLock l(&mutex_); 1169 writers_.push_back(&w); 1170 while (!w.done && &w != writers_.front()) { 1171 w.cv.Wait(); 1172 } 1173 if (w.done) { 1174 return w.status; 1175 } 1176 1177 // May temporarily unlock and wait. 1178 Status status = MakeRoomForWrite(my_batch == NULL); 1179 uint64_t last_sequence = versions_->LastSequence(); 1180 Writer* last_writer = &w; 1181 if (status.ok() && my_batch != NULL) { // NULL batch is for compactions 1182 WriteBatch* updates = BuildBatchGroup(&last_writer); 1183 WriteBatchInternal::SetSequence(updates, last_sequence + 1); 1184 last_sequence += WriteBatchInternal::Count(updates); 1185 1186 // Add to log and apply to memtable. We can release the lock 1187 // during this phase since &w is currently responsible for logging 1188 // and protects against concurrent loggers and concurrent writes 1189 // into mem_. 1190 { 1191 mutex_.Unlock(); 1192 status = log_->AddRecord(WriteBatchInternal::Contents(updates)); 1193 bool sync_error = false; 1194 if (status.ok() && options.sync) { 1195 status = logfile_->Sync(); 1196 if (!status.ok()) { 1197 sync_error = true; 1198 } 1199 } 1200 if (status.ok()) { 1201 status = WriteBatchInternal::InsertInto(updates, mem_); 1202 } 1203 mutex_.Lock(); 1204 if (sync_error) { 1205 // The state of the log file is indeterminate: the log record we 1206 // just added may or may not show up when the DB is re-opened. 1207 // So we force the DB into a mode where all future writes fail. 1208 RecordBackgroundError(status); 1209 } 1210 } 1211 if (updates == tmp_batch_) tmp_batch_->Clear(); 1212 1213 versions_->SetLastSequence(last_sequence); 1214 } 1215 1216 while (true) { 1217 Writer* ready = writers_.front(); 1218 writers_.pop_front(); 1219 if (ready != &w) { 1220 ready->status = status; 1221 ready->done = true; 1222 ready->cv.Signal(); 1223 } 1224 if (ready == last_writer) break; 1225 } 1226 1227 // Notify new head of write queue 1228 if (!writers_.empty()) { 1229 writers_.front()->cv.Signal(); 1230 } 1231 1232 return status; 1233} 1234 1235// REQUIRES: Writer list must be non-empty 1236// REQUIRES: First writer must have a non-NULL batch 1237WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { 1238 assert(!writers_.empty()); 1239 Writer* first = writers_.front(); 1240 WriteBatch* result = first->batch; 1241 assert(result != NULL); 1242 1243 size_t size = WriteBatchInternal::ByteSize(first->batch); 1244 1245 // Allow the group to grow up to a maximum size, but if the 1246 // original write is small, limit the growth so we do not slow 1247 // down the small write too much. 1248 size_t max_size = 1 << 20; 1249 if (size <= (128<<10)) { 1250 max_size = size + (128<<10); 1251 } 1252 1253 *last_writer = first; 1254 std::deque<Writer*>::iterator iter = writers_.begin(); 1255 ++iter; // Advance past "first" 1256 for (; iter != writers_.end(); ++iter) { 1257 Writer* w = *iter; 1258 if (w->sync && !first->sync) { 1259 // Do not include a sync write into a batch handled by a non-sync write. 1260 break; 1261 } 1262 1263 if (w->batch != NULL) { 1264 size += WriteBatchInternal::ByteSize(w->batch); 1265 if (size > max_size) { 1266 // Do not make batch too big 1267 break; 1268 } 1269 1270 // Append to *reuslt 1271 if (result == first->batch) { 1272 // Switch to temporary batch instead of disturbing caller's batch 1273 result = tmp_batch_; 1274 assert(WriteBatchInternal::Count(result) == 0); 1275 WriteBatchInternal::Append(result, first->batch); 1276 } 1277 WriteBatchInternal::Append(result, w->batch); 1278 } 1279 *last_writer = w; 1280 } 1281 return result; 1282} 1283 1284// REQUIRES: mutex_ is held 1285// REQUIRES: this thread is currently at the front of the writer queue 1286Status DBImpl::MakeRoomForWrite(bool force) { 1287 mutex_.AssertHeld(); 1288 assert(!writers_.empty()); 1289 bool allow_delay = !force; 1290 Status s; 1291 while (true) { 1292 if (!bg_error_.ok()) { 1293 // Yield previous error 1294 s = bg_error_; 1295 break; 1296 } else if ( 1297 allow_delay && 1298 versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { 1299 // We are getting close to hitting a hard limit on the number of 1300 // L0 files. Rather than delaying a single write by several 1301 // seconds when we hit the hard limit, start delaying each 1302 // individual write by 1ms to reduce latency variance. Also, 1303 // this delay hands over some CPU to the compaction thread in 1304 // case it is sharing the same core as the writer. 1305 mutex_.Unlock(); 1306 env_->SleepForMicroseconds(1000); 1307 allow_delay = false; // Do not delay a single write more than once 1308 mutex_.Lock(); 1309 } else if (!force && 1310 (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { 1311 // There is room in current memtable 1312 break; 1313 } else if (imm_ != NULL) { 1314 // We have filled up the current memtable, but the previous 1315 // one is still being compacted, so we wait. 1316 Log(options_.info_log, "Current memtable full; waiting...\n"); 1317 bg_cv_.Wait(); 1318 } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { 1319 // There are too many level-0 files. 1320 Log(options_.info_log, "Too many L0 files; waiting...\n"); 1321 bg_cv_.Wait(); 1322 } else { 1323 // Attempt to switch to a new memtable and trigger compaction of old 1324 assert(versions_->PrevLogNumber() == 0); 1325 uint64_t new_log_number = versions_->NewFileNumber(); 1326 WritableFile* lfile = NULL; 1327 s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); 1328 if (!s.ok()) { 1329 // Avoid chewing through file number space in a tight loop. 1330 versions_->ReuseFileNumber(new_log_number); 1331 break; 1332 } 1333 delete log_; 1334 delete logfile_; 1335 logfile_ = lfile; 1336 logfile_number_ = new_log_number; 1337 log_ = new log::Writer(lfile); 1338 imm_ = mem_; 1339 has_imm_.Release_Store(imm_); 1340 mem_ = new MemTable(internal_comparator_); 1341 mem_->Ref(); 1342 force = false; // Do not force another compaction if have room 1343 MaybeScheduleCompaction(); 1344 } 1345 } 1346 return s; 1347} 1348 1349bool DBImpl::GetProperty(const Slice& property, std::string* value) { 1350 value->clear(); 1351 1352 MutexLock l(&mutex_); 1353 Slice in = property; 1354 Slice prefix("leveldb."); 1355 if (!in.starts_with(prefix)) return false; 1356 in.remove_prefix(prefix.size()); 1357 1358 if (in.starts_with("num-files-at-level")) { 1359 in.remove_prefix(strlen("num-files-at-level")); 1360 uint64_t level; 1361 bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); 1362 if (!ok || level >= config::kNumLevels) { 1363 return false; 1364 } else { 1365 char buf[100]; 1366 snprintf(buf, sizeof(buf), "%d", 1367 versions_->NumLevelFiles(static_cast<int>(level))); 1368 *value = buf; 1369 return true; 1370 } 1371 } else if (in == "stats") { 1372 char buf[200]; 1373 snprintf(buf, sizeof(buf), 1374 " Compactions\n" 1375 "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n" 1376 "--------------------------------------------------\n" 1377 ); 1378 value->append(buf); 1379 for (int level = 0; level < config::kNumLevels; level++) { 1380 int files = versions_->NumLevelFiles(level); 1381 if (stats_[level].micros > 0 || files > 0) { 1382 snprintf( 1383 buf, sizeof(buf), 1384 "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", 1385 level, 1386 files, 1387 versions_->NumLevelBytes(level) / 1048576.0, 1388 stats_[level].micros / 1e6, 1389 stats_[level].bytes_read / 1048576.0, 1390 stats_[level].bytes_written / 1048576.0); 1391 value->append(buf); 1392 } 1393 } 1394 return true; 1395 } else if (in == "sstables") { 1396 *value = versions_->current()->DebugString(); 1397 return true; 1398 } 1399 1400 return false; 1401} 1402 1403void DBImpl::GetApproximateSizes( 1404 const Range* range, int n, 1405 uint64_t* sizes) { 1406 // TODO(opt): better implementation 1407 Version* v; 1408 { 1409 MutexLock l(&mutex_); 1410 versions_->current()->Ref(); 1411 v = versions_->current(); 1412 } 1413 1414 for (int i = 0; i < n; i++) { 1415 // Convert user_key into a corresponding internal key. 1416 InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek); 1417 InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek); 1418 uint64_t start = versions_->ApproximateOffsetOf(v, k1); 1419 uint64_t limit = versions_->ApproximateOffsetOf(v, k2); 1420 sizes[i] = (limit >= start ? limit - start : 0); 1421 } 1422 1423 { 1424 MutexLock l(&mutex_); 1425 v->Unref(); 1426 } 1427} 1428 1429// Default implementations of convenience methods that subclasses of DB 1430// can call if they wish 1431Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { 1432 WriteBatch batch; 1433 batch.Put(key, value); 1434 return Write(opt, &batch); 1435} 1436 1437Status DB::Delete(const WriteOptions& opt, const Slice& key) { 1438 WriteBatch batch; 1439 batch.Delete(key); 1440 return Write(opt, &batch); 1441} 1442 1443DB::~DB() { } 1444 1445Status DB::Open(const Options& options, const std::string& dbname, 1446 DB** dbptr) { 1447 *dbptr = NULL; 1448 1449 DBImpl* impl = new DBImpl(options, dbname); 1450 impl->mutex_.Lock(); 1451 VersionEdit edit; 1452 Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists 1453 if (s.ok()) { 1454 uint64_t new_log_number = impl->versions_->NewFileNumber(); 1455 WritableFile* lfile; 1456 s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), 1457 &lfile); 1458 if (s.ok()) { 1459 edit.SetLogNumber(new_log_number); 1460 impl->logfile_ = lfile; 1461 impl->logfile_number_ = new_log_number; 1462 impl->log_ = new log::Writer(lfile); 1463 s = impl->versions_->LogAndApply(&edit, &impl->mutex_); 1464 } 1465 if (s.ok()) { 1466 impl->DeleteObsoleteFiles(); 1467 impl->MaybeScheduleCompaction(); 1468 } 1469 } 1470 impl->mutex_.Unlock(); 1471 if (s.ok()) { 1472 *dbptr = impl; 1473 } else { 1474 delete impl; 1475 } 1476 return s; 1477} 1478 1479Snapshot::~Snapshot() { 1480} 1481 1482Status DestroyDB(const std::string& dbname, const Options& options) { 1483 Env* env = options.env; 1484 std::vector<std::string> filenames; 1485 // Ignore error in case directory does not exist 1486 env->GetChildren(dbname, &filenames); 1487 if (filenames.empty()) { 1488 return Status::OK(); 1489 } 1490 1491 FileLock* lock; 1492 const std::string lockname = LockFileName(dbname); 1493 Status result = env->LockFile(lockname, &lock); 1494 if (result.ok()) { 1495 uint64_t number; 1496 FileType type; 1497 for (size_t i = 0; i < filenames.size(); i++) { 1498 if (ParseFileName(filenames[i], &number, &type) && 1499 type != kDBLockFile) { // Lock file will be deleted at end 1500 Status del = env->DeleteFile(dbname + "/" + filenames[i]); 1501 if (result.ok() && !del.ok()) { 1502 result = del; 1503 } 1504 } 1505 } 1506 env->UnlockFile(lock); // Ignore error since state is already gone 1507 env->DeleteFile(lockname); 1508 env->DeleteDir(dbname); // Ignore error in case dir contains other files 1509 } 1510 return result; 1511} 1512 1513} // namespace leveldb 1514