1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include "tensorflow/core/lib/io/snappy/snappy_inputbuffer.h"
17
18namespace tensorflow {
19namespace io {
20SnappyInputBuffer::SnappyInputBuffer(
21    RandomAccessFile* file,
22    size_t input_buffer_bytes,  // size of input_buffer_
23    size_t output_buffer_bytes  // size of output_buffer_
24    )
25    : file_(file),
26      input_buffer_capacity_(input_buffer_bytes),
27      output_buffer_capacity_(output_buffer_bytes),
28      input_buffer_(new char[input_buffer_capacity_]),
29      output_buffer_(new char[output_buffer_capacity_]),
30      next_in_(input_buffer_.get()) {}
31
32Status SnappyInputBuffer::ReadNBytes(int64 bytes_to_read, string* result) {
33  result->clear();
34  // Read as many bytes as possible from cache.
35  bytes_to_read -= ReadBytesFromCache(bytes_to_read, result);
36
37  while (bytes_to_read > 0) {
38    // At this point we can be sure that cache has been emptied.
39    DCHECK(avail_out_ == 0);
40
41    // Now that the cache is empty we need to inflate more data.
42    TF_RETURN_IF_ERROR(Inflate());
43
44    bytes_to_read -= ReadBytesFromCache(bytes_to_read, result);
45  }
46
47  return Status::OK();
48}
49
50int64 SnappyInputBuffer::Tell() const {
51  // TODO(srbs): Implement this.
52  return -1;
53}
54
55Status SnappyInputBuffer::Reset() {
56  file_pos_ = 0;
57  avail_in_ = 0;
58  avail_out_ = 0;
59  next_in_ = input_buffer_.get();
60
61  return Status::OK();
62}
63
64size_t SnappyInputBuffer::ReadBytesFromCache(size_t bytes_to_read,
65                                             string* result) {
66  size_t can_read_bytes = std::min(bytes_to_read, avail_out_);
67  if (can_read_bytes > 0) {
68    result->append(next_out_, can_read_bytes);
69    next_out_ += can_read_bytes;
70    avail_out_ -= can_read_bytes;
71  }
72
73  return can_read_bytes;
74}
75
76Status SnappyInputBuffer::Inflate() {
77  // Read length of compressed block.
78  uint32 compressed_block_length;
79  TF_RETURN_IF_ERROR(ReadCompressedBlockLength(&compressed_block_length));
80
81  // If the entire block is not in cache do a read from file.
82  if (avail_in_ < compressed_block_length) {
83    TF_RETURN_IF_ERROR(ReadFromFile());
84    if (avail_in_ < compressed_block_length) {
85      if (compressed_block_length > input_buffer_capacity_) {
86        return errors::ResourceExhausted(
87            "Input buffer(size: ", input_buffer_capacity_,
88            " bytes) too small. Should be larger ", "than ",
89            compressed_block_length, " bytes.");
90      } else {
91        return errors::DataLoss(
92            strings::StrCat("Failed to read ", compressed_block_length,
93                            " bytes from file. Possible data corruption."));
94      }
95    }
96  }
97
98  size_t uncompressed_length;
99  if (!port::Snappy_GetUncompressedLength(next_in_, compressed_block_length,
100                                          &uncompressed_length)) {
101    return errors::DataLoss("Parsing error in Snappy_GetUncompressedLength");
102  }
103
104  // Output buffer must have been cleared before uncompressing more input.
105  DCHECK_EQ(avail_out_, 0);
106
107  // Output buffer must be large enough to fit the uncompressed block.
108  DCHECK_GE(output_buffer_capacity_, uncompressed_length);
109  next_out_ = output_buffer_.get();
110
111  bool status = port::Snappy_Uncompress(next_in_, compressed_block_length,
112                                        output_buffer_.get());
113  if (!status) {
114    return errors::DataLoss("Snappy_Uncompress failed");
115  }
116  next_in_ += compressed_block_length;
117  avail_in_ -= compressed_block_length;
118  avail_out_ += uncompressed_length;
119  return Status::OK();
120}
121
122Status SnappyInputBuffer::ReadCompressedBlockLength(uint32* length) {
123  *length = 0;
124  size_t bytes_to_read = 4;
125  while (bytes_to_read > 0) {
126    if (avail_in_ == 0) {
127      TF_RETURN_IF_ERROR(ReadFromFile());
128    }
129    size_t readable = std::min(bytes_to_read, avail_in_);
130
131    for (int i = 0; i < readable; i++) {
132      // The "unsigned char" type cast is intentional to avoid implicit type
133      // casting of the signed char to unsigned int during bitwise OR which
134      // causes weird overflow errors.
135      *length = (*length << 8) | static_cast<unsigned char>(next_in_[0]);
136      bytes_to_read--;
137      next_in_++;
138      avail_in_--;
139    }
140  }
141  return Status::OK();
142}
143
144Status SnappyInputBuffer::ReadFromFile() {
145  int bytes_to_read = input_buffer_capacity_;
146  char* read_location = reinterpret_cast<char*>(input_buffer_.get());
147
148  // If there are unread bytes in the input stream we move them to the head
149  // of the stream to maximize the space available to read new data into.
150  // TODO(srbs): A circular buffer would be useful here.
151  if (avail_in_ > 0) {
152    size_t read_bytes = next_in_ - input_buffer_.get();
153    // Remove `read_bytes` from the head of the input stream.
154    // Move unread bytes to the head of the input stream.
155    if (read_bytes > 0) {
156      memmove(input_buffer_.get(), next_in_, avail_in_);
157    }
158
159    bytes_to_read -= avail_in_;
160    read_location += avail_in_;
161  }
162  StringPiece data;
163  // Try to read enough data to fill up input_buffer_.
164  Status s = file_->Read(file_pos_, bytes_to_read, &data, read_location);
165  if (data.data() != read_location) {
166    memmove(read_location, data.data(), data.size());
167  }
168
169  // Since we moved unread data to the head of the input stream we can point
170  // next_in to the head of the input stream.
171  next_in_ = input_buffer_.get();
172
173  // Note: data.size() could be different from bytes_to_read.
174  avail_in_ += data.size();
175  file_pos_ += data.size();
176
177  if (!s.ok() && !errors::IsOutOfRange(s)) {
178    return s;
179  }
180
181  // We throw OutOfRange error iff no new data has been read from file.
182  // Since we never check how much data is remaining in the file, it is
183  // possible that on the last read there isn't enough data in the file to
184  // fill up the buffer in which case file_->ReadNBytes would return an
185  // OutOfRange error.
186  if (data.empty()) {
187    return errors::OutOfRange("EOF reached");
188  }
189  if (errors::IsOutOfRange(s)) {
190    return Status::OK();
191  }
192
193  return s;
194}
195
196}  // namespace io
197}  // namespace tensorflow
198