streams.cc revision 5821806d5e7f356e8fa4b058a389a808ea183019
1// Copyright (c) 2011 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// Streams classes.
6//
7// These memory-resident streams are used for serializing data into a sequential
8// region of memory.
9//
10// Streams are divided into SourceStreams for reading and SinkStreams for
11// writing.  Streams are aggregated into Sets which allows several streams to be
12// used at once.  Example: we can write A1, B1, A2, B2 but achieve the memory
13// layout A1 A2 B1 B2 by writing 'A's to one stream and 'B's to another.
14//
15// The aggregated streams are important to Courgette's compression efficiency,
16// we use it to cluster similar kinds of data which helps to generate longer
17// common subsequences and repeated sequences.
18
19#include "courgette/streams.h"
20
21#include <memory.h>
22
23#include "base/basictypes.h"
24#include "base/logging.h"
25
26namespace courgette {
27
28// Update this version number if the serialization format of a StreamSet
29// changes.
30static const unsigned int kStreamsSerializationFormatVersion = 20090218;
31
32//
33// This is a cut down Varint implementation, implementing only what we use for
34// streams.
35//
36class Varint {
37 public:
38  // Maximum lengths of varint encoding of uint32
39  static const int kMax32 = 5;
40
41  // Parses a Varint32 encoded value from |source| and stores it in |output|,
42  // and returns a pointer to the following byte.  Returns NULL if a valid
43  // varint value was not found before |limit|.
44  static const uint8* Parse32WithLimit(const uint8* source, const uint8* limit,
45                                       uint32* output);
46
47  // Writes the Varint32 encoded representation of |value| to buffer
48  // |destination|.  |destination| must have sufficient length to hold kMax32
49  // bytes.  Returns a pointer to the byte just past the last encoded byte.
50  static uint8* Encode32(uint8* destination, uint32 value);
51};
52
53// Parses a Varint32 encoded unsigned number from |source|.  The Varint32
54// encoding is a little-endian sequence of bytes containing base-128 digits,
55// with the high order bit set to indicate if there are more digits.
56//
57// For each byte, we mask out the digit and 'or' it into the right place in the
58// result.
59//
60// The digit loop is unrolled for performance.  It usually exits after the first
61// one or two digits.
62const uint8* Varint::Parse32WithLimit(const uint8* source,
63                                      const uint8* limit,
64                                      uint32* output) {
65  uint32 digit, result;
66  if (source >= limit)
67    return NULL;
68  digit = *(source++);
69  result = digit & 127;
70  if (digit < 128) {
71    *output = result;
72    return source;
73  }
74
75  if (source >= limit)
76    return NULL;
77  digit = *(source++);
78  result |= (digit & 127) <<  7;
79  if (digit < 128) {
80    *output = result;
81    return source;
82  }
83
84  if (source >= limit)
85    return NULL;
86  digit = *(source++);
87  result |= (digit & 127) << 14;
88  if (digit < 128) {
89    *output = result;
90    return source;
91  }
92
93  if (source >= limit)
94    return NULL;
95  digit = *(source++);
96  result |= (digit & 127) << 21;
97  if (digit < 128) {
98    *output = result;
99    return source;
100  }
101
102  if (source >= limit)
103    return NULL;
104  digit = *(source++);
105  result |= (digit & 127) << 28;
106  if (digit < 128) {
107    *output = result;
108    return source;
109  }
110
111  return NULL;  // Value is too long to be a Varint32.
112}
113
114// Write the base-128 digits in little-endian order.  All except the last digit
115// have the high bit set to indicate more digits.
116inline uint8* Varint::Encode32(uint8* destination, uint32 value) {
117  while (value >= 128) {
118    *(destination++) = value | 128;
119    value = value >> 7;
120  }
121  *(destination++) = value;
122  return destination;
123}
124
125void SourceStream::Init(const SinkStream& sink) {
126  Init(sink.Buffer(), sink.Length());
127}
128
129bool SourceStream::Read(void* destination, size_t count) {
130  if (current_ + count > end_)
131    return false;
132  memcpy(destination, current_, count);
133  current_ += count;
134  return true;
135}
136
137bool SourceStream::ReadVarint32(uint32* output_value) {
138  const uint8* after = Varint::Parse32WithLimit(current_, end_, output_value);
139  if (!after)
140    return false;
141  current_ = after;
142  return true;
143}
144
145bool SourceStream::ReadVarint32Signed(int32* output_value) {
146  // Signed numbers are encoded as unsigned numbers so that numbers nearer zero
147  // have shorter varint encoding.
148  //  0000xxxx encoded as 000xxxx0.
149  //  1111xxxx encoded as 000yyyy1 where yyyy is complement of xxxx.
150  uint32 unsigned_value;
151  if (!ReadVarint32(&unsigned_value))
152    return false;
153  if (unsigned_value & 1)
154    *output_value = ~static_cast<int32>(unsigned_value >> 1);
155  else
156    *output_value = (unsigned_value >> 1);
157  return true;
158}
159
160bool SourceStream::ShareSubstream(size_t offset, size_t length,
161                                  SourceStream* substream) {
162  if (offset > Remaining())
163    return false;
164  if (length > Remaining() - offset)
165    return false;
166  substream->Init(current_ + offset, length);
167  return true;
168}
169
170bool SourceStream::ReadSubstream(size_t length, SourceStream* substream) {
171  if (!ShareSubstream(0, length, substream))
172    return false;
173  current_ += length;
174  return true;
175}
176
177bool SourceStream::Skip(size_t byte_count) {
178  if (current_ + byte_count > end_)
179    return false;
180  current_ += byte_count;
181  return true;
182}
183
184CheckBool SinkStream::Write(const void* data, size_t byte_count) {
185  return buffer_.append(static_cast<const char*>(data), byte_count);
186}
187
188CheckBool SinkStream::WriteVarint32(uint32 value) {
189  uint8 buffer[Varint::kMax32];
190  uint8* end = Varint::Encode32(buffer, value);
191  return Write(buffer, end - buffer);
192}
193
194CheckBool SinkStream::WriteVarint32Signed(int32 value) {
195  // Encode signed numbers so that numbers nearer zero have shorter
196  // varint encoding.
197  //  0000xxxx encoded as 000xxxx0.
198  //  1111xxxx encoded as 000yyyy1 where yyyy is complement of xxxx.
199  bool ret;
200  if (value < 0)
201    ret = WriteVarint32(~value * 2 + 1);
202  else
203    ret = WriteVarint32(value * 2);
204  return ret;
205}
206
207CheckBool SinkStream::WriteSizeVarint32(size_t value) {
208  uint32 narrowed_value = static_cast<uint32>(value);
209  // On 32-bit, the compiler should figure out this test always fails.
210  LOG_ASSERT(value == narrowed_value);
211  return WriteVarint32(narrowed_value);
212}
213
214CheckBool SinkStream::Append(SinkStream* other) {
215  bool ret = Write(other->buffer_.data(), other->buffer_.size());
216  if (ret)
217    other->Retire();
218  return ret;
219}
220
221void SinkStream::Retire() {
222  buffer_.clear();
223}
224
225////////////////////////////////////////////////////////////////////////////////
226
227SourceStreamSet::SourceStreamSet()
228    : count_(kMaxStreams) {
229}
230
231SourceStreamSet::~SourceStreamSet() {
232}
233
234
235// Initializes from |source|.
236// The stream set for N streams is serialized as a header
237//   <version><N><length1><length2>...<lengthN>
238// followed by the stream contents
239//   <bytes1><bytes2>...<bytesN>
240//
241bool SourceStreamSet::Init(const void* source, size_t byte_count) {
242  const uint8* start = static_cast<const uint8*>(source);
243  const uint8* end = start + byte_count;
244
245  unsigned int version;
246  const uint8* finger = Varint::Parse32WithLimit(start, end, &version);
247  if (finger == NULL)
248    return false;
249  if (version != kStreamsSerializationFormatVersion)
250    return false;
251
252  unsigned int count;
253  finger = Varint::Parse32WithLimit(finger, end, &count);
254  if (finger == NULL)
255    return false;
256  if (count > kMaxStreams)
257    return false;
258
259  count_ = count;
260
261  unsigned int lengths[kMaxStreams];
262  size_t accumulated_length = 0;
263
264  for (size_t i = 0; i < count_; ++i) {
265    finger = Varint::Parse32WithLimit(finger, end, &lengths[i]);
266    if (finger == NULL)
267      return false;
268    accumulated_length += lengths[i];
269  }
270
271  // Remaining bytes should add up to sum of lengths.
272  if (static_cast<size_t>(end - finger) != accumulated_length)
273    return false;
274
275  accumulated_length = finger - start;
276  for (size_t i = 0; i < count_; ++i) {
277    stream(i)->Init(start + accumulated_length, lengths[i]);
278    accumulated_length += lengths[i];
279  }
280
281  return true;
282}
283
284bool SourceStreamSet::Init(SourceStream* source) {
285  // TODO(sra): consume the rest of |source|.
286  return Init(source->Buffer(), source->Remaining());
287}
288
289bool SourceStreamSet::ReadSet(SourceStreamSet* set) {
290  uint32 stream_count = 0;
291  SourceStream* control_stream = this->stream(0);
292  if (!control_stream->ReadVarint32(&stream_count))
293    return false;
294
295  uint32 lengths[kMaxStreams] = {};  // i.e. all zero.
296
297  for (size_t i = 0; i < stream_count; ++i) {
298    if (!control_stream->ReadVarint32(&lengths[i]))
299      return false;
300  }
301
302  for (size_t i = 0; i < stream_count; ++i) {
303    if (!this->stream(i)->ReadSubstream(lengths[i], set->stream(i)))
304      return false;
305  }
306  return true;
307}
308
309bool SourceStreamSet::Empty() const {
310  for (size_t i = 0; i < count_; ++i) {
311    if (streams_[i].Remaining() != 0)
312      return false;
313  }
314  return true;
315}
316
317////////////////////////////////////////////////////////////////////////////////
318
319SinkStreamSet::SinkStreamSet()
320  : count_(kMaxStreams) {
321}
322
323SinkStreamSet::~SinkStreamSet() {
324}
325
326void SinkStreamSet::Init(size_t stream_index_limit) {
327  count_ = stream_index_limit;
328}
329
330// The header for a stream set for N streams is serialized as
331//   <version><N><length1><length2>...<lengthN>
332CheckBool SinkStreamSet::CopyHeaderTo(SinkStream* header) {
333  bool ret = header->WriteVarint32(kStreamsSerializationFormatVersion);
334  if (ret) {
335    ret = header->WriteSizeVarint32(count_);
336    for (size_t i = 0; ret && i < count_; ++i) {
337      ret = header->WriteSizeVarint32(stream(i)->Length());
338    }
339  }
340  return ret;
341}
342
343// Writes |this| to |combined_stream|.  See SourceStreamSet::Init for the layout
344// of the stream metadata and contents.
345CheckBool SinkStreamSet::CopyTo(SinkStream *combined_stream) {
346  SinkStream header;
347  bool ret = CopyHeaderTo(&header);
348  if (!ret)
349    return ret;
350
351  // Reserve the correct amount of storage.
352  size_t length = header.Length();
353  for (size_t i = 0; i < count_; ++i) {
354    length += stream(i)->Length();
355  }
356  ret = combined_stream->Reserve(length);
357  if (ret) {
358    ret = combined_stream->Append(&header);
359    for (size_t i = 0; ret && i < count_; ++i) {
360      ret = combined_stream->Append(stream(i));
361    }
362  }
363  return ret;
364}
365
366CheckBool SinkStreamSet::WriteSet(SinkStreamSet* set) {
367  uint32 lengths[kMaxStreams];
368  // 'stream_count' includes all non-empty streams and all empty stream numbered
369  // lower than a non-empty stream.
370  size_t stream_count = 0;
371  for (size_t i = 0; i < kMaxStreams; ++i) {
372    SinkStream* stream = set->stream(i);
373    lengths[i] = static_cast<uint32>(stream->Length());
374    if (lengths[i] > 0)
375      stream_count = i + 1;
376  }
377
378  SinkStream* control_stream = this->stream(0);
379  bool ret = control_stream->WriteSizeVarint32(stream_count);
380  for (size_t i = 0; ret && i < stream_count; ++i) {
381    ret = control_stream->WriteSizeVarint32(lengths[i]);
382  }
383
384  for (size_t i = 0; ret && i < stream_count; ++i) {
385    ret = this->stream(i)->Append(set->stream(i));
386  }
387  return ret;
388}
389
390}  // namespace
391