1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include "tensorflow/core/lib/io/record_reader.h"
17
18#include <limits.h>
19
20#include "tensorflow/core/lib/core/coding.h"
21#include "tensorflow/core/lib/core/errors.h"
22#include "tensorflow/core/lib/hash/crc32c.h"
23#include "tensorflow/core/lib/io/buffered_inputstream.h"
24#include "tensorflow/core/lib/io/compression.h"
25#include "tensorflow/core/lib/io/random_inputstream.h"
26#include "tensorflow/core/platform/env.h"
27
28namespace tensorflow {
29namespace io {
30
31RecordReaderOptions RecordReaderOptions::CreateRecordReaderOptions(
32    const string& compression_type) {
33  RecordReaderOptions options;
34  if (compression_type == "ZLIB") {
35    options.compression_type = io::RecordReaderOptions::ZLIB_COMPRESSION;
36#if defined(IS_SLIM_BUILD)
37    LOG(ERROR) << "Compression is not supported but compression_type is set."
38               << " No compression will be used.";
39#else
40    options.zlib_options = io::ZlibCompressionOptions::DEFAULT();
41#endif  // IS_SLIM_BUILD
42  } else if (compression_type == compression::kGzip) {
43    options.compression_type = io::RecordReaderOptions::ZLIB_COMPRESSION;
44#if defined(IS_SLIM_BUILD)
45    LOG(ERROR) << "Compression is not supported but compression_type is set."
46               << " No compression will be used.";
47#else
48    options.zlib_options = io::ZlibCompressionOptions::GZIP();
49#endif  // IS_SLIM_BUILD
50  } else if (compression_type != compression::kNone) {
51    LOG(ERROR) << "Unsupported compression_type:" << compression_type
52               << ". No compression will be used.";
53  }
54  return options;
55}
56
57RecordReader::RecordReader(RandomAccessFile* file,
58                           const RecordReaderOptions& options)
59    : src_(file), options_(options) {
60  if (options.buffer_size > 0) {
61    input_stream_.reset(new BufferedInputStream(file, options.buffer_size));
62  } else {
63    input_stream_.reset(new RandomAccessInputStream(file));
64  }
65  if (options.compression_type == RecordReaderOptions::ZLIB_COMPRESSION) {
66// We don't have zlib available on all embedded platforms, so fail.
67#if defined(IS_SLIM_BUILD)
68    LOG(FATAL) << "Zlib compression is unsupported on mobile platforms.";
69#else   // IS_SLIM_BUILD
70    zlib_input_stream_.reset(new ZlibInputStream(
71        input_stream_.get(), options.zlib_options.input_buffer_size,
72        options.zlib_options.output_buffer_size, options.zlib_options));
73#endif  // IS_SLIM_BUILD
74  } else if (options.compression_type == RecordReaderOptions::NONE) {
75    // Nothing to do.
76  } else {
77    LOG(FATAL) << "Unspecified compression type :" << options.compression_type;
78  }
79}
80
81// Read n+4 bytes from file, verify that checksum of first n bytes is
82// stored in the last 4 bytes and store the first n bytes in *result.
83// May use *storage as backing store.
84Status RecordReader::ReadChecksummed(uint64 offset, size_t n,
85                                     StringPiece* result, string* storage) {
86  if (n >= SIZE_MAX - sizeof(uint32)) {
87    return errors::DataLoss("record size too large");
88  }
89
90  const size_t expected = n + sizeof(uint32);
91  storage->resize(expected);
92
93#if !defined(IS_SLIM_BUILD)
94  if (zlib_input_stream_) {
95    // If we have a zlib compressed buffer, we assume that the
96    // file is being read sequentially, and we use the underlying
97    // implementation to read the data.
98    //
99    // No checks are done to validate that the file is being read
100    // sequentially.  At some point the zlib input buffer may support
101    // seeking, possibly inefficiently.
102    TF_RETURN_IF_ERROR(zlib_input_stream_->ReadNBytes(expected, storage));
103
104    if (storage->size() != expected) {
105      if (storage->empty()) {
106        return errors::OutOfRange("eof");
107      } else {
108        return errors::DataLoss("truncated record at ", offset);
109      }
110    }
111
112    uint32 masked_crc = core::DecodeFixed32(storage->data() + n);
113    if (crc32c::Unmask(masked_crc) != crc32c::Value(storage->data(), n)) {
114      return errors::DataLoss("corrupted record at ", offset);
115    }
116    *result = StringPiece(storage->data(), n);
117  } else {
118#endif  // IS_SLIM_BUILD
119    if (options_.buffer_size > 0) {
120      // If we have a buffer, we assume that the file is being read
121      // sequentially, and we use the underlying implementation to read the
122      // data.
123      //
124      // No checks are done to validate that the file is being read
125      // sequentially.
126      TF_RETURN_IF_ERROR(input_stream_->ReadNBytes(expected, storage));
127
128      if (storage->size() != expected) {
129        if (storage->empty()) {
130          return errors::OutOfRange("eof");
131        } else {
132          return errors::DataLoss("truncated record at ", offset);
133        }
134      }
135
136      const uint32 masked_crc = core::DecodeFixed32(storage->data() + n);
137      if (crc32c::Unmask(masked_crc) != crc32c::Value(storage->data(), n)) {
138        return errors::DataLoss("corrupted record at ", offset);
139      }
140      *result = StringPiece(storage->data(), n);
141    } else {
142      // This version supports reading from arbitrary offsets
143      // since we are accessing the random access file directly.
144      StringPiece data;
145      TF_RETURN_IF_ERROR(src_->Read(offset, expected, &data, &(*storage)[0]));
146      if (data.size() != expected) {
147        if (data.empty()) {
148          return errors::OutOfRange("eof");
149        } else {
150          return errors::DataLoss("truncated record at ", offset);
151        }
152      }
153      const uint32 masked_crc = core::DecodeFixed32(data.data() + n);
154      if (crc32c::Unmask(masked_crc) != crc32c::Value(data.data(), n)) {
155        return errors::DataLoss("corrupted record at ", offset);
156      }
157      *result = StringPiece(data.data(), n);
158    }
159#if !defined(IS_SLIM_BUILD)
160  }
161#endif  // IS_SLIM_BUILD
162
163  return Status::OK();
164}
165
166Status RecordReader::ReadRecord(uint64* offset, string* record) {
167  static const size_t kHeaderSize = sizeof(uint64) + sizeof(uint32);
168  static const size_t kFooterSize = sizeof(uint32);
169
170  // Read header data.
171  StringPiece lbuf;
172  Status s = ReadChecksummed(*offset, sizeof(uint64), &lbuf, record);
173  if (!s.ok()) {
174    return s;
175  }
176  const uint64 length = core::DecodeFixed64(lbuf.data());
177
178  // Read data
179  StringPiece data;
180  s = ReadChecksummed(*offset + kHeaderSize, length, &data, record);
181  if (!s.ok()) {
182    if (errors::IsOutOfRange(s)) {
183      s = errors::DataLoss("truncated record at ", *offset);
184    }
185    return s;
186  }
187
188  if (record->data() != data.data()) {
189    // RandomAccessFile placed the data in some other location.
190    memmove(&(*record)[0], data.data(), data.size());
191  }
192
193  record->resize(data.size());
194
195  *offset += kHeaderSize + length + kFooterSize;
196  return Status::OK();
197}
198
199Status RecordReader::SkipNBytes(uint64 offset) {
200#if !defined(IS_SLIM_BUILD)
201  if (zlib_input_stream_) {
202    TF_RETURN_IF_ERROR(zlib_input_stream_->SkipNBytes(offset));
203  } else {
204#endif
205    if (options_.buffer_size > 0) {
206      TF_RETURN_IF_ERROR(input_stream_->SkipNBytes(offset));
207    }
208  }
209  return Status::OK();
210}  // namespace io
211
212SequentialRecordReader::SequentialRecordReader(
213    RandomAccessFile* file, const RecordReaderOptions& options)
214    : underlying_(file, options), offset_(0) {}
215
216}  // namespace io
217}  // namespace tensorflow
218