// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// We recover the contents of the descriptor from the other files we find.
// (1) Any log files are first converted to tables
// (2) We scan every table to compute
//     (a) smallest/largest for the table
//     (b) largest sequence number in the table
// (3) We generate descriptor contents:
//      - log number is set to zero
//      - next-file-number is set to 1 + largest file number we found
//      - last-sequence-number is set to largest sequence# found across
//        all tables (see 2b)
//      - compaction pointers are cleared
//      - every table file is added at level 0
//
// Possible optimization 1:
//   (a) Compute total size and use to pick appropriate max-level M
//   (b) Sort tables by largest sequence# in the table
//   (c) For each table: if it overlaps earlier table, place in level-0,
//       else place in level-M.
// Possible optimization 2:
//   Store per-table metadata (smallest, largest, largest-seq#, ...)
//   in the table's meta section to speed up ScanTable.
26 27#include "db/builder.h" 28#include "db/db_impl.h" 29#include "db/dbformat.h" 30#include "db/filename.h" 31#include "db/log_reader.h" 32#include "db/log_writer.h" 33#include "db/memtable.h" 34#include "db/table_cache.h" 35#include "db/version_edit.h" 36#include "db/write_batch_internal.h" 37#include "leveldb/comparator.h" 38#include "leveldb/db.h" 39#include "leveldb/env.h" 40 41namespace leveldb { 42 43namespace { 44 45class Repairer { 46 public: 47 Repairer(const std::string& dbname, const Options& options) 48 : dbname_(dbname), 49 env_(options.env), 50 icmp_(options.comparator), 51 ipolicy_(options.filter_policy), 52 options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)), 53 owns_info_log_(options_.info_log != options.info_log), 54 owns_cache_(options_.block_cache != options.block_cache), 55 next_file_number_(1) { 56 // TableCache can be small since we expect each table to be opened once. 57 table_cache_ = new TableCache(dbname_, &options_, 10); 58 } 59 60 ~Repairer() { 61 delete table_cache_; 62 if (owns_info_log_) { 63 delete options_.info_log; 64 } 65 if (owns_cache_) { 66 delete options_.block_cache; 67 } 68 } 69 70 Status Run() { 71 Status status = FindFiles(); 72 if (status.ok()) { 73 ConvertLogFilesToTables(); 74 ExtractMetaData(); 75 status = WriteDescriptor(); 76 } 77 if (status.ok()) { 78 unsigned long long bytes = 0; 79 for (size_t i = 0; i < tables_.size(); i++) { 80 bytes += tables_[i].meta.file_size; 81 } 82 Log(options_.info_log, 83 "**** Repaired leveldb %s; " 84 "recovered %d files; %llu bytes. " 85 "Some data may have been lost. 
" 86 "****", 87 dbname_.c_str(), 88 static_cast<int>(tables_.size()), 89 bytes); 90 } 91 return status; 92 } 93 94 private: 95 struct TableInfo { 96 FileMetaData meta; 97 SequenceNumber max_sequence; 98 }; 99 100 std::string const dbname_; 101 Env* const env_; 102 InternalKeyComparator const icmp_; 103 InternalFilterPolicy const ipolicy_; 104 Options const options_; 105 bool owns_info_log_; 106 bool owns_cache_; 107 TableCache* table_cache_; 108 VersionEdit edit_; 109 110 std::vector<std::string> manifests_; 111 std::vector<uint64_t> table_numbers_; 112 std::vector<uint64_t> logs_; 113 std::vector<TableInfo> tables_; 114 uint64_t next_file_number_; 115 116 Status FindFiles() { 117 std::vector<std::string> filenames; 118 Status status = env_->GetChildren(dbname_, &filenames); 119 if (!status.ok()) { 120 return status; 121 } 122 if (filenames.empty()) { 123 return Status::IOError(dbname_, "repair found no files"); 124 } 125 126 uint64_t number; 127 FileType type; 128 for (size_t i = 0; i < filenames.size(); i++) { 129 if (ParseFileName(filenames[i], &number, &type)) { 130 if (type == kDescriptorFile) { 131 manifests_.push_back(filenames[i]); 132 } else { 133 if (number + 1 > next_file_number_) { 134 next_file_number_ = number + 1; 135 } 136 if (type == kLogFile) { 137 logs_.push_back(number); 138 } else if (type == kTableFile) { 139 table_numbers_.push_back(number); 140 } else { 141 // Ignore other files 142 } 143 } 144 } 145 } 146 return status; 147 } 148 149 void ConvertLogFilesToTables() { 150 for (size_t i = 0; i < logs_.size(); i++) { 151 std::string logname = LogFileName(dbname_, logs_[i]); 152 Status status = ConvertLogToTable(logs_[i]); 153 if (!status.ok()) { 154 Log(options_.info_log, "Log #%llu: ignoring conversion error: %s", 155 (unsigned long long) logs_[i], 156 status.ToString().c_str()); 157 } 158 ArchiveFile(logname); 159 } 160 } 161 162 Status ConvertLogToTable(uint64_t log) { 163 struct LogReporter : public log::Reader::Reporter { 164 Env* env; 165 
Logger* info_log; 166 uint64_t lognum; 167 virtual void Corruption(size_t bytes, const Status& s) { 168 // We print error messages for corruption, but continue repairing. 169 Log(info_log, "Log #%llu: dropping %d bytes; %s", 170 (unsigned long long) lognum, 171 static_cast<int>(bytes), 172 s.ToString().c_str()); 173 } 174 }; 175 176 // Open the log file 177 std::string logname = LogFileName(dbname_, log); 178 SequentialFile* lfile; 179 Status status = env_->NewSequentialFile(logname, &lfile); 180 if (!status.ok()) { 181 return status; 182 } 183 184 // Create the log reader. 185 LogReporter reporter; 186 reporter.env = env_; 187 reporter.info_log = options_.info_log; 188 reporter.lognum = log; 189 // We intentially make log::Reader do checksumming so that 190 // corruptions cause entire commits to be skipped instead of 191 // propagating bad information (like overly large sequence 192 // numbers). 193 log::Reader reader(lfile, &reporter, false/*do not checksum*/, 194 0/*initial_offset*/); 195 196 // Read all the records and add to a memtable 197 std::string scratch; 198 Slice record; 199 WriteBatch batch; 200 MemTable* mem = new MemTable(icmp_); 201 mem->Ref(); 202 int counter = 0; 203 while (reader.ReadRecord(&record, &scratch)) { 204 if (record.size() < 12) { 205 reporter.Corruption( 206 record.size(), Status::Corruption("log record too small")); 207 continue; 208 } 209 WriteBatchInternal::SetContents(&batch, record); 210 status = WriteBatchInternal::InsertInto(&batch, mem); 211 if (status.ok()) { 212 counter += WriteBatchInternal::Count(&batch); 213 } else { 214 Log(options_.info_log, "Log #%llu: ignoring %s", 215 (unsigned long long) log, 216 status.ToString().c_str()); 217 status = Status::OK(); // Keep going with rest of file 218 } 219 } 220 delete lfile; 221 222 // Do not record a version edit for this conversion to a Table 223 // since ExtractMetaData() will also generate edits. 
224 FileMetaData meta; 225 meta.number = next_file_number_++; 226 Iterator* iter = mem->NewIterator(); 227 status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); 228 delete iter; 229 mem->Unref(); 230 mem = NULL; 231 if (status.ok()) { 232 if (meta.file_size > 0) { 233 table_numbers_.push_back(meta.number); 234 } 235 } 236 Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s", 237 (unsigned long long) log, 238 counter, 239 (unsigned long long) meta.number, 240 status.ToString().c_str()); 241 return status; 242 } 243 244 void ExtractMetaData() { 245 std::vector<TableInfo> kept; 246 for (size_t i = 0; i < table_numbers_.size(); i++) { 247 TableInfo t; 248 t.meta.number = table_numbers_[i]; 249 Status status = ScanTable(&t); 250 if (!status.ok()) { 251 std::string fname = TableFileName(dbname_, table_numbers_[i]); 252 Log(options_.info_log, "Table #%llu: ignoring %s", 253 (unsigned long long) table_numbers_[i], 254 status.ToString().c_str()); 255 ArchiveFile(fname); 256 } else { 257 tables_.push_back(t); 258 } 259 } 260 } 261 262 Status ScanTable(TableInfo* t) { 263 std::string fname = TableFileName(dbname_, t->meta.number); 264 int counter = 0; 265 Status status = env_->GetFileSize(fname, &t->meta.file_size); 266 if (status.ok()) { 267 Iterator* iter = table_cache_->NewIterator( 268 ReadOptions(), t->meta.number, t->meta.file_size); 269 bool empty = true; 270 ParsedInternalKey parsed; 271 t->max_sequence = 0; 272 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { 273 Slice key = iter->key(); 274 if (!ParseInternalKey(key, &parsed)) { 275 Log(options_.info_log, "Table #%llu: unparsable key %s", 276 (unsigned long long) t->meta.number, 277 EscapeString(key).c_str()); 278 continue; 279 } 280 281 counter++; 282 if (empty) { 283 empty = false; 284 t->meta.smallest.DecodeFrom(key); 285 } 286 t->meta.largest.DecodeFrom(key); 287 if (parsed.sequence > t->max_sequence) { 288 t->max_sequence = parsed.sequence; 289 } 290 } 291 if 
(!iter->status().ok()) { 292 status = iter->status(); 293 } 294 delete iter; 295 } 296 Log(options_.info_log, "Table #%llu: %d entries %s", 297 (unsigned long long) t->meta.number, 298 counter, 299 status.ToString().c_str()); 300 return status; 301 } 302 303 Status WriteDescriptor() { 304 std::string tmp = TempFileName(dbname_, 1); 305 WritableFile* file; 306 Status status = env_->NewWritableFile(tmp, &file); 307 if (!status.ok()) { 308 return status; 309 } 310 311 SequenceNumber max_sequence = 0; 312 for (size_t i = 0; i < tables_.size(); i++) { 313 if (max_sequence < tables_[i].max_sequence) { 314 max_sequence = tables_[i].max_sequence; 315 } 316 } 317 318 edit_.SetComparatorName(icmp_.user_comparator()->Name()); 319 edit_.SetLogNumber(0); 320 edit_.SetNextFile(next_file_number_); 321 edit_.SetLastSequence(max_sequence); 322 323 for (size_t i = 0; i < tables_.size(); i++) { 324 // TODO(opt): separate out into multiple levels 325 const TableInfo& t = tables_[i]; 326 edit_.AddFile(0, t.meta.number, t.meta.file_size, 327 t.meta.smallest, t.meta.largest); 328 } 329 330 //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); 331 { 332 log::Writer log(file); 333 std::string record; 334 edit_.EncodeTo(&record); 335 status = log.AddRecord(record); 336 } 337 if (status.ok()) { 338 status = file->Close(); 339 } 340 delete file; 341 file = NULL; 342 343 if (!status.ok()) { 344 env_->DeleteFile(tmp); 345 } else { 346 // Discard older manifests 347 for (size_t i = 0; i < manifests_.size(); i++) { 348 ArchiveFile(dbname_ + "/" + manifests_[i]); 349 } 350 351 // Install new manifest 352 status = env_->RenameFile(tmp, DescriptorFileName(dbname_, 1)); 353 if (status.ok()) { 354 status = SetCurrentFile(env_, dbname_, 1); 355 } else { 356 env_->DeleteFile(tmp); 357 } 358 } 359 return status; 360 } 361 362 void ArchiveFile(const std::string& fname) { 363 // Move into another directory. 
E.g., for 364 // dir/foo 365 // rename to 366 // dir/lost/foo 367 const char* slash = strrchr(fname.c_str(), '/'); 368 std::string new_dir; 369 if (slash != NULL) { 370 new_dir.assign(fname.data(), slash - fname.data()); 371 } 372 new_dir.append("/lost"); 373 env_->CreateDir(new_dir); // Ignore error 374 std::string new_file = new_dir; 375 new_file.append("/"); 376 new_file.append((slash == NULL) ? fname.c_str() : slash + 1); 377 Status s = env_->RenameFile(fname, new_file); 378 Log(options_.info_log, "Archiving %s: %s\n", 379 fname.c_str(), s.ToString().c_str()); 380 } 381}; 382} // namespace 383 384Status RepairDB(const std::string& dbname, const Options& options) { 385 Repairer repairer(dbname, options); 386 return repairer.Run(); 387} 388 389} // namespace leveldb 390