1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License. 14==============================================================================*/ 15 16#include "tensorflow/core/lib/io/snappy/snappy_inputbuffer.h" 17 18namespace tensorflow { 19namespace io { 20SnappyInputBuffer::SnappyInputBuffer( 21 RandomAccessFile* file, 22 size_t input_buffer_bytes, // size of input_buffer_ 23 size_t output_buffer_bytes // size of output_buffer_ 24 ) 25 : file_(file), 26 input_buffer_capacity_(input_buffer_bytes), 27 output_buffer_capacity_(output_buffer_bytes), 28 input_buffer_(new char[input_buffer_capacity_]), 29 output_buffer_(new char[output_buffer_capacity_]), 30 next_in_(input_buffer_.get()) {} 31 32Status SnappyInputBuffer::ReadNBytes(int64 bytes_to_read, string* result) { 33 result->clear(); 34 // Read as many bytes as possible from cache. 35 bytes_to_read -= ReadBytesFromCache(bytes_to_read, result); 36 37 while (bytes_to_read > 0) { 38 // At this point we can be sure that cache has been emptied. 39 DCHECK(avail_out_ == 0); 40 41 // Now that the cache is empty we need to inflate more data. 42 TF_RETURN_IF_ERROR(Inflate()); 43 44 bytes_to_read -= ReadBytesFromCache(bytes_to_read, result); 45 } 46 47 return Status::OK(); 48} 49 50int64 SnappyInputBuffer::Tell() const { 51 // TODO(srbs): Implement this. 52 return -1; 53} 54 55Status SnappyInputBuffer::Reset() { 56 file_pos_ = 0; 57 avail_in_ = 0; 58 avail_out_ = 0; 59 next_in_ = input_buffer_.get(); 60 61 return Status::OK(); 62} 63 64size_t SnappyInputBuffer::ReadBytesFromCache(size_t bytes_to_read, 65 string* result) { 66 size_t can_read_bytes = std::min(bytes_to_read, avail_out_); 67 if (can_read_bytes > 0) { 68 result->append(next_out_, can_read_bytes); 69 next_out_ += can_read_bytes; 70 avail_out_ -= can_read_bytes; 71 } 72 73 return can_read_bytes; 74} 75 76Status SnappyInputBuffer::Inflate() { 77 // Read length of compressed block. 78 uint32 compressed_block_length; 79 TF_RETURN_IF_ERROR(ReadCompressedBlockLength(&compressed_block_length)); 80 81 // If the entire block is not in cache do a read from file. 82 if (avail_in_ < compressed_block_length) { 83 TF_RETURN_IF_ERROR(ReadFromFile()); 84 if (avail_in_ < compressed_block_length) { 85 if (compressed_block_length > input_buffer_capacity_) { 86 return errors::ResourceExhausted( 87 "Input buffer(size: ", input_buffer_capacity_, 88 " bytes) too small. Should be larger ", "than ", 89 compressed_block_length, " bytes."); 90 } else { 91 return errors::DataLoss( 92 strings::StrCat("Failed to read ", compressed_block_length, 93 " bytes from file. Possible data corruption.")); 94 } 95 } 96 } 97 98 size_t uncompressed_length; 99 if (!port::Snappy_GetUncompressedLength(next_in_, compressed_block_length, 100 &uncompressed_length)) { 101 return errors::DataLoss("Parsing error in Snappy_GetUncompressedLength"); 102 } 103 104 // Output buffer must have been cleared before uncompressing more input. 105 DCHECK_EQ(avail_out_, 0); 106 107 // Output buffer must be large enough to fit the uncompressed block. 108 DCHECK_GE(output_buffer_capacity_, uncompressed_length); 109 next_out_ = output_buffer_.get(); 110 111 bool status = port::Snappy_Uncompress(next_in_, compressed_block_length, 112 output_buffer_.get()); 113 if (!status) { 114 return errors::DataLoss("Snappy_Uncompress failed"); 115 } 116 next_in_ += compressed_block_length; 117 avail_in_ -= compressed_block_length; 118 avail_out_ += uncompressed_length; 119 return Status::OK(); 120} 121 122Status SnappyInputBuffer::ReadCompressedBlockLength(uint32* length) { 123 *length = 0; 124 size_t bytes_to_read = 4; 125 while (bytes_to_read > 0) { 126 if (avail_in_ == 0) { 127 TF_RETURN_IF_ERROR(ReadFromFile()); 128 } 129 size_t readable = std::min(bytes_to_read, avail_in_); 130 131 for (int i = 0; i < readable; i++) { 132 // The "unsigned char" type cast is intentional to avoid implicit type 133 // casting of the signed char to unsigned int during bitwise OR which 134 // causes weird overflow errors. 135 *length = (*length << 8) | static_cast<unsigned char>(next_in_[0]); 136 bytes_to_read--; 137 next_in_++; 138 avail_in_--; 139 } 140 } 141 return Status::OK(); 142} 143 144Status SnappyInputBuffer::ReadFromFile() { 145 int bytes_to_read = input_buffer_capacity_; 146 char* read_location = reinterpret_cast<char*>(input_buffer_.get()); 147 148 // If there are unread bytes in the input stream we move them to the head 149 // of the stream to maximize the space available to read new data into. 150 // TODO(srbs): A circular buffer would be useful here. 151 if (avail_in_ > 0) { 152 size_t read_bytes = next_in_ - input_buffer_.get(); 153 // Remove `read_bytes` from the head of the input stream. 154 // Move unread bytes to the head of the input stream. 155 if (read_bytes > 0) { 156 memmove(input_buffer_.get(), next_in_, avail_in_); 157 } 158 159 bytes_to_read -= avail_in_; 160 read_location += avail_in_; 161 } 162 StringPiece data; 163 // Try to read enough data to fill up input_buffer_. 164 Status s = file_->Read(file_pos_, bytes_to_read, &data, read_location); 165 if (data.data() != read_location) { 166 memmove(read_location, data.data(), data.size()); 167 } 168 169 // Since we moved unread data to the head of the input stream we can point 170 // next_in to the head of the input stream. 171 next_in_ = input_buffer_.get(); 172 173 // Note: data.size() could be different from bytes_to_read. 174 avail_in_ += data.size(); 175 file_pos_ += data.size(); 176 177 if (!s.ok() && !errors::IsOutOfRange(s)) { 178 return s; 179 } 180 181 // We throw OutOfRange error iff no new data has been read from file. 182 // Since we never check how much data is remaining in the file, it is 183 // possible that on the last read there isn't enough data in the file to 184 // fill up the buffer in which case file_->ReadNBytes would return an 185 // OutOfRange error. 186 if (data.empty()) { 187 return errors::OutOfRange("EOF reached"); 188 } 189 if (errors::IsOutOfRange(s)) { 190 return Status::OK(); 191 } 192 193 return s; 194} 195 196} // namespace io 197} // namespace tensorflow 198