1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include <sys/types.h>
6#include <stdio.h>
7#include <stdlib.h>
8#include "db/db_impl.h"
9#include "db/version_set.h"
10#include "leveldb/cache.h"
11#include "leveldb/db.h"
12#include "leveldb/env.h"
13#include "leveldb/write_batch.h"
14#include "port/port.h"
15#include "util/crc32c.h"
16#include "util/histogram.h"
17#include "util/mutexlock.h"
18#include "util/random.h"
19#include "util/testutil.h"
20
21// Comma-separated list of operations to run in the specified order
22//   Actual benchmarks:
23//      fillseq       -- write N values in sequential key order in async mode
24//      fillrandom    -- write N values in random key order in async mode
25//      overwrite     -- overwrite N values in random key order in async mode
//      fillsync      -- write N/1000 values in random key order in sync mode
27//      fill100K      -- write N/1000 100K values in random order in async mode
28//      deleteseq     -- delete N keys in sequential order
29//      deleterandom  -- delete N keys in random order
30//      readseq       -- read N times sequentially
31//      readreverse   -- read N times in reverse order
32//      readrandom    -- read N times in random order
33//      readmissing   -- read N missing keys in random order
34//      readhot       -- read N times in random order from 1% section of DB
35//      seekrandom    -- N random seeks
36//      crc32c        -- repeated crc32c of 4K of data
37//      acquireload   -- load N*1000 times
38//   Meta operations:
39//      compact     -- Compact the entire DB
40//      stats       -- Print DB stats
41//      sstables    -- Print sstable info
42//      heapprofile -- Dump a heap profile (if supported by this port)
// Default sequence of benchmarks to run (see list above; snappycomp and
// snappyuncomp measure raw snappy speed and do not touch the database).
static const char* FLAGS_benchmarks =
    "fillseq,"
    "fillsync,"
    "fillrandom,"
    "overwrite,"
    "readrandom,"
    "readrandom,"  // Extra run to allow previous compactions to quiesce
    "readseq,"
    "readreverse,"
    "compact,"
    "readrandom,"
    "readseq,"
    "readreverse,"
    "fill100K,"
    "crc32c,"
    "snappycomp,"
    "snappyuncomp,"
    "acquireload,"
    ;

// Number of key/values to place in database
static int FLAGS_num = 1000000;

// Number of read operations to do.  If negative, do FLAGS_num reads.
static int FLAGS_reads = -1;

// Number of concurrent threads to run.
static int FLAGS_threads = 1;

// Size of each value
static int FLAGS_value_size = 100;

// Arrange to generate values that shrink to this fraction of
// their original size after compression
static double FLAGS_compression_ratio = 0.5;

// Print histogram of operation timings
static bool FLAGS_histogram = false;

// Number of bytes to buffer in memtable before compacting
// (initialized to default value by "main")
static int FLAGS_write_buffer_size = 0;

// Number of bytes to use as a cache of uncompressed data.
// Negative means use default settings.
static int FLAGS_cache_size = -1;

// Maximum number of files to keep open at the same time (use default if == 0)
static int FLAGS_open_files = 0;

// Bloom filter bits per key.
// Negative means use default settings.
static int FLAGS_bloom_bits = -1;

// If true, do not destroy the existing database.  If you set this
// flag and also specify a benchmark that wants a fresh database, that
// benchmark will fail.
static bool FLAGS_use_existing_db = false;

// Use the db with the following name.
// (NULL means: pick a default path under the test directory in main().)
static const char* FLAGS_db = NULL;
104
105namespace leveldb {
106
107namespace {
108
// Helper for quickly generating random data.
class RandomGenerator {
 private:
  std::string data_;  // 1MB+ buffer of pseudo-random, compressible bytes
  int pos_;           // Next offset to hand out from data_

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
    while (data_.size() < 1048576) {
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

  // Return a Slice of "len" bytes taken from the shared buffer.  The
  // returned Slice aliases data_ and is only valid until the generator
  // is destroyed.  Requires len < data_.size() (checked by assert after
  // wrapping around to the start of the buffer).
  Slice Generate(size_t len) {
    if (pos_ + len > data_.size()) {
      pos_ = 0;
      assert(len < data_.size());
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
};
140
141static Slice TrimSpace(Slice s) {
142  size_t start = 0;
143  while (start < s.size() && isspace(s[start])) {
144    start++;
145  }
146  size_t limit = s.size();
147  while (limit > start && isspace(s[limit-1])) {
148    limit--;
149  }
150  return Slice(s.data() + start, limit - start);
151}
152
153static void AppendWithSpace(std::string* str, Slice msg) {
154  if (msg.empty()) return;
155  if (!str->empty()) {
156    str->push_back(' ');
157  }
158  str->append(msg.data(), msg.size());
159}
160
161class Stats {
162 private:
163  double start_;
164  double finish_;
165  double seconds_;
166  int done_;
167  int next_report_;
168  int64_t bytes_;
169  double last_op_finish_;
170  Histogram hist_;
171  std::string message_;
172
173 public:
174  Stats() { Start(); }
175
176  void Start() {
177    next_report_ = 100;
178    last_op_finish_ = start_;
179    hist_.Clear();
180    done_ = 0;
181    bytes_ = 0;
182    seconds_ = 0;
183    start_ = Env::Default()->NowMicros();
184    finish_ = start_;
185    message_.clear();
186  }
187
188  void Merge(const Stats& other) {
189    hist_.Merge(other.hist_);
190    done_ += other.done_;
191    bytes_ += other.bytes_;
192    seconds_ += other.seconds_;
193    if (other.start_ < start_) start_ = other.start_;
194    if (other.finish_ > finish_) finish_ = other.finish_;
195
196    // Just keep the messages from one thread
197    if (message_.empty()) message_ = other.message_;
198  }
199
200  void Stop() {
201    finish_ = Env::Default()->NowMicros();
202    seconds_ = (finish_ - start_) * 1e-6;
203  }
204
205  void AddMessage(Slice msg) {
206    AppendWithSpace(&message_, msg);
207  }
208
209  void FinishedSingleOp() {
210    if (FLAGS_histogram) {
211      double now = Env::Default()->NowMicros();
212      double micros = now - last_op_finish_;
213      hist_.Add(micros);
214      if (micros > 20000) {
215        fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
216        fflush(stderr);
217      }
218      last_op_finish_ = now;
219    }
220
221    done_++;
222    if (done_ >= next_report_) {
223      if      (next_report_ < 1000)   next_report_ += 100;
224      else if (next_report_ < 5000)   next_report_ += 500;
225      else if (next_report_ < 10000)  next_report_ += 1000;
226      else if (next_report_ < 50000)  next_report_ += 5000;
227      else if (next_report_ < 100000) next_report_ += 10000;
228      else if (next_report_ < 500000) next_report_ += 50000;
229      else                            next_report_ += 100000;
230      fprintf(stderr, "... finished %d ops%30s\r", done_, "");
231      fflush(stderr);
232    }
233  }
234
235  void AddBytes(int64_t n) {
236    bytes_ += n;
237  }
238
239  void Report(const Slice& name) {
240    // Pretend at least one op was done in case we are running a benchmark
241    // that does not call FinishedSingleOp().
242    if (done_ < 1) done_ = 1;
243
244    std::string extra;
245    if (bytes_ > 0) {
246      // Rate is computed on actual elapsed time, not the sum of per-thread
247      // elapsed times.
248      double elapsed = (finish_ - start_) * 1e-6;
249      char rate[100];
250      snprintf(rate, sizeof(rate), "%6.1f MB/s",
251               (bytes_ / 1048576.0) / elapsed);
252      extra = rate;
253    }
254    AppendWithSpace(&extra, message_);
255
256    fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
257            name.ToString().c_str(),
258            seconds_ * 1e6 / done_,
259            (extra.empty() ? "" : " "),
260            extra.c_str());
261    if (FLAGS_histogram) {
262      fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
263    }
264    fflush(stdout);
265  }
266};
267
268// State shared by all concurrent executions of the same benchmark.
269struct SharedState {
270  port::Mutex mu;
271  port::CondVar cv;
272  int total;
273
274  // Each thread goes through the following states:
275  //    (1) initializing
276  //    (2) waiting for others to be initialized
277  //    (3) running
278  //    (4) done
279
280  int num_initialized;
281  int num_done;
282  bool start;
283
284  SharedState() : cv(&mu) { }
285};
286
287// Per-thread state for concurrent executions of the same benchmark.
288struct ThreadState {
289  int tid;             // 0..n-1 when running in n threads
290  Random rand;         // Has different seeds for different threads
291  Stats stats;
292  SharedState* shared;
293
294  ThreadState(int index)
295      : tid(index),
296        rand(1000 + index) {
297  }
298};
299
300}  // namespace
301
// Drives the whole benchmark run: parses the benchmark list, opens the
// database, and dispatches each named benchmark to a member function.
class Benchmark {
 private:
  Cache* cache_;                       // Shared block cache (NULL if disabled)
  const FilterPolicy* filter_policy_;  // Bloom policy (NULL if disabled)
  DB* db_;                             // Database under test
  int num_;                            // Entry count for current benchmark
  int value_size_;                     // Value size for current benchmark
  int entries_per_batch_;              // Entries per WriteBatch
  WriteOptions write_options_;         // Write options (e.g. sync) per benchmark
  int reads_;                          // Read-op count for current benchmark
  int heap_counter_;                   // Sequence number for heap-profile files
313
  // Print the configuration banner (key/value sizes, entry count, and
  // estimated raw/compressed database size) before any benchmark runs.
  void PrintHeader() {
    const int kKeySize = 16;  // Keys are "%016d"-formatted integers
    PrintEnvironment();
    fprintf(stdout, "Keys:       %d bytes each\n", kKeySize);
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    fprintf(stdout, "Entries:    %d\n", num_);
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
            ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
             / 1048576.0));
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
            (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
             / 1048576.0));
    PrintWarnings();
    fprintf(stdout, "------------------------------------------------\n");
  }
331
  // Warn about build configurations (no optimization, assertions on,
  // missing/ineffective snappy) that would distort benchmark numbers.
  void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif

    // See if snappy is working by attempting to compress a compressible string
    // (sizeof(text) includes the trailing NUL byte).
    const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
    std::string compressed;
    if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
      fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
    } else if (compressed.size() >= sizeof(text)) {
      fprintf(stdout, "WARNING: Snappy compression is not effective\n");
    }
  }
352
  // Print library version and, on Linux, the date and CPU model/cache
  // information parsed from /proc/cpuinfo.
  void PrintEnvironment() {
    fprintf(stderr, "LevelDB:    version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
    time_t now = time(NULL);
    fprintf(stderr, "Date:       %s", ctime(&now));  // ctime() adds newline

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    if (cpuinfo != NULL) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != NULL) {
        const char* sep = strchr(line, ':');
        if (sep == NULL) {
          continue;
        }
        // Key is everything before the char preceding ':' (drops the
        // separator padding); TrimSpace strips remaining whitespace.
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;  // one "model name" line per logical CPU
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }
387
 public:
  // Builds the shared cache/filter policy from the flags, removes stale
  // heap profiles from the database directory, and (unless
  // --use_existing_db) destroys any existing database at FLAGS_db.
  Benchmark()
  : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
    filter_policy_(FLAGS_bloom_bits >= 0
                   ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                   : NULL),
    db_(NULL),
    num_(FLAGS_num),
    value_size_(FLAGS_value_size),
    entries_per_batch_(1),
    reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
    heap_counter_(0) {
    std::vector<std::string> files;
    Env::Default()->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      // Remove "heap-NNNN" files left over from earlier heapprofile runs.
      if (Slice(files[i]).starts_with("heap-")) {
        Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      DestroyDB(FLAGS_db, Options());
    }
  }
411
  // Close the database before releasing the cache and filter policy it
  // may still reference.
  ~Benchmark() {
    delete db_;
    delete cache_;
    delete filter_policy_;
  }
417
  // Parse FLAGS_benchmarks (comma-separated) and run each named
  // benchmark in order, re-opening a fresh database where required.
  void Run() {
    PrintHeader();
    Open();

    const char* benchmarks = FLAGS_benchmarks;
    while (benchmarks != NULL) {
      // Split off the next comma-delimited benchmark name.
      const char* sep = strchr(benchmarks, ',');
      Slice name;
      if (sep == NULL) {
        name = benchmarks;
        benchmarks = NULL;
      } else {
        name = Slice(benchmarks, sep - benchmarks);
        benchmarks = sep + 1;
      }

      // Reset parameters that may be overridden below
      num_ = FLAGS_num;
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
      value_size_ = FLAGS_value_size;
      entries_per_batch_ = 1;
      write_options_ = WriteOptions();

      void (Benchmark::*method)(ThreadState*) = NULL;
      bool fresh_db = false;  // true => destroy and re-open the DB first
      int num_threads = FLAGS_threads;

      if (name == Slice("fillseq")) {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillbatch")) {
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillrandom")) {
        fresh_db = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("overwrite")) {
        fresh_db = false;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fillsync")) {
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fill100K")) {
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("readseq")) {
        method = &Benchmark::ReadSequential;
      } else if (name == Slice("readreverse")) {
        method = &Benchmark::ReadReverse;
      } else if (name == Slice("readrandom")) {
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("readmissing")) {
        method = &Benchmark::ReadMissing;
      } else if (name == Slice("seekrandom")) {
        method = &Benchmark::SeekRandom;
      } else if (name == Slice("readhot")) {
        method = &Benchmark::ReadHot;
      } else if (name == Slice("readrandomsmall")) {
        reads_ /= 1000;
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("deleteseq")) {
        method = &Benchmark::DeleteSeq;
      } else if (name == Slice("deleterandom")) {
        method = &Benchmark::DeleteRandom;
      } else if (name == Slice("readwhilewriting")) {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
      } else if (name == Slice("compact")) {
        method = &Benchmark::Compact;
      } else if (name == Slice("crc32c")) {
        method = &Benchmark::Crc32c;
      } else if (name == Slice("acquireload")) {
        method = &Benchmark::AcquireLoad;
      } else if (name == Slice("snappycomp")) {
        method = &Benchmark::SnappyCompress;
      } else if (name == Slice("snappyuncomp")) {
        method = &Benchmark::SnappyUncompress;
      } else if (name == Slice("heapprofile")) {
        // Meta operations run inline, without benchmark threads.
        HeapProfile();
      } else if (name == Slice("stats")) {
        PrintStats("leveldb.stats");
      } else if (name == Slice("sstables")) {
        PrintStats("leveldb.sstables");
      } else {
        if (name != Slice()) {  // No error message for empty name
          fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
        }
      }

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                  name.ToString().c_str());
          method = NULL;
        } else {
          delete db_;
          db_ = NULL;
          DestroyDB(FLAGS_db, Options());
          Open();
        }
      }

      if (method != NULL) {
        RunBenchmark(num_threads, name, method);
      }
    }
  }
530
 private:
  // Arguments handed to each benchmark thread via Env::StartThread.
  struct ThreadArg {
    Benchmark* bm;                         // The Benchmark instance to run on
    SharedState* shared;                   // Rendezvous state for all threads
    ThreadState* thread;                   // This thread's private state
    void (Benchmark::*method)(ThreadState*);  // Benchmark body to execute
  };
538
  // Thread entry point: announce initialization, wait for the start
  // signal, run the benchmark method, then announce completion.
  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        // Last thread to initialize wakes the coordinator.
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

    // Timed region: only the benchmark method itself is measured.
    thread->stats.Start();
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }
566
  // Launch n threads running "method", release them simultaneously,
  // wait for all to finish, then merge and report their stats.
  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;

    ThreadArg* arg = new ThreadArg[n];
    for (int i = 0; i < n; i++) {
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      arg[i].thread = new ThreadState(i);
      arg[i].thread->shared = &shared;
      Env::Default()->StartThread(ThreadBody, &arg[i]);
    }

    // Wait until every thread has checked in, then fire the start gun.
    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    // Merge everything into thread 0's stats and report once.
    for (int i = 1; i < n; i++) {
      arg[0].thread->stats.Merge(arg[i].thread->stats);
    }
    arg[0].thread->stats.Report(name);

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;
  }
607
608  void Crc32c(ThreadState* thread) {
609    // Checksum about 500MB of data total
610    const int size = 4096;
611    const char* label = "(4K per op)";
612    std::string data(size, 'x');
613    int64_t bytes = 0;
614    uint32_t crc = 0;
615    while (bytes < 500 * 1048576) {
616      crc = crc32c::Value(data.data(), size);
617      thread->stats.FinishedSingleOp();
618      bytes += size;
619    }
620    // Print so result is not dead
621    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));
622
623    thread->stats.AddBytes(bytes);
624    thread->stats.AddMessage(label);
625  }
626
627  void AcquireLoad(ThreadState* thread) {
628    int dummy;
629    port::AtomicPointer ap(&dummy);
630    int count = 0;
631    void *ptr = NULL;
632    thread->stats.AddMessage("(each op is 1000 loads)");
633    while (count < 100000) {
634      for (int i = 0; i < 1000; i++) {
635        ptr = ap.Acquire_Load();
636      }
637      count++;
638      thread->stats.FinishedSingleOp();
639    }
640    if (ptr == NULL) exit(1); // Disable unused variable warning.
641  }
642
643  void SnappyCompress(ThreadState* thread) {
644    RandomGenerator gen;
645    Slice input = gen.Generate(Options().block_size);
646    int64_t bytes = 0;
647    int64_t produced = 0;
648    bool ok = true;
649    std::string compressed;
650    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
651      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
652      produced += compressed.size();
653      bytes += input.size();
654      thread->stats.FinishedSingleOp();
655    }
656
657    if (!ok) {
658      thread->stats.AddMessage("(snappy failure)");
659    } else {
660      char buf[100];
661      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
662               (produced * 100.0) / bytes);
663      thread->stats.AddMessage(buf);
664      thread->stats.AddBytes(bytes);
665    }
666  }
667
  // Repeatedly uncompress a snappy-compressed block_size fragment until
  // 1GB of uncompressed output has been produced.
  void SnappyUncompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    std::string compressed;
    bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
    int64_t bytes = 0;
    // input.size() bytes suffice: the data was compressed from input.
    char* uncompressed = new char[input.size()];
    while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
      ok =  port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                    uncompressed);
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }
    delete[] uncompressed;

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }
689
  // Open the database at FLAGS_db with the flag-configured options,
  // exiting the process on failure.  Requires db_ == NULL.
  void Open() {
    assert(db_ == NULL);
    Options options;
    options.create_if_missing = !FLAGS_use_existing_db;
    options.block_cache = cache_;
    options.write_buffer_size = FLAGS_write_buffer_size;
    options.max_open_files = FLAGS_open_files;
    options.filter_policy = filter_policy_;
    Status s = DB::Open(options, FLAGS_db, &db_);
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }
704
  // Write num_ values with sequentially increasing keys.
  void WriteSeq(ThreadState* thread) {
    DoWrite(thread, true);
  }
708
  // Write num_ values with uniformly random keys in [0, FLAGS_num).
  void WriteRandom(ThreadState* thread) {
    DoWrite(thread, false);
  }
712
  // Write num_ entries, batched entries_per_batch_ at a time.  Keys are
  // sequential when seq is true, else drawn from [0, FLAGS_num); any
  // write error aborts the process.
  void DoWrite(ThreadState* thread, bool seq) {
    if (num_ != FLAGS_num) {
      // Note non-default op counts (e.g. fillsync, fill100K) in the report.
      char msg[100];
      snprintf(msg, sizeof(msg), "(%d ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;
    for (int i = 0; i < num_; i += entries_per_batch_) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        batch.Put(key, gen.Generate(value_size_));
        bytes += value_size_ + strlen(key);
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
    thread->stats.AddBytes(bytes);
  }
742
743  void ReadSequential(ThreadState* thread) {
744    Iterator* iter = db_->NewIterator(ReadOptions());
745    int i = 0;
746    int64_t bytes = 0;
747    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
748      bytes += iter->key().size() + iter->value().size();
749      thread->stats.FinishedSingleOp();
750      ++i;
751    }
752    delete iter;
753    thread->stats.AddBytes(bytes);
754  }
755
  // Scan backward over the database (at most reads_ entries),
  // accumulating key+value bytes for throughput reporting.
  void ReadReverse(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions());
    int i = 0;
    int64_t bytes = 0;
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp();
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }
768
  // Issue reads_ point lookups with uniformly random keys and report
  // how many were found (out of the num_ keys in the database).
  void ReadRandom(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    int found = 0;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      if (db_->Get(options, key, &value).ok()) {
        found++;
      }
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }
786
  // Issue reads_ lookups for keys that are guaranteed absent (a '.' is
  // appended to an otherwise-valid key), to measure negative lookups.
  void ReadMissing(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d.", k);
      db_->Get(options, key, &value);
      thread->stats.FinishedSingleOp();
    }
  }
798
  // Issue reads_ random lookups confined to the first ~1% of the key
  // space, modeling a hot working set.
  void ReadHot(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    const int range = (FLAGS_num + 99) / 100;  // ceil(FLAGS_num / 100)
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % range;
      snprintf(key, sizeof(key), "%016d", k);
      db_->Get(options, key, &value);
      thread->stats.FinishedSingleOp();
    }
  }
811
  // Perform reads_ random seeks, each with a fresh iterator, counting a
  // seek as "found" only when it lands exactly on the sought key.
  void SeekRandom(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    int found = 0;
    for (int i = 0; i < reads_; i++) {
      Iterator* iter = db_->NewIterator(options);
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      iter->Seek(key);
      if (iter->Valid() && iter->key() == key) found++;
      delete iter;
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }
830
831  void DoDelete(ThreadState* thread, bool seq) {
832    RandomGenerator gen;
833    WriteBatch batch;
834    Status s;
835    for (int i = 0; i < num_; i += entries_per_batch_) {
836      batch.Clear();
837      for (int j = 0; j < entries_per_batch_; j++) {
838        const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
839        char key[100];
840        snprintf(key, sizeof(key), "%016d", k);
841        batch.Delete(key);
842        thread->stats.FinishedSingleOp();
843      }
844      s = db_->Write(write_options_, &batch);
845      if (!s.ok()) {
846        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
847        exit(1);
848      }
849    }
850  }
851
  // Delete num_ keys in sequential key order.
  void DeleteSeq(ThreadState* thread) {
    DoDelete(thread, true);
  }
855
  // Delete num_ keys in random order.
  void DeleteRandom(ThreadState* thread) {
    DoDelete(thread, false);
  }
859
  // Threads with tid > 0 do random reads; the tid == 0 thread keeps
  // writing random entries until all reader threads have finished.
  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      // Special thread that keeps writing until other threads are done.
      RandomGenerator gen;
      while (true) {
        {
          MutexLock l(&thread->shared->mu);
          // "+ 1" accounts for this writer thread itself.
          if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
            // Other threads have finished
            break;
          }
        }

        const int k = thread->rand.Next() % FLAGS_num;
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
      }

      // Do not count any of the preceding work/delay in stats.
      thread->stats.Start();
    }
  }
889
  // Compact the entire key range of the database.
  void Compact(ThreadState* thread) {
    db_->CompactRange(NULL, NULL);
  }
893
  // Print the DB property named by "key" (e.g. "leveldb.stats").
  void PrintStats(const char* key) {
    std::string stats;
    if (!db_->GetProperty(key, &stats)) {
      stats = "(failed)";
    }
    fprintf(stdout, "\n%s\n", stats.c_str());
  }
901
  // Callback for port::GetHeapProfile: appends profile bytes to the
  // WritableFile passed as "arg".  Any Append error is ignored here.
  static void WriteToFile(void* arg, const char* buf, int n) {
    reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
  }
905
  // Dump a heap profile to "<db>/heap-NNNN"; removes the file again if
  // this port does not support heap profiling.
  void HeapProfile() {
    char fname[100];
    snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
    WritableFile* file;
    Status s = Env::Default()->NewWritableFile(fname, &file);
    if (!s.ok()) {
      fprintf(stderr, "%s\n", s.ToString().c_str());
      return;
    }
    bool ok = port::GetHeapProfile(WriteToFile, file);
    delete file;
    if (!ok) {
      fprintf(stderr, "heap profiling not supported\n");
      Env::Default()->DeleteFile(fname);
    }
  }
922};
923
924}  // namespace leveldb
925
// Parse command-line flags, pick a default database path if none was
// given, and run the configured benchmarks.
int main(int argc, char** argv) {
  // Flags whose defaults come from leveldb::Options rather than constants.
  FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
  FLAGS_open_files = leveldb::Options().max_open_files;
  std::string default_db_path;

  for (int i = 1; i < argc; i++) {
    double d;
    int n;
    char junk;
    // The "%c" after each value ensures there is no trailing garbage:
    // sscanf must match exactly one item for the flag to be accepted.
    if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
      FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
    } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
      FLAGS_compression_ratio = d;
    } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_histogram = n;
    } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_use_existing_db = n;
    } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
      FLAGS_num = n;
    } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
      FLAGS_reads = n;
    } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
      FLAGS_threads = n;
    } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
      FLAGS_value_size = n;
    } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
      FLAGS_write_buffer_size = n;
    } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
      FLAGS_cache_size = n;
    } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
      FLAGS_bloom_bits = n;
    } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
      FLAGS_open_files = n;
    } else if (strncmp(argv[i], "--db=", 5) == 0) {
      FLAGS_db = argv[i] + 5;
    } else {
      fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
      exit(1);
    }
  }

  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db == NULL) {
      leveldb::Env::Default()->GetTestDirectory(&default_db_path);
      default_db_path += "/dbbench";
      // default_db_path must outlive FLAGS_db, which borrows its c_str().
      FLAGS_db = default_db_path.c_str();
  }

  leveldb::Benchmark benchmark;
  benchmark.Run();
  return 0;
}
980