// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

#include <algorithm>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/mutexlock.h"

namespace leveldb {

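// Number of open files reserved for uses other than the table cache
// (e.g. the log file, MANIFEST, CURRENT, info log, and lock file).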
const int kNumNonTableCacheFiles = 10;

// Information kept for every waiting writer
struct DBImpl::Writer {
  Status status;
  WriteBatch* batch;
  bool sync;
  bool done;
  port::CondVar cv;

  explicit Writer(port::Mutex* mu) : cv(mu) { }
};

struct DBImpl::CompactionState {
  Compaction* const compaction;

  // Sequence numbers < smallest_snapshot are not significant since we
  // will never have to service a snapshot below smallest_snapshot.
  // Therefore if we have seen a sequence number S <= smallest_snapshot,
  // we can drop all entries for the same key with sequence numbers < S.
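  //
  // Example: with smallest_snapshot == 7 and entries for one user key at
  // sequences 9, 6, and 4, entries 9 and 6 must be kept (6 is the newest
  // entry visible to a snapshot at sequence 7), but 4 can be dropped.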
  SequenceNumber smallest_snapshot;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint64_t file_size;
    InternalKey smallest, largest;
  };
  std::vector<Output> outputs;

  // State kept for output being generated
  WritableFile* outfile;
  TableBuilder* builder;

  uint64_t total_bytes;

  Output* current_output() { return &outputs[outputs.size()-1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        outfile(NULL),
        builder(NULL),
        total_bytes(0) {
  }
};

// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const InternalFilterPolicy* ipolicy,
                        const Options& src) {
  Options result = src;
  result.comparator = icmp;
  result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
  ClipToRange(&result.max_open_files,    64 + kNumNonTableCacheFiles, 50000);
  ClipToRange(&result.write_buffer_size, 64<<10,                      1<<30);
  ClipToRange(&result.block_size,        1<<10,                       4<<20);
  if (result.info_log == NULL) {
    // Open a log file in the same directory as the db
    src.env->CreateDir(dbname);  // In case it does not exist
    src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
    Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = NULL;
    }
  }
  if (result.block_cache == NULL) {
    result.block_cache = NewLRUCache(8 << 20);
  }
  return result;
}
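
// Note: when SanitizeOptions() substitutes its own info_log or block_cache,
// the DBImpl constructor records ownership (owns_info_log_, owns_cache_) so
// that the destructor deletes only what this DBImpl created.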

DBImpl::DBImpl(const Options& options, const std::string& dbname)
    : env_(options.env),
      internal_comparator_(options.comparator),
      internal_filter_policy_(options.filter_policy),
      options_(SanitizeOptions(
          dbname, &internal_comparator_, &internal_filter_policy_, options)),
      owns_info_log_(options_.info_log != options.info_log),
      owns_cache_(options_.block_cache != options.block_cache),
      dbname_(dbname),
      db_lock_(NULL),
      shutting_down_(NULL),
      bg_cv_(&mutex_),
      mem_(new MemTable(internal_comparator_)),
      imm_(NULL),
      logfile_(NULL),
      logfile_number_(0),
      log_(NULL),
      tmp_batch_(new WriteBatch),
      bg_compaction_scheduled_(false),
      manual_compaction_(NULL),
      consecutive_compaction_errors_(0) {
  mem_->Ref();
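  // has_imm_ mirrors (imm_ != NULL) so that background threads can check
  // for an immutable memtable without acquiring mutex_.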
  has_imm_.Release_Store(NULL);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  const int table_cache_size = options.max_open_files - kNumNonTableCacheFiles;
  table_cache_ = new TableCache(dbname_, &options_, table_cache_size);

  versions_ = new VersionSet(dbname_, &options_, table_cache_,
                             &internal_comparator_);
}

DBImpl::~DBImpl() {
  // Wait for background work to finish
  mutex_.Lock();
  shutting_down_.Release_Store(this);  // Any non-NULL value is ok
  while (bg_compaction_scheduled_) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  if (db_lock_ != NULL) {
    env_->UnlockFile(db_lock_);
  }

  delete versions_;
  if (mem_ != NULL) mem_->Unref();
  if (imm_ != NULL) imm_->Unref();
  delete tmp_batch_;
  delete log_;
  delete logfile_;
  delete table_cache_;

  if (owns_info_log_) {
    delete options_.info_log;
  }
  if (owns_cache_) {
    delete options_.block_cache;
  }
}

Status DBImpl::NewDB() {
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  const std::string manifest = DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s = env_->NewWritableFile(manifest, &file);
  if (!s.ok()) {
    return s;
  }
  {
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = file->Close();
    }
  }
  delete file;
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    env_->DeleteFile(manifest);
  }
  return s;
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || options_.paranoid_checks) {
    // No change needed
  } else {
    Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
    *s = Status::OK();
  }
}

void DBImpl::DeleteObsoleteFiles() {
  // Make a set of all of the live files
  std::set<uint64_t> live = pending_outputs_;
  versions_->AddLiveFiles(&live);

  std::vector<std::string> filenames;
  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
  uint64_t number;
  FileType type;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      bool keep = true;
      switch (type) {
        case kLogFile:
          keep = ((number >= versions_->LogNumber()) ||
                  (number == versions_->PrevLogNumber()));
          break;
        case kDescriptorFile:
          // Keep my manifest file, and any newer incarnations'
          // (in case there is a race that allows other incarnations)
          keep = (number >= versions_->ManifestFileNumber());
          break;
        case kTableFile:
          keep = (live.find(number) != live.end());
          break;
        case kTempFile:
          // Any temp files that are currently being written to must
          // be recorded in pending_outputs_, which is inserted into "live"
          keep = (live.find(number) != live.end());
          break;
        case kCurrentFile:
        case kDBLockFile:
        case kInfoLogFile:
          keep = true;
          break;
      }

      if (!keep) {
        if (type == kTableFile) {
          table_cache_->Evict(number);
        }
        Log(options_.info_log, "Delete type=%d #%lld\n",
            int(type),
            static_cast<unsigned long long>(number));
        env_->DeleteFile(dbname_ + "/" + filenames[i]);
      }
    }
  }
}

Status DBImpl::Recover(VersionEdit* edit) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  s = versions_->Recover();
  if (s.ok()) {
    SequenceNumber max_sequence(0);

    // Recover from all log files newer than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that PrevLogNumber() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of leveldb.
    const uint64_t min_log = versions_->LogNumber();
    const uint64_t prev_log = versions_->PrevLogNumber();
    std::vector<std::string> filenames;
    s = env_->GetChildren(dbname_, &filenames);
    if (!s.ok()) {
      return s;
    }
    std::set<uint64_t> expected;
    versions_->AddLiveFiles(&expected);
    uint64_t number;
    FileType type;
    std::vector<uint64_t> logs;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type)) {
        expected.erase(number);
        if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
          logs.push_back(number);
      }
    }
    if (!expected.empty()) {
      char buf[50];
      snprintf(buf, sizeof(buf), "%d missing files; e.g.",
               static_cast<int>(expected.size()));
      return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
    }

    // Recover in the order in which the logs were generated
    std::sort(logs.begin(), logs.end());
    for (size_t i = 0; i < logs.size(); i++) {
      s = RecoverLogFile(logs[i], edit, &max_sequence);

      // The previous incarnation may not have written any MANIFEST
      // records after allocating this log number.  So we manually
      // update the file number allocation counter in VersionSet.
      versions_->MarkFileNumberUsed(logs[i]);
    }

    if (s.ok()) {
      if (versions_->LastSequence() < max_sequence) {
        versions_->SetLastSequence(max_sequence);
      }
    }
  }

  return s;
}

Status DBImpl::RecoverLogFile(uint64_t log_number,
                              VersionEdit* edit,
                              SequenceNumber* max_sequence) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // NULL if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == NULL ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != NULL && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // Open the log file
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile* file;
  Status status = env_->NewSequentialFile(fname, &file);
  if (!status.ok()) {
    MaybeIgnoreError(&status);
    return status;
  }

  // Create the log reader.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log;
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks ? &status : NULL);
  // We intentionally make log::Reader do checksumming even if
  // paranoid_checks==false so that corruptions cause entire commits
  // to be skipped instead of propagating bad information (like overly
  // large sequence numbers).
  log::Reader reader(file, &reporter, true/*checksum*/,
                     0/*initial_offset*/);
  Log(options_.info_log, "Recovering log #%llu",
      (unsigned long long) log_number);

  // Read all the records and add to a memtable
  std::string scratch;
  Slice record;
  WriteBatch batch;
  MemTable* mem = NULL;
  while (reader.ReadRecord(&record, &scratch) &&
         status.ok()) {
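    // A record must hold at least the 12-byte WriteBatch header
    // (an 8-byte sequence number followed by a 4-byte count).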
    if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    if (mem == NULL) {
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    const SequenceNumber last_seq =
        WriteBatchInternal::Sequence(&batch) +
        WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      status = WriteLevel0Table(mem, edit, NULL);
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        break;
      }
      mem->Unref();
      mem = NULL;
    }
  }

  if (status.ok() && mem != NULL) {
    status = WriteLevel0Table(mem, edit, NULL);
    // Reflect errors immediately so that conditions like full
    // file-systems cause the DB::Open() to fail.
  }

  if (mem != NULL) mem->Unref();
  delete file;
  return status;
}

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
                                Version* base) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();
  Log(options_.info_log, "Level-0 table #%llu: started",
      (unsigned long long) meta.number);

  Status s;
  {
    mutex_.Unlock();
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    mutex_.Lock();
  }

  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
      (unsigned long long) meta.number,
      (unsigned long long) meta.file_size,
      s.ToString().c_str());
  delete iter;
  pending_outputs_.erase(meta.number);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != NULL) {
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    edit->AddFile(level, meta.number, meta.file_size,
                  meta.smallest, meta.largest);
  }

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
  stats_[level].Add(stats);
  return s;
}

Status DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != NULL);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.Acquire_Load()) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = NULL;
    has_imm_.Release_Store(NULL);
    DeleteObsoleteFiles();
  }

  return s;
}

void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
  int max_level_with_files = 1;
  {
    MutexLock l(&mutex_);
    Version* base = versions_->current();
    for (int level = 1; level < config::kNumLevels; level++) {
      if (base->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
      }
    }
  }
  TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
  for (int level = 0; level < max_level_with_files; level++) {
    TEST_CompactRange(level, begin, end);
  }
}

void DBImpl::TEST_CompactRange(int level, const Slice* begin,
                               const Slice* end) {
  assert(level >= 0);
  assert(level + 1 < config::kNumLevels);

  InternalKey begin_storage, end_storage;
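  // Internal keys for the same user key sort newest-first (by decreasing
  // sequence number), so kMaxSequenceNumber places begin_storage before
  // every entry for *begin, and sequence number zero places end_storage
  // after every entry for *end.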

  ManualCompaction manual;
  manual.level = level;
  manual.done = false;
  if (begin == NULL) {
    manual.begin = NULL;
  } else {
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    manual.begin = &begin_storage;
  }
  if (end == NULL) {
    manual.end = NULL;
  } else {
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    manual.end = &end_storage;
  }

  MutexLock l(&mutex_);
  while (!manual.done) {
    while (manual_compaction_ != NULL) {
      bg_cv_.Wait();
    }
    manual_compaction_ = &manual;
    MaybeScheduleCompaction();
    while (manual_compaction_ == &manual) {
      bg_cv_.Wait();
    }
  }
}

Status DBImpl::TEST_CompactMemTable() {
  // NULL batch means just wait for earlier writes to be done
  Status s = Write(WriteOptions(), NULL);
  if (s.ok()) {
    // Wait until the compaction completes
    MutexLock l(&mutex_);
    while (imm_ != NULL && bg_error_.ok()) {
      bg_cv_.Wait();
    }
    if (imm_ != NULL) {
      s = bg_error_;
    }
  }
  return s;
}

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (imm_ == NULL &&
             manual_compaction_ == NULL &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(bg_compaction_scheduled_);
  if (!shutting_down_.Acquire_Load()) {
    Status s = BackgroundCompaction();
    if (s.ok()) {
      // Success
      consecutive_compaction_errors_ = 0;
    } else if (shutting_down_.Acquire_Load()) {
      // Error most likely due to shutdown; do not wait
    } else {
      // Wait a little bit before retrying background compaction in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed compactions for the duration of
      // the problem.
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      Log(options_.info_log, "Waiting after background compaction error: %s",
          s.ToString().c_str());
      mutex_.Unlock();
      ++consecutive_compaction_errors_;
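      // Exponential backoff: sleep 1s after the first failure, doubling
      // with each consecutive failure, capped at 8 seconds.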
      int seconds_to_sleep = 1;
      for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
        seconds_to_sleep *= 2;
      }
      env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
      mutex_.Lock();
    }
  }

  bg_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  bg_cv_.SignalAll();
}

Status DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != NULL) {
    return CompactMemTable();
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != NULL);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == NULL);
    if (c != NULL) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == NULL) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number),
        c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(),
        versions_->LevelSummary(&tmp));
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
    if (options_.paranoid_checks && bg_error_.ok()) {
      bg_error_ = status;
    }
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = NULL;
  }
  return status;
}

void DBImpl::CleanupCompaction(CompactionState* compact) {
  mutex_.AssertHeld();
  if (compact->builder != NULL) {
    // May happen if we get a shutdown call in the middle of compaction
    compact->builder->Abandon();
    delete compact->builder;
  } else {
    assert(compact->outfile == NULL);
  }
  delete compact->outfile;
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    pending_outputs_.erase(out.number);
  }
  delete compact;
}

Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
  assert(compact != NULL);
  assert(compact->builder == NULL);
  uint64_t file_number;
  {
    mutex_.Lock();
    file_number = versions_->NewFileNumber();
    pending_outputs_.insert(file_number);
    CompactionState::Output out;
    out.number = file_number;
    out.smallest.Clear();
    out.largest.Clear();
    compact->outputs.push_back(out);
    mutex_.Unlock();
  }

  // Make the output file
  std::string fname = TableFileName(dbname_, file_number);
  Status s = env_->NewWritableFile(fname, &compact->outfile);
  if (s.ok()) {
    compact->builder = new TableBuilder(options_, compact->outfile);
  }
  return s;
}

Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != NULL);
  assert(compact->outfile != NULL);
  assert(compact->builder != NULL);

  const uint64_t output_number = compact->current_output()->number;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (s.ok()) {
    s = compact->builder->Finish();
  } else {
    compact->builder->Abandon();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  compact->current_output()->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  delete compact->builder;
  compact->builder = NULL;

  // Finish and check for file errors
  if (s.ok()) {
    s = compact->outfile->Sync();
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  delete compact->outfile;
  compact->outfile = NULL;

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    Iterator* iter = table_cache_->NewIterator(ReadOptions(),
                                               output_number,
                                               current_bytes);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      Log(options_.info_log,
          "Generated table #%llu: %lld keys, %lld bytes",
          (unsigned long long) output_number,
          (unsigned long long) current_entries,
          (unsigned long long) current_bytes);
    }
  }
  return s;
}

Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();
  Log(options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  compact->compaction->AddInputDeletions(compact->compaction->edit());
  const int level = compact->compaction->level();
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    compact->compaction->edit()->AddFile(
        level + 1,
        out.number, out.file_size, out.smallest, out.largest);
  }
  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}

Status DBImpl::DoCompactionWork(CompactionState* compact) {
  const uint64_t start_micros = env_->NowMicros();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

  Log(options_.info_log,  "Compacting %d@%d + %d@%d files",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1);

  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == NULL);
  assert(compact->outfile == NULL);
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->number_;
  }

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  Iterator* input = versions_->MakeInputIterator(compact->compaction);
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
    // Prioritize immutable compaction work
    if (has_imm_.NoBarrier_Load() != NULL) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if (imm_ != NULL) {
        CompactMemTable();
        bg_cv_.SignalAll();  // Wake up MakeRoomForWrite() if necessary
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != NULL) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key,
                                     Slice(current_user_key)) != 0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by a newer entry for the same user key
        drop = true;    // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }
#if 0
    Log(options_.info_log,
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
        "%d smallest_snapshot: %d",
        ikey.user_key.ToString().c_str(),
        (int)ikey.sequence, ikey.type, kTypeValue, drop,
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
        (int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif

    if (!drop) {
      // Open output file if necessary
      if (compact->builder == NULL) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());

      // Close output file if it is big enough
      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);
        if (!status.ok()) {
          break;
        }
      }
    }

    input->Next();
  }

  if (status.ok() && shutting_down_.Acquire_Load()) {
    status = Status::IOError("Deleting DB during compaction");
  }
  if (status.ok() && compact->builder != NULL) {
    status = FinishCompactionOutputFile(compact, input);
  }
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = NULL;

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  for (int which = 0; which < 2; which++) {
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }

  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log,
      "compacted to: %s", versions_->LevelSummary(&tmp));
  return status;
}

namespace {
struct IterState {
  port::Mutex* mu;
  Version* version;
  MemTable* mem;
  MemTable* imm;
};

static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
  state->mu->Lock();
  state->mem->Unref();
  if (state->imm != NULL) state->imm->Unref();
  state->version->Unref();
  state->mu->Unlock();
  delete state;
}
}  // namespace

Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot) {
  IterState* cleanup = new IterState;
  mutex_.Lock();
  *latest_snapshot = versions_->LastSequence();

  // Collect together all needed child iterators
  std::vector<Iterator*> list;
  list.push_back(mem_->NewIterator());
  mem_->Ref();
  if (imm_ != NULL) {
    list.push_back(imm_->NewIterator());
    imm_->Ref();
  }
  versions_->current()->AddIterators(options, &list);
  Iterator* internal_iter =
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
  versions_->current()->Ref();

  cleanup->mu = &mutex_;
  cleanup->mem = mem_;
  cleanup->imm = imm_;
  cleanup->version = versions_->current();
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);

  mutex_.Unlock();
  return internal_iter;
}

Iterator* DBImpl::TEST_NewInternalIterator() {
  SequenceNumber ignored;
  return NewInternalIterator(ReadOptions(), &ignored);
}

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
  MutexLock l(&mutex_);
  return versions_->MaxNextLevelOverlappingBytes();
}

Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  if (options.snapshot != NULL) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  } else {
    snapshot = versions_->LastSequence();
  }

  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != NULL) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != NULL && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != NULL) imm->Unref();
  current->Unref();
  return s;
}

Iterator* DBImpl::NewIterator(const ReadOptions& options) {
  SequenceNumber latest_snapshot;
  Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
  return NewDBIterator(
      &dbname_, env_, user_comparator(), internal_iter,
      (options.snapshot != NULL
       ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
       : latest_snapshot));
}

const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  MutexLock l(&mutex_);
  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
}
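
// A minimal snapshot-usage sketch of the methods above (illustrative only):
//   const Snapshot* snap = db->GetSnapshot();
//   ReadOptions ro;
//   ro.snapshot = snap;
//   std::string value;
//   Status s = db->Get(ro, "key", &value);  // Reads as of the snapshot.
//   db->ReleaseSnapshot(snap);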

// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}

Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
  return DB::Delete(options, key);
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != NULL);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128<<10)) {
    max_size = size + (128<<10);
  }
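  // For example, a 4KB write may be grouped up to ~132KB, while any write
  // larger than 128KB may be grouped up to the full 1MB limit.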

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != NULL) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != NULL) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      bg_cv_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;   // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

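// Read-only introspection. Recognized property names include
// "leveldb.num-files-at-level<N>", "leveldb.stats", and "leveldb.sstables";
// unrecognized names return false.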
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
  value->clear();

  MutexLock l(&mutex_);
  Slice in = property;
  Slice prefix("leveldb.");
  if (!in.starts_with(prefix)) return false;
  in.remove_prefix(prefix.size());

  if (in.starts_with("num-files-at-level")) {
    in.remove_prefix(strlen("num-files-at-level"));
    uint64_t level;
    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
    if (!ok || level >= config::kNumLevels) {
      return false;
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "%d",
               versions_->NumLevelFiles(static_cast<int>(level)));
      *value = buf;
      return true;
    }
  } else if (in == "stats") {
    char buf[200];
    snprintf(buf, sizeof(buf),
             "                               Compactions\n"
             "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
             "--------------------------------------------------\n"
             );
    value->append(buf);
    for (int level = 0; level < config::kNumLevels; level++) {
      int files = versions_->NumLevelFiles(level);
      if (stats_[level].micros > 0 || files > 0) {
        snprintf(
            buf, sizeof(buf),
            "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
            level,
            files,
            versions_->NumLevelBytes(level) / 1048576.0,
            stats_[level].micros / 1e6,
            stats_[level].bytes_read / 1048576.0,
            stats_[level].bytes_written / 1048576.0);
        value->append(buf);
      }
    }
    return true;
  } else if (in == "sstables") {
    *value = versions_->current()->DebugString();
    return true;
  }

  return false;
}

void DBImpl::GetApproximateSizes(
    const Range* range, int n,
    uint64_t* sizes) {
  // TODO(opt): better implementation
  Version* v;
  {
    MutexLock l(&mutex_);
    versions_->current()->Ref();
    v = versions_->current();
  }

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }

  {
    MutexLock l(&mutex_);
    v->Unref();
  }
}

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}

DB::~DB() { }

Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = NULL;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  Status s = impl->Recover(&edit);  // Handles create_if_missing, error_if_exists
  if (s.ok()) {
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
    }
    if (s.ok()) {
      impl->DeleteObsoleteFiles();
      impl->MaybeScheduleCompaction();
    }
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}
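
// A minimal open/write/read sketch of the API implemented above (the
// "/tmp/testdb" path is illustrative):
//   leveldb::DB* db;
//   leveldb::Options opts;
//   opts.create_if_missing = true;
//   leveldb::Status s = leveldb::DB::Open(opts, "/tmp/testdb", &db);
//   if (s.ok()) s = db->Put(leveldb::WriteOptions(), "key", "value");
//   std::string value;
//   if (s.ok()) s = db->Get(leveldb::ReadOptions(), "key", &value);
//   delete db;  // Waits for background work and releases the lock file.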

Snapshot::~Snapshot() {
}

Status DestroyDB(const std::string& dbname, const Options& options) {
  Env* env = options.env;
  std::vector<std::string> filenames;
  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);
  if (filenames.empty()) {
    return Status::OK();
  }

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type) &&
          type != kDBLockFile) {  // Lock file will be deleted at end
        Status del = env->DeleteFile(dbname + "/" + filenames[i]);
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }
    env->UnlockFile(lock);  // Ignore error since state is already gone
    env->DeleteFile(lockname);
    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  }
  return result;
}

}  // namespace leveldb