1// sttable.h 2 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14// 15// Copyright 2005-2010 Google, Inc. 16// Author: allauzen@google.com (Cyril Allauzen) 17// 18// \file 19// A generic string-to-type table file format 20// 21// This is not meant as a generalization of SSTable. This is more of 22// a simple replacement for SSTable in order to provide an open-source 23// implementation of the FAR format for the external version of the 24// FST Library. 25 26#ifndef FST_EXTENSIONS_FAR_STTABLE_H_ 27#define FST_EXTENSIONS_FAR_STTABLE_H_ 28 29#include <algorithm> 30#include <iostream> 31#include <fstream> 32#include <sstream> 33#include <fst/util.h> 34 35namespace fst { 36 37static const int32 kSTTableMagicNumber = 2125656924; 38static const int32 kSTTableFileVersion = 1; 39 40// String-to-type table writing class for object of type 'T' using functor 'W' 41// to write an object of type 'T' from a stream. 'W' must conform to the 42// following interface: 43// 44// struct Writer { 45// void operator()(ostream &, const T &) const; 46// }; 47// 48template <class T, class W> 49class STTableWriter { 50 public: 51 typedef T EntryType; 52 typedef W EntryWriter; 53 54 explicit STTableWriter(const string &filename) 55 : stream_(filename.c_str(), ofstream::out | ofstream::binary), 56 error_(false) { 57 WriteType(stream_, kSTTableMagicNumber); 58 WriteType(stream_, kSTTableFileVersion); 59 if (!stream_) { 60 FSTERROR() << "STTableWriter::STTableWriter: error writing to file: " 61 << filename; 62 error_=true; 63 } 64 } 65 66 static STTableWriter<T, W> *Create(const string &filename) { 67 if (filename.empty()) { 68 LOG(ERROR) << "STTableWriter: writing to standard out unsupported."; 69 return 0; 70 } 71 return new STTableWriter<T, W>(filename); 72 } 73 74 void Add(const string &key, const T &t) { 75 if (key == "") { 76 FSTERROR() << "STTableWriter::Add: key empty: " << key; 77 error_ = true; 78 } else if (key < last_key_) { 79 FSTERROR() << "STTableWriter::Add: key disorder: " << key; 80 error_ = true; 81 } 82 if (error_) return; 83 last_key_ = key; 84 positions_.push_back(stream_.tellp()); 85 WriteType(stream_, key); 86 entry_writer_(stream_, t); 87 } 88 89 bool Error() const { return error_; } 90 91 ~STTableWriter() { 92 WriteType(stream_, positions_); 93 WriteType(stream_, static_cast<int64>(positions_.size())); 94 } 95 96 private: 97 EntryWriter entry_writer_; // Write functor for 'EntryType' 98 ofstream stream_; // Output stream 99 vector<int64> positions_; // Position in file of each key-entry pair 100 string last_key_; // Last key 101 bool error_; 102 103 DISALLOW_COPY_AND_ASSIGN(STTableWriter); 104}; 105 106 107// String-to-type table reading class for object of type 'T' using functor 'R' 108// to read an object of type 'T' form a stream. 'R' must conform to the 109// following interface: 110// 111// struct Reader { 112// T *operator()(istream &) const; 113// }; 114// 115template <class T, class R> 116class STTableReader { 117 public: 118 typedef T EntryType; 119 typedef R EntryReader; 120 121 explicit STTableReader(const vector<string> &filenames) 122 : sources_(filenames), entry_(0), error_(false) { 123 compare_ = new Compare(&keys_); 124 keys_.resize(filenames.size()); 125 streams_.resize(filenames.size(), 0); 126 positions_.resize(filenames.size()); 127 for (size_t i = 0; i < filenames.size(); ++i) { 128 streams_[i] = new ifstream( 129 filenames[i].c_str(), ifstream::in | ifstream::binary); 130 int32 magic_number = 0, file_version = 0; 131 ReadType(*streams_[i], &magic_number); 132 ReadType(*streams_[i], &file_version); 133 if (magic_number != kSTTableMagicNumber) { 134 FSTERROR() << "STTableReader::STTableReader: wrong file type: " 135 << filenames[i]; 136 error_ = true; 137 return; 138 } 139 if (file_version != kSTTableFileVersion) { 140 FSTERROR() << "STTableReader::STTableReader: wrong file version: " 141 << filenames[i]; 142 error_ = true; 143 return; 144 } 145 int64 num_entries; 146 streams_[i]->seekg(-static_cast<int>(sizeof(int64)), ios_base::end); 147 ReadType(*streams_[i], &num_entries); 148 streams_[i]->seekg(-static_cast<int>(sizeof(int64)) * 149 (num_entries + 1), ios_base::end); 150 positions_[i].resize(num_entries); 151 for (size_t j = 0; (j < num_entries) && (*streams_[i]); ++j) 152 ReadType(*streams_[i], &(positions_[i][j])); 153 streams_[i]->seekg(positions_[i][0]); 154 if (!*streams_[i]) { 155 FSTERROR() << "STTableReader::STTableReader: error reading file: " 156 << filenames[i]; 157 error_ = true; 158 return; 159 } 160 161 } 162 MakeHeap(); 163 } 164 165 ~STTableReader() { 166 for (size_t i = 0; i < streams_.size(); ++i) 167 delete streams_[i]; 168 delete compare_; 169 if (entry_) 170 delete entry_; 171 } 172 173 static STTableReader<T, R> *Open(const string &filename) { 174 if (filename.empty()) { 175 LOG(ERROR) << "STTableReader: reading from standard in not supported"; 176 return 0; 177 } 178 vector<string> filenames; 179 filenames.push_back(filename); 180 return new STTableReader<T, R>(filenames); 181 } 182 183 static STTableReader<T, R> *Open(const vector<string> &filenames) { 184 return new STTableReader<T, R>(filenames); 185 } 186 187 void Reset() { 188 if (error_) return; 189 for (size_t i = 0; i < streams_.size(); ++i) 190 streams_[i]->seekg(positions_[i].front()); 191 MakeHeap(); 192 } 193 194 bool Find(const string &key) { 195 if (error_) return false; 196 for (size_t i = 0; i < streams_.size(); ++i) 197 LowerBound(i, key); 198 MakeHeap(); 199 return keys_[current_] == key; 200 } 201 202 bool Done() const { return error_ || heap_.empty(); } 203 204 void Next() { 205 if (error_) return; 206 if (streams_[current_]->tellg() <= positions_[current_].back()) { 207 ReadType(*(streams_[current_]), &(keys_[current_])); 208 if (!*streams_[current_]) { 209 FSTERROR() << "STTableReader: error reading file: " 210 << sources_[current_]; 211 error_ = true; 212 return; 213 } 214 push_heap(heap_.begin(), heap_.end(), *compare_); 215 } else { 216 heap_.pop_back(); 217 } 218 if (!heap_.empty()) 219 PopHeap(); 220 } 221 222 const string &GetKey() const { 223 return keys_[current_]; 224 } 225 226 const EntryType &GetEntry() const { 227 return *entry_; 228 } 229 230 bool Error() const { return error_; } 231 232 private: 233 // Comparison functor used to compare stream IDs in the heap 234 struct Compare { 235 Compare(const vector<string> *keys) : keys_(keys) {} 236 237 bool operator()(size_t i, size_t j) const { 238 return (*keys_)[i] > (*keys_)[j]; 239 }; 240 241 private: 242 const vector<string> *keys_; 243 }; 244 245 // Position the stream with ID 'id' at the position corresponding 246 // to the lower bound for key 'find_key' 247 void LowerBound(size_t id, const string &find_key) { 248 ifstream *strm = streams_[id]; 249 const vector<int64> &positions = positions_[id]; 250 size_t low = 0, high = positions.size() - 1; 251 252 while (low < high) { 253 size_t mid = (low + high)/2; 254 strm->seekg(positions[mid]); 255 string key; 256 ReadType(*strm, &key); 257 if (key > find_key) { 258 high = mid; 259 } else if (key < find_key) { 260 low = mid + 1; 261 } else { 262 for (size_t i = mid; i > low; --i) { 263 strm->seekg(positions[i - 1]); 264 ReadType(*strm, &key); 265 if (key != find_key) { 266 strm->seekg(positions[i]); 267 return; 268 } 269 } 270 strm->seekg(positions[low]); 271 return; 272 } 273 } 274 strm->seekg(positions[low]); 275 } 276 277 // Add all streams to the heap 278 void MakeHeap() { 279 heap_.clear(); 280 for (size_t i = 0; i < streams_.size(); ++i) { 281 ReadType(*streams_[i], &(keys_[i])); 282 if (!*streams_[i]) { 283 FSTERROR() << "STTableReader: error reading file: " << sources_[i]; 284 error_ = true; 285 return; 286 } 287 heap_.push_back(i); 288 } 289 make_heap(heap_.begin(), heap_.end(), *compare_); 290 PopHeap(); 291 } 292 293 // Position the stream with the lowest key at the top 294 // of the heap, set 'current_' to the ID of that stream 295 // and read the current entry from that stream 296 void PopHeap() { 297 pop_heap(heap_.begin(), heap_.end(), *compare_); 298 current_ = heap_.back(); 299 if (entry_) 300 delete entry_; 301 entry_ = entry_reader_(*streams_[current_]); 302 if (!entry_) 303 error_ = true; 304 if (!*streams_[current_]) { 305 FSTERROR() << "STTableReader: error reading entry for key: " 306 << keys_[current_] << ", file: " << sources_[current_]; 307 error_ = true; 308 } 309 } 310 311 312 EntryReader entry_reader_; // Read functor for 'EntryType' 313 vector<ifstream*> streams_; // Input streams 314 vector<string> sources_; // and corresponding file names 315 vector<vector<int64> > positions_; // Index of positions for each stream 316 vector<string> keys_; // Lowest unread key for each stream 317 vector<int64> heap_; // Heap containing ID of streams with unread keys 318 int64 current_; // Id of current stream to be read 319 Compare *compare_; // Functor comparing stream IDs for the heap 320 mutable EntryType *entry_; // Pointer to the currently read entry 321 bool error_; 322 323 DISALLOW_COPY_AND_ASSIGN(STTableReader); 324}; 325 326 327// String-to-type table header reading function template on the entry header 328// type 'H' having a member function: 329// Read(istream &strm, const string &filename); 330// Checks that 'filename' is an STTable and call the H::Read() on the last 331// entry in the STTable. 332template <class H> 333bool ReadSTTableHeader(const string &filename, H *header) { 334 ifstream strm(filename.c_str(), ifstream::in | ifstream::binary); 335 int32 magic_number = 0, file_version = 0; 336 ReadType(strm, &magic_number); 337 ReadType(strm, &file_version); 338 if (magic_number != kSTTableMagicNumber) { 339 LOG(ERROR) << "ReadSTTableHeader: wrong file type: " << filename; 340 return false; 341 } 342 if (file_version != kSTTableFileVersion) { 343 LOG(ERROR) << "ReadSTTableHeader: wrong file version: " << filename; 344 return false; 345 } 346 int64 i = -1; 347 strm.seekg(-static_cast<int>(sizeof(int64)), ios_base::end); 348 ReadType(strm, &i); // Read number of entries 349 if (!strm) { 350 LOG(ERROR) << "ReadSTTableHeader: error reading file: " << filename; 351 return false; 352 } 353 if (i == 0) return true; // No entry header to read 354 strm.seekg(-2 * static_cast<int>(sizeof(int64)), ios_base::end); 355 ReadType(strm, &i); // Read position for last entry in file 356 strm.seekg(i); 357 string key; 358 ReadType(strm, &key); 359 header->Read(strm, filename + ":" + key); 360 if (!strm) { 361 LOG(ERROR) << "ReadSTTableHeader: error reading file: " << filename; 362 return false; 363 } 364 return true; 365} 366 367bool IsSTTable(const string &filename); 368 369} // namespace fst 370 371#endif // FST_EXTENSIONS_FAR_STTABLE_H_ 372