// streams.cc revision 5821806d5e7f356e8fa4b058a389a808ea183019
1// Copyright (c) 2011 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5// Streams classes. 6// 7// These memory-resident streams are used for serializing data into a sequential 8// region of memory. 9// 10// Streams are divided into SourceStreams for reading and SinkStreams for 11// writing. Streams are aggregated into Sets which allows several streams to be 12// used at once. Example: we can write A1, B1, A2, B2 but achieve the memory 13// layout A1 A2 B1 B2 by writing 'A's to one stream and 'B's to another. 14// 15// The aggregated streams are important to Courgette's compression efficiency, 16// we use it to cluster similar kinds of data which helps to generate longer 17// common subsequences and repeated sequences. 18 19#include "courgette/streams.h" 20 21#include <memory.h> 22 23#include "base/basictypes.h" 24#include "base/logging.h" 25 26namespace courgette { 27 28// Update this version number if the serialization format of a StreamSet 29// changes. 30static const unsigned int kStreamsSerializationFormatVersion = 20090218; 31 32// 33// This is a cut down Varint implementation, implementing only what we use for 34// streams. 35// 36class Varint { 37 public: 38 // Maximum lengths of varint encoding of uint32 39 static const int kMax32 = 5; 40 41 // Parses a Varint32 encoded value from |source| and stores it in |output|, 42 // and returns a pointer to the following byte. Returns NULL if a valid 43 // varint value was not found before |limit|. 44 static const uint8* Parse32WithLimit(const uint8* source, const uint8* limit, 45 uint32* output); 46 47 // Writes the Varint32 encoded representation of |value| to buffer 48 // |destination|. |destination| must have sufficient length to hold kMax32 49 // bytes. Returns a pointer to the byte just past the last encoded byte. 
50 static uint8* Encode32(uint8* destination, uint32 value); 51}; 52 53// Parses a Varint32 encoded unsigned number from |source|. The Varint32 54// encoding is a little-endian sequence of bytes containing base-128 digits, 55// with the high order bit set to indicate if there are more digits. 56// 57// For each byte, we mask out the digit and 'or' it into the right place in the 58// result. 59// 60// The digit loop is unrolled for performance. It usually exits after the first 61// one or two digits. 62const uint8* Varint::Parse32WithLimit(const uint8* source, 63 const uint8* limit, 64 uint32* output) { 65 uint32 digit, result; 66 if (source >= limit) 67 return NULL; 68 digit = *(source++); 69 result = digit & 127; 70 if (digit < 128) { 71 *output = result; 72 return source; 73 } 74 75 if (source >= limit) 76 return NULL; 77 digit = *(source++); 78 result |= (digit & 127) << 7; 79 if (digit < 128) { 80 *output = result; 81 return source; 82 } 83 84 if (source >= limit) 85 return NULL; 86 digit = *(source++); 87 result |= (digit & 127) << 14; 88 if (digit < 128) { 89 *output = result; 90 return source; 91 } 92 93 if (source >= limit) 94 return NULL; 95 digit = *(source++); 96 result |= (digit & 127) << 21; 97 if (digit < 128) { 98 *output = result; 99 return source; 100 } 101 102 if (source >= limit) 103 return NULL; 104 digit = *(source++); 105 result |= (digit & 127) << 28; 106 if (digit < 128) { 107 *output = result; 108 return source; 109 } 110 111 return NULL; // Value is too long to be a Varint32. 112} 113 114// Write the base-128 digits in little-endian order. All except the last digit 115// have the high bit set to indicate more digits. 
116inline uint8* Varint::Encode32(uint8* destination, uint32 value) { 117 while (value >= 128) { 118 *(destination++) = value | 128; 119 value = value >> 7; 120 } 121 *(destination++) = value; 122 return destination; 123} 124 125void SourceStream::Init(const SinkStream& sink) { 126 Init(sink.Buffer(), sink.Length()); 127} 128 129bool SourceStream::Read(void* destination, size_t count) { 130 if (current_ + count > end_) 131 return false; 132 memcpy(destination, current_, count); 133 current_ += count; 134 return true; 135} 136 137bool SourceStream::ReadVarint32(uint32* output_value) { 138 const uint8* after = Varint::Parse32WithLimit(current_, end_, output_value); 139 if (!after) 140 return false; 141 current_ = after; 142 return true; 143} 144 145bool SourceStream::ReadVarint32Signed(int32* output_value) { 146 // Signed numbers are encoded as unsigned numbers so that numbers nearer zero 147 // have shorter varint encoding. 148 // 0000xxxx encoded as 000xxxx0. 149 // 1111xxxx encoded as 000yyyy1 where yyyy is complement of xxxx. 
150 uint32 unsigned_value; 151 if (!ReadVarint32(&unsigned_value)) 152 return false; 153 if (unsigned_value & 1) 154 *output_value = ~static_cast<int32>(unsigned_value >> 1); 155 else 156 *output_value = (unsigned_value >> 1); 157 return true; 158} 159 160bool SourceStream::ShareSubstream(size_t offset, size_t length, 161 SourceStream* substream) { 162 if (offset > Remaining()) 163 return false; 164 if (length > Remaining() - offset) 165 return false; 166 substream->Init(current_ + offset, length); 167 return true; 168} 169 170bool SourceStream::ReadSubstream(size_t length, SourceStream* substream) { 171 if (!ShareSubstream(0, length, substream)) 172 return false; 173 current_ += length; 174 return true; 175} 176 177bool SourceStream::Skip(size_t byte_count) { 178 if (current_ + byte_count > end_) 179 return false; 180 current_ += byte_count; 181 return true; 182} 183 184CheckBool SinkStream::Write(const void* data, size_t byte_count) { 185 return buffer_.append(static_cast<const char*>(data), byte_count); 186} 187 188CheckBool SinkStream::WriteVarint32(uint32 value) { 189 uint8 buffer[Varint::kMax32]; 190 uint8* end = Varint::Encode32(buffer, value); 191 return Write(buffer, end - buffer); 192} 193 194CheckBool SinkStream::WriteVarint32Signed(int32 value) { 195 // Encode signed numbers so that numbers nearer zero have shorter 196 // varint encoding. 197 // 0000xxxx encoded as 000xxxx0. 198 // 1111xxxx encoded as 000yyyy1 where yyyy is complement of xxxx. 199 bool ret; 200 if (value < 0) 201 ret = WriteVarint32(~value * 2 + 1); 202 else 203 ret = WriteVarint32(value * 2); 204 return ret; 205} 206 207CheckBool SinkStream::WriteSizeVarint32(size_t value) { 208 uint32 narrowed_value = static_cast<uint32>(value); 209 // On 32-bit, the compiler should figure out this test always fails. 
210 LOG_ASSERT(value == narrowed_value); 211 return WriteVarint32(narrowed_value); 212} 213 214CheckBool SinkStream::Append(SinkStream* other) { 215 bool ret = Write(other->buffer_.data(), other->buffer_.size()); 216 if (ret) 217 other->Retire(); 218 return ret; 219} 220 221void SinkStream::Retire() { 222 buffer_.clear(); 223} 224 225//////////////////////////////////////////////////////////////////////////////// 226 227SourceStreamSet::SourceStreamSet() 228 : count_(kMaxStreams) { 229} 230 231SourceStreamSet::~SourceStreamSet() { 232} 233 234 235// Initializes from |source|. 236// The stream set for N streams is serialized as a header 237// <version><N><length1><length2>...<lengthN> 238// followed by the stream contents 239// <bytes1><bytes2>...<bytesN> 240// 241bool SourceStreamSet::Init(const void* source, size_t byte_count) { 242 const uint8* start = static_cast<const uint8*>(source); 243 const uint8* end = start + byte_count; 244 245 unsigned int version; 246 const uint8* finger = Varint::Parse32WithLimit(start, end, &version); 247 if (finger == NULL) 248 return false; 249 if (version != kStreamsSerializationFormatVersion) 250 return false; 251 252 unsigned int count; 253 finger = Varint::Parse32WithLimit(finger, end, &count); 254 if (finger == NULL) 255 return false; 256 if (count > kMaxStreams) 257 return false; 258 259 count_ = count; 260 261 unsigned int lengths[kMaxStreams]; 262 size_t accumulated_length = 0; 263 264 for (size_t i = 0; i < count_; ++i) { 265 finger = Varint::Parse32WithLimit(finger, end, &lengths[i]); 266 if (finger == NULL) 267 return false; 268 accumulated_length += lengths[i]; 269 } 270 271 // Remaining bytes should add up to sum of lengths. 
272 if (static_cast<size_t>(end - finger) != accumulated_length) 273 return false; 274 275 accumulated_length = finger - start; 276 for (size_t i = 0; i < count_; ++i) { 277 stream(i)->Init(start + accumulated_length, lengths[i]); 278 accumulated_length += lengths[i]; 279 } 280 281 return true; 282} 283 284bool SourceStreamSet::Init(SourceStream* source) { 285 // TODO(sra): consume the rest of |source|. 286 return Init(source->Buffer(), source->Remaining()); 287} 288 289bool SourceStreamSet::ReadSet(SourceStreamSet* set) { 290 uint32 stream_count = 0; 291 SourceStream* control_stream = this->stream(0); 292 if (!control_stream->ReadVarint32(&stream_count)) 293 return false; 294 295 uint32 lengths[kMaxStreams] = {}; // i.e. all zero. 296 297 for (size_t i = 0; i < stream_count; ++i) { 298 if (!control_stream->ReadVarint32(&lengths[i])) 299 return false; 300 } 301 302 for (size_t i = 0; i < stream_count; ++i) { 303 if (!this->stream(i)->ReadSubstream(lengths[i], set->stream(i))) 304 return false; 305 } 306 return true; 307} 308 309bool SourceStreamSet::Empty() const { 310 for (size_t i = 0; i < count_; ++i) { 311 if (streams_[i].Remaining() != 0) 312 return false; 313 } 314 return true; 315} 316 317//////////////////////////////////////////////////////////////////////////////// 318 319SinkStreamSet::SinkStreamSet() 320 : count_(kMaxStreams) { 321} 322 323SinkStreamSet::~SinkStreamSet() { 324} 325 326void SinkStreamSet::Init(size_t stream_index_limit) { 327 count_ = stream_index_limit; 328} 329 330// The header for a stream set for N streams is serialized as 331// <version><N><length1><length2>...<lengthN> 332CheckBool SinkStreamSet::CopyHeaderTo(SinkStream* header) { 333 bool ret = header->WriteVarint32(kStreamsSerializationFormatVersion); 334 if (ret) { 335 ret = header->WriteSizeVarint32(count_); 336 for (size_t i = 0; ret && i < count_; ++i) { 337 ret = header->WriteSizeVarint32(stream(i)->Length()); 338 } 339 } 340 return ret; 341} 342 343// Writes |this| to 
|combined_stream|. See SourceStreamSet::Init for the layout 344// of the stream metadata and contents. 345CheckBool SinkStreamSet::CopyTo(SinkStream *combined_stream) { 346 SinkStream header; 347 bool ret = CopyHeaderTo(&header); 348 if (!ret) 349 return ret; 350 351 // Reserve the correct amount of storage. 352 size_t length = header.Length(); 353 for (size_t i = 0; i < count_; ++i) { 354 length += stream(i)->Length(); 355 } 356 ret = combined_stream->Reserve(length); 357 if (ret) { 358 ret = combined_stream->Append(&header); 359 for (size_t i = 0; ret && i < count_; ++i) { 360 ret = combined_stream->Append(stream(i)); 361 } 362 } 363 return ret; 364} 365 366CheckBool SinkStreamSet::WriteSet(SinkStreamSet* set) { 367 uint32 lengths[kMaxStreams]; 368 // 'stream_count' includes all non-empty streams and all empty stream numbered 369 // lower than a non-empty stream. 370 size_t stream_count = 0; 371 for (size_t i = 0; i < kMaxStreams; ++i) { 372 SinkStream* stream = set->stream(i); 373 lengths[i] = static_cast<uint32>(stream->Length()); 374 if (lengths[i] > 0) 375 stream_count = i + 1; 376 } 377 378 SinkStream* control_stream = this->stream(0); 379 bool ret = control_stream->WriteSizeVarint32(stream_count); 380 for (size_t i = 0; ret && i < stream_count; ++i) { 381 ret = control_stream->WriteSizeVarint32(lengths[i]); 382 } 383 384 for (size_t i = 0; ret && i < stream_count; ++i) { 385 ret = this->stream(i)->Append(set->stream(i)); 386 } 387 return ret; 388} 389 390} // namespace 391