1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include "tensorflow/core/lib/io/inputbuffer.h"
17#include "tensorflow/core/lib/core/errors.h"
18#include "tensorflow/core/platform/logging.h"
19
20namespace tensorflow {
21namespace io {
22
23InputBuffer::InputBuffer(RandomAccessFile* file, size_t buffer_bytes)
24    : file_(file),
25      file_pos_(0),
26      size_(buffer_bytes),
27      buf_(new char[size_]),
28      pos_(buf_),
29      limit_(buf_) {}
30
31InputBuffer::~InputBuffer() { delete[] buf_; }
32
33Status InputBuffer::FillBuffer() {
34  StringPiece data;
35  Status s = file_->Read(file_pos_, size_, &data, buf_);
36  if (data.data() != buf_) {
37    memmove(buf_, data.data(), data.size());
38  }
39  pos_ = buf_;
40  limit_ = pos_ + data.size();
41  file_pos_ += data.size();
42  return s;
43}
44
45Status InputBuffer::ReadLine(string* result) {
46  result->clear();
47  Status s;
48  do {
49    size_t buf_remain = limit_ - pos_;
50    char* newline = static_cast<char*>(memchr(pos_, '\n', buf_remain));
51    if (newline != nullptr) {
52      size_t result_len = newline - pos_;
53      result->append(pos_, result_len);
54      pos_ = newline + 1;
55      if (!result->empty() && result->back() == '\r') {
56        result->resize(result->size() - 1);
57      }
58      return Status::OK();
59    }
60    if (buf_remain > 0) result->append(pos_, buf_remain);
61    // Get more data into buffer
62    s = FillBuffer();
63    DCHECK_EQ(pos_, buf_);
64  } while (limit_ != buf_);
65  if (!result->empty() && result->back() == '\r') {
66    result->resize(result->size() - 1);
67  }
68  if (errors::IsOutOfRange(s) && !result->empty()) {
69    return Status::OK();
70  }
71  return s;
72}
73
74Status InputBuffer::ReadNBytes(int64 bytes_to_read, string* result) {
75  result->clear();
76  if (bytes_to_read < 0) {
77    return errors::InvalidArgument("Can't read a negative number of bytes: ",
78                                   bytes_to_read);
79  }
80  result->resize(bytes_to_read);
81  size_t bytes_read = 0;
82  Status status = ReadNBytes(bytes_to_read, &(*result)[0], &bytes_read);
83  if (bytes_read < bytes_to_read) result->resize(bytes_read);
84  return status;
85}
86
87Status InputBuffer::ReadNBytes(int64 bytes_to_read, char* result,
88                               size_t* bytes_read) {
89  if (bytes_to_read < 0) {
90    return errors::InvalidArgument("Can't read a negative number of bytes: ",
91                                   bytes_to_read);
92  }
93  Status status;
94  *bytes_read = 0;
95  while (*bytes_read < static_cast<size_t>(bytes_to_read)) {
96    if (pos_ == limit_) {
97      // Get more data into buffer.
98      status = FillBuffer();
99      if (limit_ == buf_) {
100        break;
101      }
102    }
103    // Do not go over the buffer boundary.
104    const int64 bytes_to_copy =
105        std::min<int64>(limit_ - pos_, bytes_to_read - *bytes_read);
106    // Copies buffered data into the destination.
107    memcpy(result + *bytes_read, pos_, bytes_to_copy);
108    pos_ += bytes_to_copy;
109    *bytes_read += bytes_to_copy;
110  }
111  if (errors::IsOutOfRange(status) &&
112      (*bytes_read == static_cast<size_t>(bytes_to_read))) {
113    return Status::OK();
114  }
115  return status;
116}
117
118Status InputBuffer::ReadVarint32Fallback(uint32* result) {
119  Status s = ReadVarintFallback(result, core::kMaxVarint32Bytes);
120  if (errors::IsDataLoss(s)) {
121    return errors::DataLoss("Stored data is too large to be a varint32.");
122  }
123  return s;
124}
125
126Status InputBuffer::ReadVarint64Fallback(uint64* result) {
127  Status s = ReadVarintFallback(result, core::kMaxVarint64Bytes);
128  if (errors::IsDataLoss(s)) {
129    return errors::DataLoss("Stored data is too large to be a varint64.");
130  }
131  return s;
132}
133
134template <typename T>
135Status InputBuffer::ReadVarintFallback(T* result, int max_bytes) {
136  uint8 scratch = 0;
137  auto* p = reinterpret_cast<char*>(&scratch);
138  size_t unused_bytes_read = 0;
139
140  *result = 0;
141  for (int index = 0; index < max_bytes; index++) {
142    int shift = 7 * index;
143    TF_RETURN_IF_ERROR(ReadNBytes(1, p, &unused_bytes_read));
144    *result |= (static_cast<T>(scratch) & 127) << shift;
145    if (!(scratch & 128)) return Status::OK();
146  }
147  return errors::DataLoss("Stored data longer than ", max_bytes, " bytes.");
148}
149
150Status InputBuffer::SkipNBytes(int64 bytes_to_skip) {
151  if (bytes_to_skip < 0) {
152    return errors::InvalidArgument("Can only skip forward, not ",
153                                   bytes_to_skip);
154  }
155  int64 bytes_skipped = 0;
156  Status s;
157  while (bytes_skipped < bytes_to_skip) {
158    if (pos_ == limit_) {
159      // Get more data into buffer
160      s = FillBuffer();
161      if (limit_ == buf_) {
162        break;
163      }
164    }
165    const int64 bytes_to_advance =
166        std::min<int64>(limit_ - pos_, bytes_to_skip - bytes_skipped);
167    bytes_skipped += bytes_to_advance;
168    pos_ += bytes_to_advance;
169  }
170  if (errors::IsOutOfRange(s) && bytes_skipped == bytes_to_skip) {
171    return Status::OK();
172  }
173  return s;
174}
175
176Status InputBuffer::Seek(int64 position) {
177  if (position < 0) {
178    return errors::InvalidArgument("Seeking to a negative position: ",
179                                   position);
180  }
181  // Position of the buffer within file.
182  const int64 bufpos = file_pos_ - static_cast<int64>(limit_ - buf_);
183  if (position >= bufpos && position < file_pos_) {
184    // Seeks to somewhere inside the buffer.
185    pos_ = buf_ + (position - bufpos);
186    DCHECK(pos_ >= buf_ && pos_ < limit_);
187  } else {
188    // Seeks to somewhere outside.  Discards the buffered data.
189    pos_ = limit_ = buf_;
190    file_pos_ = position;
191  }
192  return Status::OK();
193}
194
195}  // namespace io
196}  // namespace tensorflow
197