1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "db/db_impl.h"
6
7#include <algorithm>
8#include <set>
9#include <string>
10#include <stdint.h>
11#include <stdio.h>
12#include <vector>
13#include "db/builder.h"
14#include "db/db_iter.h"
15#include "db/dbformat.h"
16#include "db/filename.h"
17#include "db/log_reader.h"
18#include "db/log_writer.h"
19#include "db/memtable.h"
20#include "db/table_cache.h"
21#include "db/version_set.h"
22#include "db/write_batch_internal.h"
23#include "leveldb/db.h"
24#include "leveldb/env.h"
25#include "leveldb/status.h"
26#include "leveldb/table.h"
27#include "leveldb/table_builder.h"
28#include "port/port.h"
29#include "table/block.h"
30#include "table/merger.h"
31#include "table/two_level_iterator.h"
32#include "util/coding.h"
33#include "util/logging.h"
34#include "util/mutexlock.h"
35
36namespace leveldb {
37
38const int kNumNonTableCacheFiles = 10;
39
40// Information kept for every waiting writer
41struct DBImpl::Writer {
42  Status status;
43  WriteBatch* batch;
44  bool sync;
45  bool done;
46  port::CondVar cv;
47
48  explicit Writer(port::Mutex* mu) : cv(mu) { }
49};
50
51struct DBImpl::CompactionState {
52  Compaction* const compaction;
53
54  // Sequence numbers < smallest_snapshot are not significant since we
55  // will never have to service a snapshot below smallest_snapshot.
56  // Therefore if we have seen a sequence number S <= smallest_snapshot,
57  // we can drop all entries for the same key with sequence numbers < S.
58  SequenceNumber smallest_snapshot;
59
60  // Files produced by compaction
61  struct Output {
62    uint64_t number;
63    uint64_t file_size;
64    InternalKey smallest, largest;
65  };
66  std::vector<Output> outputs;
67
68  // State kept for output being generated
69  WritableFile* outfile;
70  TableBuilder* builder;
71
72  uint64_t total_bytes;
73
74  Output* current_output() { return &outputs[outputs.size()-1]; }
75
76  explicit CompactionState(Compaction* c)
77      : compaction(c),
78        outfile(NULL),
79        builder(NULL),
80        total_bytes(0) {
81  }
82};
83
84// Fix user-supplied options to be reasonable
85template <class T,class V>
86static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
87  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
88  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
89}
90Options SanitizeOptions(const std::string& dbname,
91                        const InternalKeyComparator* icmp,
92                        const InternalFilterPolicy* ipolicy,
93                        const Options& src) {
94  Options result = src;
95  result.comparator = icmp;
96  result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
97  ClipToRange(&result.max_open_files,    64 + kNumNonTableCacheFiles, 50000);
98  ClipToRange(&result.write_buffer_size, 64<<10,                      1<<30);
99  ClipToRange(&result.block_size,        1<<10,                       4<<20);
100  if (result.info_log == NULL) {
101    // Open a log file in the same directory as the db
102    src.env->CreateDir(dbname);  // In case it does not exist
103    src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
104    Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
105    if (!s.ok()) {
106      // No place suitable for logging
107      result.info_log = NULL;
108    }
109  }
110  if (result.block_cache == NULL) {
111    result.block_cache = NewLRUCache(8 << 20);
112  }
113  return result;
114}
115
116DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
117    : env_(raw_options.env),
118      internal_comparator_(raw_options.comparator),
119      internal_filter_policy_(raw_options.filter_policy),
120      options_(SanitizeOptions(dbname, &internal_comparator_,
121                               &internal_filter_policy_, raw_options)),
122      owns_info_log_(options_.info_log != raw_options.info_log),
123      owns_cache_(options_.block_cache != raw_options.block_cache),
124      dbname_(dbname),
125      db_lock_(NULL),
126      shutting_down_(NULL),
127      bg_cv_(&mutex_),
128      mem_(new MemTable(internal_comparator_)),
129      imm_(NULL),
130      logfile_(NULL),
131      logfile_number_(0),
132      log_(NULL),
133      seed_(0),
134      tmp_batch_(new WriteBatch),
135      bg_compaction_scheduled_(false),
136      manual_compaction_(NULL) {
137  mem_->Ref();
138  has_imm_.Release_Store(NULL);
139
140  // Reserve ten files or so for other uses and give the rest to TableCache.
141  const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
142  table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
143
144  versions_ = new VersionSet(dbname_, &options_, table_cache_,
145                             &internal_comparator_);
146}
147
148DBImpl::~DBImpl() {
149  // Wait for background work to finish
150  mutex_.Lock();
151  shutting_down_.Release_Store(this);  // Any non-NULL value is ok
152  while (bg_compaction_scheduled_) {
153    bg_cv_.Wait();
154  }
155  mutex_.Unlock();
156
157  if (db_lock_ != NULL) {
158    env_->UnlockFile(db_lock_);
159  }
160
161  delete versions_;
162  if (mem_ != NULL) mem_->Unref();
163  if (imm_ != NULL) imm_->Unref();
164  delete tmp_batch_;
165  delete log_;
166  delete logfile_;
167  delete table_cache_;
168
169  if (owns_info_log_) {
170    delete options_.info_log;
171  }
172  if (owns_cache_) {
173    delete options_.block_cache;
174  }
175}
176
177Status DBImpl::NewDB() {
178  VersionEdit new_db;
179  new_db.SetComparatorName(user_comparator()->Name());
180  new_db.SetLogNumber(0);
181  new_db.SetNextFile(2);
182  new_db.SetLastSequence(0);
183
184  const std::string manifest = DescriptorFileName(dbname_, 1);
185  WritableFile* file;
186  Status s = env_->NewWritableFile(manifest, &file);
187  if (!s.ok()) {
188    return s;
189  }
190  {
191    log::Writer log(file);
192    std::string record;
193    new_db.EncodeTo(&record);
194    s = log.AddRecord(record);
195    if (s.ok()) {
196      s = file->Close();
197    }
198  }
199  delete file;
200  if (s.ok()) {
201    // Make "CURRENT" file that points to the new manifest file.
202    s = SetCurrentFile(env_, dbname_, 1);
203  } else {
204    env_->DeleteFile(manifest);
205  }
206  return s;
207}
208
209void DBImpl::MaybeIgnoreError(Status* s) const {
210  if (s->ok() || options_.paranoid_checks) {
211    // No change needed
212  } else {
213    Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
214    *s = Status::OK();
215  }
216}
217
218void DBImpl::DeleteObsoleteFiles() {
219  if (!bg_error_.ok()) {
220    // After a background error, we don't know whether a new version may
221    // or may not have been committed, so we cannot safely garbage collect.
222    return;
223  }
224
225  // Make a set of all of the live files
226  std::set<uint64_t> live = pending_outputs_;
227  versions_->AddLiveFiles(&live);
228
229  std::vector<std::string> filenames;
230  env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
231  uint64_t number;
232  FileType type;
233  for (size_t i = 0; i < filenames.size(); i++) {
234    if (ParseFileName(filenames[i], &number, &type)) {
235      bool keep = true;
236      switch (type) {
237        case kLogFile:
238          keep = ((number >= versions_->LogNumber()) ||
239                  (number == versions_->PrevLogNumber()));
240          break;
241        case kDescriptorFile:
242          // Keep my manifest file, and any newer incarnations'
243          // (in case there is a race that allows other incarnations)
244          keep = (number >= versions_->ManifestFileNumber());
245          break;
246        case kTableFile:
247          keep = (live.find(number) != live.end());
248          break;
249        case kTempFile:
250          // Any temp files that are currently being written to must
251          // be recorded in pending_outputs_, which is inserted into "live"
252          keep = (live.find(number) != live.end());
253          break;
254        case kCurrentFile:
255        case kDBLockFile:
256        case kInfoLogFile:
257          keep = true;
258          break;
259      }
260
261      if (!keep) {
262        if (type == kTableFile) {
263          table_cache_->Evict(number);
264        }
265        Log(options_.info_log, "Delete type=%d #%lld\n",
266            int(type),
267            static_cast<unsigned long long>(number));
268        env_->DeleteFile(dbname_ + "/" + filenames[i]);
269      }
270    }
271  }
272}
273
274Status DBImpl::Recover(VersionEdit* edit) {
275  mutex_.AssertHeld();
276
277  // Ignore error from CreateDir since the creation of the DB is
278  // committed only when the descriptor is created, and this directory
279  // may already exist from a previous failed creation attempt.
280  env_->CreateDir(dbname_);
281  assert(db_lock_ == NULL);
282  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
283  if (!s.ok()) {
284    return s;
285  }
286
287  if (!env_->FileExists(CurrentFileName(dbname_))) {
288    if (options_.create_if_missing) {
289      s = NewDB();
290      if (!s.ok()) {
291        return s;
292      }
293    } else {
294      return Status::InvalidArgument(
295          dbname_, "does not exist (create_if_missing is false)");
296    }
297  } else {
298    if (options_.error_if_exists) {
299      return Status::InvalidArgument(
300          dbname_, "exists (error_if_exists is true)");
301    }
302  }
303
304  s = versions_->Recover();
305  if (s.ok()) {
306    SequenceNumber max_sequence(0);
307
308    // Recover from all newer log files than the ones named in the
309    // descriptor (new log files may have been added by the previous
310    // incarnation without registering them in the descriptor).
311    //
312    // Note that PrevLogNumber() is no longer used, but we pay
313    // attention to it in case we are recovering a database
314    // produced by an older version of leveldb.
315    const uint64_t min_log = versions_->LogNumber();
316    const uint64_t prev_log = versions_->PrevLogNumber();
317    std::vector<std::string> filenames;
318    s = env_->GetChildren(dbname_, &filenames);
319    if (!s.ok()) {
320      return s;
321    }
322    std::set<uint64_t> expected;
323    versions_->AddLiveFiles(&expected);
324    uint64_t number;
325    FileType type;
326    std::vector<uint64_t> logs;
327    for (size_t i = 0; i < filenames.size(); i++) {
328      if (ParseFileName(filenames[i], &number, &type)) {
329        expected.erase(number);
330        if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
331          logs.push_back(number);
332      }
333    }
334    if (!expected.empty()) {
335      char buf[50];
336      snprintf(buf, sizeof(buf), "%d missing files; e.g.",
337               static_cast<int>(expected.size()));
338      return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
339    }
340
341    // Recover in the order in which the logs were generated
342    std::sort(logs.begin(), logs.end());
343    for (size_t i = 0; i < logs.size(); i++) {
344      s = RecoverLogFile(logs[i], edit, &max_sequence);
345
346      // The previous incarnation may not have written any MANIFEST
347      // records after allocating this log number.  So we manually
348      // update the file number allocation counter in VersionSet.
349      versions_->MarkFileNumberUsed(logs[i]);
350    }
351
352    if (s.ok()) {
353      if (versions_->LastSequence() < max_sequence) {
354        versions_->SetLastSequence(max_sequence);
355      }
356    }
357  }
358
359  return s;
360}
361
362Status DBImpl::RecoverLogFile(uint64_t log_number,
363                              VersionEdit* edit,
364                              SequenceNumber* max_sequence) {
365  struct LogReporter : public log::Reader::Reporter {
366    Env* env;
367    Logger* info_log;
368    const char* fname;
369    Status* status;  // NULL if options_.paranoid_checks==false
370    virtual void Corruption(size_t bytes, const Status& s) {
371      Log(info_log, "%s%s: dropping %d bytes; %s",
372          (this->status == NULL ? "(ignoring error) " : ""),
373          fname, static_cast<int>(bytes), s.ToString().c_str());
374      if (this->status != NULL && this->status->ok()) *this->status = s;
375    }
376  };
377
378  mutex_.AssertHeld();
379
380  // Open the log file
381  std::string fname = LogFileName(dbname_, log_number);
382  SequentialFile* file;
383  Status status = env_->NewSequentialFile(fname, &file);
384  if (!status.ok()) {
385    MaybeIgnoreError(&status);
386    return status;
387  }
388
389  // Create the log reader.
390  LogReporter reporter;
391  reporter.env = env_;
392  reporter.info_log = options_.info_log;
393  reporter.fname = fname.c_str();
394  reporter.status = (options_.paranoid_checks ? &status : NULL);
395  // We intentially make log::Reader do checksumming even if
396  // paranoid_checks==false so that corruptions cause entire commits
397  // to be skipped instead of propagating bad information (like overly
398  // large sequence numbers).
399  log::Reader reader(file, &reporter, true/*checksum*/,
400                     0/*initial_offset*/);
401  Log(options_.info_log, "Recovering log #%llu",
402      (unsigned long long) log_number);
403
404  // Read all the records and add to a memtable
405  std::string scratch;
406  Slice record;
407  WriteBatch batch;
408  MemTable* mem = NULL;
409  while (reader.ReadRecord(&record, &scratch) &&
410         status.ok()) {
411    if (record.size() < 12) {
412      reporter.Corruption(
413          record.size(), Status::Corruption("log record too small"));
414      continue;
415    }
416    WriteBatchInternal::SetContents(&batch, record);
417
418    if (mem == NULL) {
419      mem = new MemTable(internal_comparator_);
420      mem->Ref();
421    }
422    status = WriteBatchInternal::InsertInto(&batch, mem);
423    MaybeIgnoreError(&status);
424    if (!status.ok()) {
425      break;
426    }
427    const SequenceNumber last_seq =
428        WriteBatchInternal::Sequence(&batch) +
429        WriteBatchInternal::Count(&batch) - 1;
430    if (last_seq > *max_sequence) {
431      *max_sequence = last_seq;
432    }
433
434    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
435      status = WriteLevel0Table(mem, edit, NULL);
436      if (!status.ok()) {
437        // Reflect errors immediately so that conditions like full
438        // file-systems cause the DB::Open() to fail.
439        break;
440      }
441      mem->Unref();
442      mem = NULL;
443    }
444  }
445
446  if (status.ok() && mem != NULL) {
447    status = WriteLevel0Table(mem, edit, NULL);
448    // Reflect errors immediately so that conditions like full
449    // file-systems cause the DB::Open() to fail.
450  }
451
452  if (mem != NULL) mem->Unref();
453  delete file;
454  return status;
455}
456
457Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
458                                Version* base) {
459  mutex_.AssertHeld();
460  const uint64_t start_micros = env_->NowMicros();
461  FileMetaData meta;
462  meta.number = versions_->NewFileNumber();
463  pending_outputs_.insert(meta.number);
464  Iterator* iter = mem->NewIterator();
465  Log(options_.info_log, "Level-0 table #%llu: started",
466      (unsigned long long) meta.number);
467
468  Status s;
469  {
470    mutex_.Unlock();
471    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
472    mutex_.Lock();
473  }
474
475  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
476      (unsigned long long) meta.number,
477      (unsigned long long) meta.file_size,
478      s.ToString().c_str());
479  delete iter;
480  pending_outputs_.erase(meta.number);
481
482
483  // Note that if file_size is zero, the file has been deleted and
484  // should not be added to the manifest.
485  int level = 0;
486  if (s.ok() && meta.file_size > 0) {
487    const Slice min_user_key = meta.smallest.user_key();
488    const Slice max_user_key = meta.largest.user_key();
489    if (base != NULL) {
490      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
491    }
492    edit->AddFile(level, meta.number, meta.file_size,
493                  meta.smallest, meta.largest);
494  }
495
496  CompactionStats stats;
497  stats.micros = env_->NowMicros() - start_micros;
498  stats.bytes_written = meta.file_size;
499  stats_[level].Add(stats);
500  return s;
501}
502
503void DBImpl::CompactMemTable() {
504  mutex_.AssertHeld();
505  assert(imm_ != NULL);
506
507  // Save the contents of the memtable as a new Table
508  VersionEdit edit;
509  Version* base = versions_->current();
510  base->Ref();
511  Status s = WriteLevel0Table(imm_, &edit, base);
512  base->Unref();
513
514  if (s.ok() && shutting_down_.Acquire_Load()) {
515    s = Status::IOError("Deleting DB during memtable compaction");
516  }
517
518  // Replace immutable memtable with the generated Table
519  if (s.ok()) {
520    edit.SetPrevLogNumber(0);
521    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
522    s = versions_->LogAndApply(&edit, &mutex_);
523  }
524
525  if (s.ok()) {
526    // Commit to the new state
527    imm_->Unref();
528    imm_ = NULL;
529    has_imm_.Release_Store(NULL);
530    DeleteObsoleteFiles();
531  } else {
532    RecordBackgroundError(s);
533  }
534}
535
536void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
537  int max_level_with_files = 1;
538  {
539    MutexLock l(&mutex_);
540    Version* base = versions_->current();
541    for (int level = 1; level < config::kNumLevels; level++) {
542      if (base->OverlapInLevel(level, begin, end)) {
543        max_level_with_files = level;
544      }
545    }
546  }
547  TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
548  for (int level = 0; level < max_level_with_files; level++) {
549    TEST_CompactRange(level, begin, end);
550  }
551}
552
553void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
554  assert(level >= 0);
555  assert(level + 1 < config::kNumLevels);
556
557  InternalKey begin_storage, end_storage;
558
559  ManualCompaction manual;
560  manual.level = level;
561  manual.done = false;
562  if (begin == NULL) {
563    manual.begin = NULL;
564  } else {
565    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
566    manual.begin = &begin_storage;
567  }
568  if (end == NULL) {
569    manual.end = NULL;
570  } else {
571    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
572    manual.end = &end_storage;
573  }
574
575  MutexLock l(&mutex_);
576  while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
577    if (manual_compaction_ == NULL) {  // Idle
578      manual_compaction_ = &manual;
579      MaybeScheduleCompaction();
580    } else {  // Running either my compaction or another compaction.
581      bg_cv_.Wait();
582    }
583  }
584  if (manual_compaction_ == &manual) {
585    // Cancel my manual compaction since we aborted early for some reason.
586    manual_compaction_ = NULL;
587  }
588}
589
590Status DBImpl::TEST_CompactMemTable() {
591  // NULL batch means just wait for earlier writes to be done
592  Status s = Write(WriteOptions(), NULL);
593  if (s.ok()) {
594    // Wait until the compaction completes
595    MutexLock l(&mutex_);
596    while (imm_ != NULL && bg_error_.ok()) {
597      bg_cv_.Wait();
598    }
599    if (imm_ != NULL) {
600      s = bg_error_;
601    }
602  }
603  return s;
604}
605
606void DBImpl::RecordBackgroundError(const Status& s) {
607  mutex_.AssertHeld();
608  if (bg_error_.ok()) {
609    bg_error_ = s;
610    bg_cv_.SignalAll();
611  }
612}
613
614void DBImpl::MaybeScheduleCompaction() {
615  mutex_.AssertHeld();
616  if (bg_compaction_scheduled_) {
617    // Already scheduled
618  } else if (shutting_down_.Acquire_Load()) {
619    // DB is being deleted; no more background compactions
620  } else if (!bg_error_.ok()) {
621    // Already got an error; no more changes
622  } else if (imm_ == NULL &&
623             manual_compaction_ == NULL &&
624             !versions_->NeedsCompaction()) {
625    // No work to be done
626  } else {
627    bg_compaction_scheduled_ = true;
628    env_->Schedule(&DBImpl::BGWork, this);
629  }
630}
631
632void DBImpl::BGWork(void* db) {
633  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
634}
635
636void DBImpl::BackgroundCall() {
637  MutexLock l(&mutex_);
638  assert(bg_compaction_scheduled_);
639  if (shutting_down_.Acquire_Load()) {
640    // No more background work when shutting down.
641  } else if (!bg_error_.ok()) {
642    // No more background work after a background error.
643  } else {
644    BackgroundCompaction();
645  }
646
647  bg_compaction_scheduled_ = false;
648
649  // Previous compaction may have produced too many files in a level,
650  // so reschedule another compaction if needed.
651  MaybeScheduleCompaction();
652  bg_cv_.SignalAll();
653}
654
655void DBImpl::BackgroundCompaction() {
656  mutex_.AssertHeld();
657
658  if (imm_ != NULL) {
659    CompactMemTable();
660    return;
661  }
662
663  Compaction* c;
664  bool is_manual = (manual_compaction_ != NULL);
665  InternalKey manual_end;
666  if (is_manual) {
667    ManualCompaction* m = manual_compaction_;
668    c = versions_->CompactRange(m->level, m->begin, m->end);
669    m->done = (c == NULL);
670    if (c != NULL) {
671      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
672    }
673    Log(options_.info_log,
674        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
675        m->level,
676        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
677        (m->end ? m->end->DebugString().c_str() : "(end)"),
678        (m->done ? "(end)" : manual_end.DebugString().c_str()));
679  } else {
680    c = versions_->PickCompaction();
681  }
682
683  Status status;
684  if (c == NULL) {
685    // Nothing to do
686  } else if (!is_manual && c->IsTrivialMove()) {
687    // Move file to next level
688    assert(c->num_input_files(0) == 1);
689    FileMetaData* f = c->input(0, 0);
690    c->edit()->DeleteFile(c->level(), f->number);
691    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
692                       f->smallest, f->largest);
693    status = versions_->LogAndApply(c->edit(), &mutex_);
694    if (!status.ok()) {
695      RecordBackgroundError(status);
696    }
697    VersionSet::LevelSummaryStorage tmp;
698    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
699        static_cast<unsigned long long>(f->number),
700        c->level() + 1,
701        static_cast<unsigned long long>(f->file_size),
702        status.ToString().c_str(),
703        versions_->LevelSummary(&tmp));
704  } else {
705    CompactionState* compact = new CompactionState(c);
706    status = DoCompactionWork(compact);
707    if (!status.ok()) {
708      RecordBackgroundError(status);
709    }
710    CleanupCompaction(compact);
711    c->ReleaseInputs();
712    DeleteObsoleteFiles();
713  }
714  delete c;
715
716  if (status.ok()) {
717    // Done
718  } else if (shutting_down_.Acquire_Load()) {
719    // Ignore compaction errors found during shutting down
720  } else {
721    Log(options_.info_log,
722        "Compaction error: %s", status.ToString().c_str());
723  }
724
725  if (is_manual) {
726    ManualCompaction* m = manual_compaction_;
727    if (!status.ok()) {
728      m->done = true;
729    }
730    if (!m->done) {
731      // We only compacted part of the requested range.  Update *m
732      // to the range that is left to be compacted.
733      m->tmp_storage = manual_end;
734      m->begin = &m->tmp_storage;
735    }
736    manual_compaction_ = NULL;
737  }
738}
739
740void DBImpl::CleanupCompaction(CompactionState* compact) {
741  mutex_.AssertHeld();
742  if (compact->builder != NULL) {
743    // May happen if we get a shutdown call in the middle of compaction
744    compact->builder->Abandon();
745    delete compact->builder;
746  } else {
747    assert(compact->outfile == NULL);
748  }
749  delete compact->outfile;
750  for (size_t i = 0; i < compact->outputs.size(); i++) {
751    const CompactionState::Output& out = compact->outputs[i];
752    pending_outputs_.erase(out.number);
753  }
754  delete compact;
755}
756
757Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
758  assert(compact != NULL);
759  assert(compact->builder == NULL);
760  uint64_t file_number;
761  {
762    mutex_.Lock();
763    file_number = versions_->NewFileNumber();
764    pending_outputs_.insert(file_number);
765    CompactionState::Output out;
766    out.number = file_number;
767    out.smallest.Clear();
768    out.largest.Clear();
769    compact->outputs.push_back(out);
770    mutex_.Unlock();
771  }
772
773  // Make the output file
774  std::string fname = TableFileName(dbname_, file_number);
775  Status s = env_->NewWritableFile(fname, &compact->outfile);
776  if (s.ok()) {
777    compact->builder = new TableBuilder(options_, compact->outfile);
778  }
779  return s;
780}
781
782Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
783                                          Iterator* input) {
784  assert(compact != NULL);
785  assert(compact->outfile != NULL);
786  assert(compact->builder != NULL);
787
788  const uint64_t output_number = compact->current_output()->number;
789  assert(output_number != 0);
790
791  // Check for iterator errors
792  Status s = input->status();
793  const uint64_t current_entries = compact->builder->NumEntries();
794  if (s.ok()) {
795    s = compact->builder->Finish();
796  } else {
797    compact->builder->Abandon();
798  }
799  const uint64_t current_bytes = compact->builder->FileSize();
800  compact->current_output()->file_size = current_bytes;
801  compact->total_bytes += current_bytes;
802  delete compact->builder;
803  compact->builder = NULL;
804
805  // Finish and check for file errors
806  if (s.ok()) {
807    s = compact->outfile->Sync();
808  }
809  if (s.ok()) {
810    s = compact->outfile->Close();
811  }
812  delete compact->outfile;
813  compact->outfile = NULL;
814
815  if (s.ok() && current_entries > 0) {
816    // Verify that the table is usable
817    Iterator* iter = table_cache_->NewIterator(ReadOptions(),
818                                               output_number,
819                                               current_bytes);
820    s = iter->status();
821    delete iter;
822    if (s.ok()) {
823      Log(options_.info_log,
824          "Generated table #%llu: %lld keys, %lld bytes",
825          (unsigned long long) output_number,
826          (unsigned long long) current_entries,
827          (unsigned long long) current_bytes);
828    }
829  }
830  return s;
831}
832
833
834Status DBImpl::InstallCompactionResults(CompactionState* compact) {
835  mutex_.AssertHeld();
836  Log(options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
837      compact->compaction->num_input_files(0),
838      compact->compaction->level(),
839      compact->compaction->num_input_files(1),
840      compact->compaction->level() + 1,
841      static_cast<long long>(compact->total_bytes));
842
843  // Add compaction outputs
844  compact->compaction->AddInputDeletions(compact->compaction->edit());
845  const int level = compact->compaction->level();
846  for (size_t i = 0; i < compact->outputs.size(); i++) {
847    const CompactionState::Output& out = compact->outputs[i];
848    compact->compaction->edit()->AddFile(
849        level + 1,
850        out.number, out.file_size, out.smallest, out.largest);
851  }
852  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
853}
854
855Status DBImpl::DoCompactionWork(CompactionState* compact) {
856  const uint64_t start_micros = env_->NowMicros();
857  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
858
859  Log(options_.info_log,  "Compacting %d@%d + %d@%d files",
860      compact->compaction->num_input_files(0),
861      compact->compaction->level(),
862      compact->compaction->num_input_files(1),
863      compact->compaction->level() + 1);
864
865  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
866  assert(compact->builder == NULL);
867  assert(compact->outfile == NULL);
868  if (snapshots_.empty()) {
869    compact->smallest_snapshot = versions_->LastSequence();
870  } else {
871    compact->smallest_snapshot = snapshots_.oldest()->number_;
872  }
873
874  // Release mutex while we're actually doing the compaction work
875  mutex_.Unlock();
876
877  Iterator* input = versions_->MakeInputIterator(compact->compaction);
878  input->SeekToFirst();
879  Status status;
880  ParsedInternalKey ikey;
881  std::string current_user_key;
882  bool has_current_user_key = false;
883  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
884  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
885    // Prioritize immutable compaction work
886    if (has_imm_.NoBarrier_Load() != NULL) {
887      const uint64_t imm_start = env_->NowMicros();
888      mutex_.Lock();
889      if (imm_ != NULL) {
890        CompactMemTable();
891        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
892      }
893      mutex_.Unlock();
894      imm_micros += (env_->NowMicros() - imm_start);
895    }
896
897    Slice key = input->key();
898    if (compact->compaction->ShouldStopBefore(key) &&
899        compact->builder != NULL) {
900      status = FinishCompactionOutputFile(compact, input);
901      if (!status.ok()) {
902        break;
903      }
904    }
905
906    // Handle key/value, add to state, etc.
907    bool drop = false;
908    if (!ParseInternalKey(key, &ikey)) {
909      // Do not hide error keys
910      current_user_key.clear();
911      has_current_user_key = false;
912      last_sequence_for_key = kMaxSequenceNumber;
913    } else {
914      if (!has_current_user_key ||
915          user_comparator()->Compare(ikey.user_key,
916                                     Slice(current_user_key)) != 0) {
917        // First occurrence of this user key
918        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
919        has_current_user_key = true;
920        last_sequence_for_key = kMaxSequenceNumber;
921      }
922
923      if (last_sequence_for_key <= compact->smallest_snapshot) {
924        // Hidden by an newer entry for same user key
925        drop = true;    // (A)
926      } else if (ikey.type == kTypeDeletion &&
927                 ikey.sequence <= compact->smallest_snapshot &&
928                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
929        // For this user key:
930        // (1) there is no data in higher levels
931        // (2) data in lower levels will have larger sequence numbers
932        // (3) data in layers that are being compacted here and have
933        //     smaller sequence numbers will be dropped in the next
934        //     few iterations of this loop (by rule (A) above).
935        // Therefore this deletion marker is obsolete and can be dropped.
936        drop = true;
937      }
938
939      last_sequence_for_key = ikey.sequence;
940    }
941#if 0
942    Log(options_.info_log,
943        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
944        "%d smallest_snapshot: %d",
945        ikey.user_key.ToString().c_str(),
946        (int)ikey.sequence, ikey.type, kTypeValue, drop,
947        compact->compaction->IsBaseLevelForKey(ikey.user_key),
948        (int)last_sequence_for_key, (int)compact->smallest_snapshot);
949#endif
950
951    if (!drop) {
952      // Open output file if necessary
953      if (compact->builder == NULL) {
954        status = OpenCompactionOutputFile(compact);
955        if (!status.ok()) {
956          break;
957        }
958      }
959      if (compact->builder->NumEntries() == 0) {
960        compact->current_output()->smallest.DecodeFrom(key);
961      }
962      compact->current_output()->largest.DecodeFrom(key);
963      compact->builder->Add(key, input->value());
964
965      // Close output file if it is big enough
966      if (compact->builder->FileSize() >=
967          compact->compaction->MaxOutputFileSize()) {
968        status = FinishCompactionOutputFile(compact, input);
969        if (!status.ok()) {
970          break;
971        }
972      }
973    }
974
975    input->Next();
976  }
977
978  if (status.ok() && shutting_down_.Acquire_Load()) {
979    status = Status::IOError("Deleting DB during compaction");
980  }
981  if (status.ok() && compact->builder != NULL) {
982    status = FinishCompactionOutputFile(compact, input);
983  }
984  if (status.ok()) {
985    status = input->status();
986  }
987  delete input;
988  input = NULL;
989
990  CompactionStats stats;
991  stats.micros = env_->NowMicros() - start_micros - imm_micros;
992  for (int which = 0; which < 2; which++) {
993    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
994      stats.bytes_read += compact->compaction->input(which, i)->file_size;
995    }
996  }
997  for (size_t i = 0; i < compact->outputs.size(); i++) {
998    stats.bytes_written += compact->outputs[i].file_size;
999  }
1000
1001  mutex_.Lock();
1002  stats_[compact->compaction->level() + 1].Add(stats);
1003
1004  if (status.ok()) {
1005    status = InstallCompactionResults(compact);
1006  }
1007  if (!status.ok()) {
1008    RecordBackgroundError(status);
1009  }
1010  VersionSet::LevelSummaryStorage tmp;
1011  Log(options_.info_log,
1012      "compacted to: %s", versions_->LevelSummary(&tmp));
1013  return status;
1014}
1015
1016namespace {
1017struct IterState {
1018  port::Mutex* mu;
1019  Version* version;
1020  MemTable* mem;
1021  MemTable* imm;
1022};
1023
1024static void CleanupIteratorState(void* arg1, void* arg2) {
1025  IterState* state = reinterpret_cast<IterState*>(arg1);
1026  state->mu->Lock();
1027  state->mem->Unref();
1028  if (state->imm != NULL) state->imm->Unref();
1029  state->version->Unref();
1030  state->mu->Unlock();
1031  delete state;
1032}
1033}  // namespace
1034
1035Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
1036                                      SequenceNumber* latest_snapshot,
1037                                      uint32_t* seed) {
1038  IterState* cleanup = new IterState;
1039  mutex_.Lock();
1040  *latest_snapshot = versions_->LastSequence();
1041
1042  // Collect together all needed child iterators
1043  std::vector<Iterator*> list;
1044  list.push_back(mem_->NewIterator());
1045  mem_->Ref();
1046  if (imm_ != NULL) {
1047    list.push_back(imm_->NewIterator());
1048    imm_->Ref();
1049  }
1050  versions_->current()->AddIterators(options, &list);
1051  Iterator* internal_iter =
1052      NewMergingIterator(&internal_comparator_, &list[0], list.size());
1053  versions_->current()->Ref();
1054
1055  cleanup->mu = &mutex_;
1056  cleanup->mem = mem_;
1057  cleanup->imm = imm_;
1058  cleanup->version = versions_->current();
1059  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
1060
1061  *seed = ++seed_;
1062  mutex_.Unlock();
1063  return internal_iter;
1064}
1065
1066Iterator* DBImpl::TEST_NewInternalIterator() {
1067  SequenceNumber ignored;
1068  uint32_t ignored_seed;
1069  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
1070}
1071
1072int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
1073  MutexLock l(&mutex_);
1074  return versions_->MaxNextLevelOverlappingBytes();
1075}
1076
1077Status DBImpl::Get(const ReadOptions& options,
1078                   const Slice& key,
1079                   std::string* value) {
1080  Status s;
1081  MutexLock l(&mutex_);
1082  SequenceNumber snapshot;
1083  if (options.snapshot != NULL) {
1084    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
1085  } else {
1086    snapshot = versions_->LastSequence();
1087  }
1088
1089  MemTable* mem = mem_;
1090  MemTable* imm = imm_;
1091  Version* current = versions_->current();
1092  mem->Ref();
1093  if (imm != NULL) imm->Ref();
1094  current->Ref();
1095
1096  bool have_stat_update = false;
1097  Version::GetStats stats;
1098
1099  // Unlock while reading from files and memtables
1100  {
1101    mutex_.Unlock();
1102    // First look in the memtable, then in the immutable memtable (if any).
1103    LookupKey lkey(key, snapshot);
1104    if (mem->Get(lkey, value, &s)) {
1105      // Done
1106    } else if (imm != NULL && imm->Get(lkey, value, &s)) {
1107      // Done
1108    } else {
1109      s = current->Get(options, lkey, value, &stats);
1110      have_stat_update = true;
1111    }
1112    mutex_.Lock();
1113  }
1114
1115  if (have_stat_update && current->UpdateStats(stats)) {
1116    MaybeScheduleCompaction();
1117  }
1118  mem->Unref();
1119  if (imm != NULL) imm->Unref();
1120  current->Unref();
1121  return s;
1122}
1123
1124Iterator* DBImpl::NewIterator(const ReadOptions& options) {
1125  SequenceNumber latest_snapshot;
1126  uint32_t seed;
1127  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
1128  return NewDBIterator(
1129      this, user_comparator(), iter,
1130      (options.snapshot != NULL
1131       ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
1132       : latest_snapshot),
1133      seed);
1134}
1135
1136void DBImpl::RecordReadSample(Slice key) {
1137  MutexLock l(&mutex_);
1138  if (versions_->current()->RecordReadSample(key)) {
1139    MaybeScheduleCompaction();
1140  }
1141}
1142
1143const Snapshot* DBImpl::GetSnapshot() {
1144  MutexLock l(&mutex_);
1145  return snapshots_.New(versions_->LastSequence());
1146}
1147
1148void DBImpl::ReleaseSnapshot(const Snapshot* s) {
1149  MutexLock l(&mutex_);
1150  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
1151}
1152
1153// Convenience methods
1154Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
1155  return DB::Put(o, key, val);
1156}
1157
1158Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
1159  return DB::Delete(options, key);
1160}
1161
1162Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
1163  Writer w(&mutex_);
1164  w.batch = my_batch;
1165  w.sync = options.sync;
1166  w.done = false;
1167
1168  MutexLock l(&mutex_);
1169  writers_.push_back(&w);
1170  while (!w.done && &w != writers_.front()) {
1171    w.cv.Wait();
1172  }
1173  if (w.done) {
1174    return w.status;
1175  }
1176
1177  // May temporarily unlock and wait.
1178  Status status = MakeRoomForWrite(my_batch == NULL);
1179  uint64_t last_sequence = versions_->LastSequence();
1180  Writer* last_writer = &w;
1181  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
1182    WriteBatch* updates = BuildBatchGroup(&last_writer);
1183    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
1184    last_sequence += WriteBatchInternal::Count(updates);
1185
1186    // Add to log and apply to memtable.  We can release the lock
1187    // during this phase since &w is currently responsible for logging
1188    // and protects against concurrent loggers and concurrent writes
1189    // into mem_.
1190    {
1191      mutex_.Unlock();
1192      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
1193      bool sync_error = false;
1194      if (status.ok() && options.sync) {
1195        status = logfile_->Sync();
1196        if (!status.ok()) {
1197          sync_error = true;
1198        }
1199      }
1200      if (status.ok()) {
1201        status = WriteBatchInternal::InsertInto(updates, mem_);
1202      }
1203      mutex_.Lock();
1204      if (sync_error) {
1205        // The state of the log file is indeterminate: the log record we
1206        // just added may or may not show up when the DB is re-opened.
1207        // So we force the DB into a mode where all future writes fail.
1208        RecordBackgroundError(status);
1209      }
1210    }
1211    if (updates == tmp_batch_) tmp_batch_->Clear();
1212
1213    versions_->SetLastSequence(last_sequence);
1214  }
1215
1216  while (true) {
1217    Writer* ready = writers_.front();
1218    writers_.pop_front();
1219    if (ready != &w) {
1220      ready->status = status;
1221      ready->done = true;
1222      ready->cv.Signal();
1223    }
1224    if (ready == last_writer) break;
1225  }
1226
1227  // Notify new head of write queue
1228  if (!writers_.empty()) {
1229    writers_.front()->cv.Signal();
1230  }
1231
1232  return status;
1233}
1234
1235// REQUIRES: Writer list must be non-empty
1236// REQUIRES: First writer must have a non-NULL batch
1237WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
1238  assert(!writers_.empty());
1239  Writer* first = writers_.front();
1240  WriteBatch* result = first->batch;
1241  assert(result != NULL);
1242
1243  size_t size = WriteBatchInternal::ByteSize(first->batch);
1244
1245  // Allow the group to grow up to a maximum size, but if the
1246  // original write is small, limit the growth so we do not slow
1247  // down the small write too much.
1248  size_t max_size = 1 << 20;
1249  if (size <= (128<<10)) {
1250    max_size = size + (128<<10);
1251  }
1252
1253  *last_writer = first;
1254  std::deque<Writer*>::iterator iter = writers_.begin();
1255  ++iter;  // Advance past "first"
1256  for (; iter != writers_.end(); ++iter) {
1257    Writer* w = *iter;
1258    if (w->sync && !first->sync) {
1259      // Do not include a sync write into a batch handled by a non-sync write.
1260      break;
1261    }
1262
1263    if (w->batch != NULL) {
1264      size += WriteBatchInternal::ByteSize(w->batch);
1265      if (size > max_size) {
1266        // Do not make batch too big
1267        break;
1268      }
1269
1270      // Append to *reuslt
1271      if (result == first->batch) {
1272        // Switch to temporary batch instead of disturbing caller's batch
1273        result = tmp_batch_;
1274        assert(WriteBatchInternal::Count(result) == 0);
1275        WriteBatchInternal::Append(result, first->batch);
1276      }
1277      WriteBatchInternal::Append(result, w->batch);
1278    }
1279    *last_writer = w;
1280  }
1281  return result;
1282}
1283
1284// REQUIRES: mutex_ is held
1285// REQUIRES: this thread is currently at the front of the writer queue
1286Status DBImpl::MakeRoomForWrite(bool force) {
1287  mutex_.AssertHeld();
1288  assert(!writers_.empty());
1289  bool allow_delay = !force;
1290  Status s;
1291  while (true) {
1292    if (!bg_error_.ok()) {
1293      // Yield previous error
1294      s = bg_error_;
1295      break;
1296    } else if (
1297        allow_delay &&
1298        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
1299      // We are getting close to hitting a hard limit on the number of
1300      // L0 files.  Rather than delaying a single write by several
1301      // seconds when we hit the hard limit, start delaying each
1302      // individual write by 1ms to reduce latency variance.  Also,
1303      // this delay hands over some CPU to the compaction thread in
1304      // case it is sharing the same core as the writer.
1305      mutex_.Unlock();
1306      env_->SleepForMicroseconds(1000);
1307      allow_delay = false;  // Do not delay a single write more than once
1308      mutex_.Lock();
1309    } else if (!force &&
1310               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
1311      // There is room in current memtable
1312      break;
1313    } else if (imm_ != NULL) {
1314      // We have filled up the current memtable, but the previous
1315      // one is still being compacted, so we wait.
1316      Log(options_.info_log, "Current memtable full; waiting...\n");
1317      bg_cv_.Wait();
1318    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
1319      // There are too many level-0 files.
1320      Log(options_.info_log, "Too many L0 files; waiting...\n");
1321      bg_cv_.Wait();
1322    } else {
1323      // Attempt to switch to a new memtable and trigger compaction of old
1324      assert(versions_->PrevLogNumber() == 0);
1325      uint64_t new_log_number = versions_->NewFileNumber();
1326      WritableFile* lfile = NULL;
1327      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
1328      if (!s.ok()) {
1329        // Avoid chewing through file number space in a tight loop.
1330        versions_->ReuseFileNumber(new_log_number);
1331        break;
1332      }
1333      delete log_;
1334      delete logfile_;
1335      logfile_ = lfile;
1336      logfile_number_ = new_log_number;
1337      log_ = new log::Writer(lfile);
1338      imm_ = mem_;
1339      has_imm_.Release_Store(imm_);
1340      mem_ = new MemTable(internal_comparator_);
1341      mem_->Ref();
1342      force = false;   // Do not force another compaction if have room
1343      MaybeScheduleCompaction();
1344    }
1345  }
1346  return s;
1347}
1348
1349bool DBImpl::GetProperty(const Slice& property, std::string* value) {
1350  value->clear();
1351
1352  MutexLock l(&mutex_);
1353  Slice in = property;
1354  Slice prefix("leveldb.");
1355  if (!in.starts_with(prefix)) return false;
1356  in.remove_prefix(prefix.size());
1357
1358  if (in.starts_with("num-files-at-level")) {
1359    in.remove_prefix(strlen("num-files-at-level"));
1360    uint64_t level;
1361    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
1362    if (!ok || level >= config::kNumLevels) {
1363      return false;
1364    } else {
1365      char buf[100];
1366      snprintf(buf, sizeof(buf), "%d",
1367               versions_->NumLevelFiles(static_cast<int>(level)));
1368      *value = buf;
1369      return true;
1370    }
1371  } else if (in == "stats") {
1372    char buf[200];
1373    snprintf(buf, sizeof(buf),
1374             "                               Compactions\n"
1375             "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1376             "--------------------------------------------------\n"
1377             );
1378    value->append(buf);
1379    for (int level = 0; level < config::kNumLevels; level++) {
1380      int files = versions_->NumLevelFiles(level);
1381      if (stats_[level].micros > 0 || files > 0) {
1382        snprintf(
1383            buf, sizeof(buf),
1384            "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
1385            level,
1386            files,
1387            versions_->NumLevelBytes(level) / 1048576.0,
1388            stats_[level].micros / 1e6,
1389            stats_[level].bytes_read / 1048576.0,
1390            stats_[level].bytes_written / 1048576.0);
1391        value->append(buf);
1392      }
1393    }
1394    return true;
1395  } else if (in == "sstables") {
1396    *value = versions_->current()->DebugString();
1397    return true;
1398  }
1399
1400  return false;
1401}
1402
1403void DBImpl::GetApproximateSizes(
1404    const Range* range, int n,
1405    uint64_t* sizes) {
1406  // TODO(opt): better implementation
1407  Version* v;
1408  {
1409    MutexLock l(&mutex_);
1410    versions_->current()->Ref();
1411    v = versions_->current();
1412  }
1413
1414  for (int i = 0; i < n; i++) {
1415    // Convert user_key into a corresponding internal key.
1416    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1417    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1418    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1419    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1420    sizes[i] = (limit >= start ? limit - start : 0);
1421  }
1422
1423  {
1424    MutexLock l(&mutex_);
1425    v->Unref();
1426  }
1427}
1428
1429// Default implementations of convenience methods that subclasses of DB
1430// can call if they wish
1431Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
1432  WriteBatch batch;
1433  batch.Put(key, value);
1434  return Write(opt, &batch);
1435}
1436
1437Status DB::Delete(const WriteOptions& opt, const Slice& key) {
1438  WriteBatch batch;
1439  batch.Delete(key);
1440  return Write(opt, &batch);
1441}
1442
1443DB::~DB() { }
1444
1445Status DB::Open(const Options& options, const std::string& dbname,
1446                DB** dbptr) {
1447  *dbptr = NULL;
1448
1449  DBImpl* impl = new DBImpl(options, dbname);
1450  impl->mutex_.Lock();
1451  VersionEdit edit;
1452  Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
1453  if (s.ok()) {
1454    uint64_t new_log_number = impl->versions_->NewFileNumber();
1455    WritableFile* lfile;
1456    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
1457                                     &lfile);
1458    if (s.ok()) {
1459      edit.SetLogNumber(new_log_number);
1460      impl->logfile_ = lfile;
1461      impl->logfile_number_ = new_log_number;
1462      impl->log_ = new log::Writer(lfile);
1463      s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
1464    }
1465    if (s.ok()) {
1466      impl->DeleteObsoleteFiles();
1467      impl->MaybeScheduleCompaction();
1468    }
1469  }
1470  impl->mutex_.Unlock();
1471  if (s.ok()) {
1472    *dbptr = impl;
1473  } else {
1474    delete impl;
1475  }
1476  return s;
1477}
1478
1479Snapshot::~Snapshot() {
1480}
1481
1482Status DestroyDB(const std::string& dbname, const Options& options) {
1483  Env* env = options.env;
1484  std::vector<std::string> filenames;
1485  // Ignore error in case directory does not exist
1486  env->GetChildren(dbname, &filenames);
1487  if (filenames.empty()) {
1488    return Status::OK();
1489  }
1490
1491  FileLock* lock;
1492  const std::string lockname = LockFileName(dbname);
1493  Status result = env->LockFile(lockname, &lock);
1494  if (result.ok()) {
1495    uint64_t number;
1496    FileType type;
1497    for (size_t i = 0; i < filenames.size(); i++) {
1498      if (ParseFileName(filenames[i], &number, &type) &&
1499          type != kDBLockFile) {  // Lock file will be deleted at end
1500        Status del = env->DeleteFile(dbname + "/" + filenames[i]);
1501        if (result.ok() && !del.ok()) {
1502          result = del;
1503        }
1504      }
1505    }
1506    env->UnlockFile(lock);  // Ignore error since state is already gone
1507    env->DeleteFile(lockname);
1508    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
1509  }
1510  return result;
1511}
1512
1513}  // namespace leveldb
1514