1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "table/merger.h"
6
7#include "leveldb/comparator.h"
8#include "leveldb/iterator.h"
9#include "table/iterator_wrapper.h"
10
11namespace leveldb {
12
13namespace {
14class MergingIterator : public Iterator {
15 public:
16  MergingIterator(const Comparator* comparator, Iterator** children, int n)
17      : comparator_(comparator),
18        children_(new IteratorWrapper[n]),
19        n_(n),
20        current_(NULL),
21        direction_(kForward) {
22    for (int i = 0; i < n; i++) {
23      children_[i].Set(children[i]);
24    }
25  }
26
27  virtual ~MergingIterator() {
28    delete[] children_;
29  }
30
31  virtual bool Valid() const {
32    return (current_ != NULL);
33  }
34
35  virtual void SeekToFirst() {
36    for (int i = 0; i < n_; i++) {
37      children_[i].SeekToFirst();
38    }
39    FindSmallest();
40    direction_ = kForward;
41  }
42
43  virtual void SeekToLast() {
44    for (int i = 0; i < n_; i++) {
45      children_[i].SeekToLast();
46    }
47    FindLargest();
48    direction_ = kReverse;
49  }
50
51  virtual void Seek(const Slice& target) {
52    for (int i = 0; i < n_; i++) {
53      children_[i].Seek(target);
54    }
55    FindSmallest();
56    direction_ = kForward;
57  }
58
59  virtual void Next() {
60    assert(Valid());
61
62    // Ensure that all children are positioned after key().
63    // If we are moving in the forward direction, it is already
64    // true for all of the non-current_ children since current_ is
65    // the smallest child and key() == current_->key().  Otherwise,
66    // we explicitly position the non-current_ children.
67    if (direction_ != kForward) {
68      for (int i = 0; i < n_; i++) {
69        IteratorWrapper* child = &children_[i];
70        if (child != current_) {
71          child->Seek(key());
72          if (child->Valid() &&
73              comparator_->Compare(key(), child->key()) == 0) {
74            child->Next();
75          }
76        }
77      }
78      direction_ = kForward;
79    }
80
81    current_->Next();
82    FindSmallest();
83  }
84
85  virtual void Prev() {
86    assert(Valid());
87
88    // Ensure that all children are positioned before key().
89    // If we are moving in the reverse direction, it is already
90    // true for all of the non-current_ children since current_ is
91    // the largest child and key() == current_->key().  Otherwise,
92    // we explicitly position the non-current_ children.
93    if (direction_ != kReverse) {
94      for (int i = 0; i < n_; i++) {
95        IteratorWrapper* child = &children_[i];
96        if (child != current_) {
97          child->Seek(key());
98          if (child->Valid()) {
99            // Child is at first entry >= key().  Step back one to be < key()
100            child->Prev();
101          } else {
102            // Child has no entries >= key().  Position at last entry.
103            child->SeekToLast();
104          }
105        }
106      }
107      direction_ = kReverse;
108    }
109
110    current_->Prev();
111    FindLargest();
112  }
113
114  virtual Slice key() const {
115    assert(Valid());
116    return current_->key();
117  }
118
119  virtual Slice value() const {
120    assert(Valid());
121    return current_->value();
122  }
123
124  virtual Status status() const {
125    Status status;
126    for (int i = 0; i < n_; i++) {
127      status = children_[i].status();
128      if (!status.ok()) {
129        break;
130      }
131    }
132    return status;
133  }
134
135 private:
136  void FindSmallest();
137  void FindLargest();
138
139  // We might want to use a heap in case there are lots of children.
140  // For now we use a simple array since we expect a very small number
141  // of children in leveldb.
142  const Comparator* comparator_;
143  IteratorWrapper* children_;
144  int n_;
145  IteratorWrapper* current_;
146
147  // Which direction is the iterator moving?
148  enum Direction {
149    kForward,
150    kReverse
151  };
152  Direction direction_;
153};
154
155void MergingIterator::FindSmallest() {
156  IteratorWrapper* smallest = NULL;
157  for (int i = 0; i < n_; i++) {
158    IteratorWrapper* child = &children_[i];
159    if (child->Valid()) {
160      if (smallest == NULL) {
161        smallest = child;
162      } else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
163        smallest = child;
164      }
165    }
166  }
167  current_ = smallest;
168}
169
170void MergingIterator::FindLargest() {
171  IteratorWrapper* largest = NULL;
172  for (int i = n_-1; i >= 0; i--) {
173    IteratorWrapper* child = &children_[i];
174    if (child->Valid()) {
175      if (largest == NULL) {
176        largest = child;
177      } else if (comparator_->Compare(child->key(), largest->key()) > 0) {
178        largest = child;
179      }
180    }
181  }
182  current_ = largest;
183}
184}  // namespace
185
186Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) {
187  assert(n >= 0);
188  if (n == 0) {
189    return NewEmptyIterator();
190  } else if (n == 1) {
191    return list[0];
192  } else {
193    return new MergingIterator(cmp, list, n);
194  }
195}
196
197}  // namespace leveldb
198