1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License. 14==============================================================================*/ 15 16#include "tensorflow/core/lib/io/inputbuffer.h" 17#include "tensorflow/core/lib/core/errors.h" 18#include "tensorflow/core/platform/logging.h" 19 20namespace tensorflow { 21namespace io { 22 23InputBuffer::InputBuffer(RandomAccessFile* file, size_t buffer_bytes) 24 : file_(file), 25 file_pos_(0), 26 size_(buffer_bytes), 27 buf_(new char[size_]), 28 pos_(buf_), 29 limit_(buf_) {} 30 31InputBuffer::~InputBuffer() { delete[] buf_; } 32 33Status InputBuffer::FillBuffer() { 34 StringPiece data; 35 Status s = file_->Read(file_pos_, size_, &data, buf_); 36 if (data.data() != buf_) { 37 memmove(buf_, data.data(), data.size()); 38 } 39 pos_ = buf_; 40 limit_ = pos_ + data.size(); 41 file_pos_ += data.size(); 42 return s; 43} 44 45Status InputBuffer::ReadLine(string* result) { 46 result->clear(); 47 Status s; 48 do { 49 size_t buf_remain = limit_ - pos_; 50 char* newline = static_cast<char*>(memchr(pos_, '\n', buf_remain)); 51 if (newline != nullptr) { 52 size_t result_len = newline - pos_; 53 result->append(pos_, result_len); 54 pos_ = newline + 1; 55 if (!result->empty() && result->back() == '\r') { 56 result->resize(result->size() - 1); 57 } 58 return Status::OK(); 59 } 60 if (buf_remain > 0) result->append(pos_, buf_remain); 61 // Get more data into buffer 62 s = FillBuffer(); 63 DCHECK_EQ(pos_, buf_); 64 } while (limit_ != buf_); 65 if (!result->empty() && result->back() == '\r') { 66 result->resize(result->size() - 1); 67 } 68 if (errors::IsOutOfRange(s) && !result->empty()) { 69 return Status::OK(); 70 } 71 return s; 72} 73 74Status InputBuffer::ReadNBytes(int64 bytes_to_read, string* result) { 75 result->clear(); 76 if (bytes_to_read < 0) { 77 return errors::InvalidArgument("Can't read a negative number of bytes: ", 78 bytes_to_read); 79 } 80 result->resize(bytes_to_read); 81 size_t bytes_read = 0; 82 Status status = ReadNBytes(bytes_to_read, &(*result)[0], &bytes_read); 83 if (bytes_read < bytes_to_read) result->resize(bytes_read); 84 return status; 85} 86 87Status InputBuffer::ReadNBytes(int64 bytes_to_read, char* result, 88 size_t* bytes_read) { 89 if (bytes_to_read < 0) { 90 return errors::InvalidArgument("Can't read a negative number of bytes: ", 91 bytes_to_read); 92 } 93 Status status; 94 *bytes_read = 0; 95 while (*bytes_read < static_cast<size_t>(bytes_to_read)) { 96 if (pos_ == limit_) { 97 // Get more data into buffer. 98 status = FillBuffer(); 99 if (limit_ == buf_) { 100 break; 101 } 102 } 103 // Do not go over the buffer boundary. 104 const int64 bytes_to_copy = 105 std::min<int64>(limit_ - pos_, bytes_to_read - *bytes_read); 106 // Copies buffered data into the destination. 107 memcpy(result + *bytes_read, pos_, bytes_to_copy); 108 pos_ += bytes_to_copy; 109 *bytes_read += bytes_to_copy; 110 } 111 if (errors::IsOutOfRange(status) && 112 (*bytes_read == static_cast<size_t>(bytes_to_read))) { 113 return Status::OK(); 114 } 115 return status; 116} 117 118Status InputBuffer::ReadVarint32Fallback(uint32* result) { 119 Status s = ReadVarintFallback(result, core::kMaxVarint32Bytes); 120 if (errors::IsDataLoss(s)) { 121 return errors::DataLoss("Stored data is too large to be a varint32."); 122 } 123 return s; 124} 125 126Status InputBuffer::ReadVarint64Fallback(uint64* result) { 127 Status s = ReadVarintFallback(result, core::kMaxVarint64Bytes); 128 if (errors::IsDataLoss(s)) { 129 return errors::DataLoss("Stored data is too large to be a varint64."); 130 } 131 return s; 132} 133 134template <typename T> 135Status InputBuffer::ReadVarintFallback(T* result, int max_bytes) { 136 uint8 scratch = 0; 137 auto* p = reinterpret_cast<char*>(&scratch); 138 size_t unused_bytes_read = 0; 139 140 *result = 0; 141 for (int index = 0; index < max_bytes; index++) { 142 int shift = 7 * index; 143 TF_RETURN_IF_ERROR(ReadNBytes(1, p, &unused_bytes_read)); 144 *result |= (static_cast<T>(scratch) & 127) << shift; 145 if (!(scratch & 128)) return Status::OK(); 146 } 147 return errors::DataLoss("Stored data longer than ", max_bytes, " bytes."); 148} 149 150Status InputBuffer::SkipNBytes(int64 bytes_to_skip) { 151 if (bytes_to_skip < 0) { 152 return errors::InvalidArgument("Can only skip forward, not ", 153 bytes_to_skip); 154 } 155 int64 bytes_skipped = 0; 156 Status s; 157 while (bytes_skipped < bytes_to_skip) { 158 if (pos_ == limit_) { 159 // Get more data into buffer 160 s = FillBuffer(); 161 if (limit_ == buf_) { 162 break; 163 } 164 } 165 const int64 bytes_to_advance = 166 std::min<int64>(limit_ - pos_, bytes_to_skip - bytes_skipped); 167 bytes_skipped += bytes_to_advance; 168 pos_ += bytes_to_advance; 169 } 170 if (errors::IsOutOfRange(s) && bytes_skipped == bytes_to_skip) { 171 return Status::OK(); 172 } 173 return s; 174} 175 176Status InputBuffer::Seek(int64 position) { 177 if (position < 0) { 178 return errors::InvalidArgument("Seeking to a negative position: ", 179 position); 180 } 181 // Position of the buffer within file. 182 const int64 bufpos = file_pos_ - static_cast<int64>(limit_ - buf_); 183 if (position >= bufpos && position < file_pos_) { 184 // Seeks to somewhere inside the buffer. 185 pos_ = buf_ + (position - bufpos); 186 DCHECK(pos_ >= buf_ && pos_ < limit_); 187 } else { 188 // Seeks to somewhere outside. Discards the buffered data. 189 pos_ = limit_ = buf_; 190 file_pos_ = position; 191 } 192 return Status::OK(); 193} 194 195} // namespace io 196} // namespace tensorflow 197