// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc. All rights reserved.
// http://code.google.com/p/protobuf/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// Author: kenton@google.com (Kenton Varda)
// Based on original Protocol Buffers design by
// Sanjay Ghemawat, Jeff Dean, and others.
34 35#ifdef _MSC_VER 36#include <io.h> 37#else 38#include <unistd.h> 39#include <sys/types.h> 40#include <sys/stat.h> 41#include <fcntl.h> 42#endif 43#include <errno.h> 44#include <iostream> 45#include <algorithm> 46 47#include <google/protobuf/io/zero_copy_stream_impl.h> 48#include <google/protobuf/stubs/common.h> 49#include <google/protobuf/stubs/stl_util.h> 50 51 52namespace google { 53namespace protobuf { 54namespace io { 55 56#ifdef _WIN32 57// Win32 lseek is broken: If invoked on a non-seekable file descriptor, its 58// return value is undefined. We re-define it to always produce an error. 59#define lseek(fd, offset, origin) ((off_t)-1) 60#endif 61 62namespace { 63 64// EINTR sucks. 65int close_no_eintr(int fd) { 66 int result; 67 do { 68 result = close(fd); 69 } while (result < 0 && errno == EINTR); 70 return result; 71} 72 73} // namespace 74 75 76// =================================================================== 77 78FileInputStream::FileInputStream(int file_descriptor, int block_size) 79 : copying_input_(file_descriptor), 80 impl_(©ing_input_, block_size) { 81} 82 83FileInputStream::~FileInputStream() {} 84 85bool FileInputStream::Close() { 86 return copying_input_.Close(); 87} 88 89bool FileInputStream::Next(const void** data, int* size) { 90 return impl_.Next(data, size); 91} 92 93void FileInputStream::BackUp(int count) { 94 impl_.BackUp(count); 95} 96 97bool FileInputStream::Skip(int count) { 98 return impl_.Skip(count); 99} 100 101int64 FileInputStream::ByteCount() const { 102 return impl_.ByteCount(); 103} 104 105FileInputStream::CopyingFileInputStream::CopyingFileInputStream( 106 int file_descriptor) 107 : file_(file_descriptor), 108 close_on_delete_(false), 109 is_closed_(false), 110 errno_(0), 111 previous_seek_failed_(false) { 112} 113 114FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() { 115 if (close_on_delete_) { 116 if (!Close()) { 117 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_); 118 } 119 } 120} 121 
122bool FileInputStream::CopyingFileInputStream::Close() { 123 GOOGLE_CHECK(!is_closed_); 124 125 is_closed_ = true; 126 if (close_no_eintr(file_) != 0) { 127 // The docs on close() do not specify whether a file descriptor is still 128 // open after close() fails with EIO. However, the glibc source code 129 // seems to indicate that it is not. 130 errno_ = errno; 131 return false; 132 } 133 134 return true; 135} 136 137int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) { 138 GOOGLE_CHECK(!is_closed_); 139 140 int result; 141 do { 142 result = read(file_, buffer, size); 143 } while (result < 0 && errno == EINTR); 144 145 if (result < 0) { 146 // Read error (not EOF). 147 errno_ = errno; 148 } 149 150 return result; 151} 152 153int FileInputStream::CopyingFileInputStream::Skip(int count) { 154 GOOGLE_CHECK(!is_closed_); 155 156 if (!previous_seek_failed_ && 157 lseek(file_, count, SEEK_CUR) != (off_t)-1) { 158 // Seek succeeded. 159 return count; 160 } else { 161 // Failed to seek. 162 163 // Note to self: Don't seek again. This file descriptor doesn't 164 // support it. 165 previous_seek_failed_ = true; 166 167 // Use the default implementation. 
168 return CopyingInputStream::Skip(count); 169 } 170} 171 172// =================================================================== 173 174FileOutputStream::FileOutputStream(int file_descriptor, int block_size) 175 : copying_output_(file_descriptor), 176 impl_(©ing_output_, block_size) { 177} 178 179FileOutputStream::~FileOutputStream() { 180 impl_.Flush(); 181} 182 183bool FileOutputStream::Close() { 184 bool flush_succeeded = impl_.Flush(); 185 return copying_output_.Close() && flush_succeeded; 186} 187 188bool FileOutputStream::Flush() { 189 return impl_.Flush(); 190} 191 192bool FileOutputStream::Next(void** data, int* size) { 193 return impl_.Next(data, size); 194} 195 196void FileOutputStream::BackUp(int count) { 197 impl_.BackUp(count); 198} 199 200int64 FileOutputStream::ByteCount() const { 201 return impl_.ByteCount(); 202} 203 204FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream( 205 int file_descriptor) 206 : file_(file_descriptor), 207 close_on_delete_(false), 208 is_closed_(false), 209 errno_(0) { 210} 211 212FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() { 213 if (close_on_delete_) { 214 if (!Close()) { 215 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_); 216 } 217 } 218} 219 220bool FileOutputStream::CopyingFileOutputStream::Close() { 221 GOOGLE_CHECK(!is_closed_); 222 223 is_closed_ = true; 224 if (close_no_eintr(file_) != 0) { 225 // The docs on close() do not specify whether a file descriptor is still 226 // open after close() fails with EIO. However, the glibc source code 227 // seems to indicate that it is not. 
228 errno_ = errno; 229 return false; 230 } 231 232 return true; 233} 234 235bool FileOutputStream::CopyingFileOutputStream::Write( 236 const void* buffer, int size) { 237 GOOGLE_CHECK(!is_closed_); 238 int total_written = 0; 239 240 const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer); 241 242 while (total_written < size) { 243 int bytes; 244 do { 245 bytes = write(file_, buffer_base + total_written, size - total_written); 246 } while (bytes < 0 && errno == EINTR); 247 248 if (bytes <= 0) { 249 // Write error. 250 251 // FIXME(kenton): According to the man page, if write() returns zero, 252 // there was no error; write() simply did not write anything. It's 253 // unclear under what circumstances this might happen, but presumably 254 // errno won't be set in this case. I am confused as to how such an 255 // event should be handled. For now I'm treating it as an error, since 256 // retrying seems like it could lead to an infinite loop. I suspect 257 // this never actually happens anyway. 
258 259 if (bytes < 0) { 260 errno_ = errno; 261 } 262 return false; 263 } 264 total_written += bytes; 265 } 266 267 return true; 268} 269 270// =================================================================== 271 272IstreamInputStream::IstreamInputStream(istream* input, int block_size) 273 : copying_input_(input), 274 impl_(©ing_input_, block_size) { 275} 276 277IstreamInputStream::~IstreamInputStream() {} 278 279bool IstreamInputStream::Next(const void** data, int* size) { 280 return impl_.Next(data, size); 281} 282 283void IstreamInputStream::BackUp(int count) { 284 impl_.BackUp(count); 285} 286 287bool IstreamInputStream::Skip(int count) { 288 return impl_.Skip(count); 289} 290 291int64 IstreamInputStream::ByteCount() const { 292 return impl_.ByteCount(); 293} 294 295IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream( 296 istream* input) 297 : input_(input) { 298} 299 300IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {} 301 302int IstreamInputStream::CopyingIstreamInputStream::Read( 303 void* buffer, int size) { 304 input_->read(reinterpret_cast<char*>(buffer), size); 305 int result = input_->gcount(); 306 if (result == 0 && input_->fail() && !input_->eof()) { 307 return -1; 308 } 309 return result; 310} 311 312// =================================================================== 313 314OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size) 315 : copying_output_(output), 316 impl_(©ing_output_, block_size) { 317} 318 319OstreamOutputStream::~OstreamOutputStream() { 320 impl_.Flush(); 321} 322 323bool OstreamOutputStream::Next(void** data, int* size) { 324 return impl_.Next(data, size); 325} 326 327void OstreamOutputStream::BackUp(int count) { 328 impl_.BackUp(count); 329} 330 331int64 OstreamOutputStream::ByteCount() const { 332 return impl_.ByteCount(); 333} 334 335OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream( 336 ostream* output) 337 : output_(output) { 
338} 339 340OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() { 341} 342 343bool OstreamOutputStream::CopyingOstreamOutputStream::Write( 344 const void* buffer, int size) { 345 output_->write(reinterpret_cast<const char*>(buffer), size); 346 return output_->good(); 347} 348 349// =================================================================== 350 351ConcatenatingInputStream::ConcatenatingInputStream( 352 ZeroCopyInputStream* const streams[], int count) 353 : streams_(streams), stream_count_(count), bytes_retired_(0) { 354} 355 356ConcatenatingInputStream::~ConcatenatingInputStream() { 357} 358 359bool ConcatenatingInputStream::Next(const void** data, int* size) { 360 while (stream_count_ > 0) { 361 if (streams_[0]->Next(data, size)) return true; 362 363 // That stream is done. Advance to the next one. 364 bytes_retired_ += streams_[0]->ByteCount(); 365 ++streams_; 366 --stream_count_; 367 } 368 369 // No more streams. 370 return false; 371} 372 373void ConcatenatingInputStream::BackUp(int count) { 374 if (stream_count_ > 0) { 375 streams_[0]->BackUp(count); 376 } else { 377 GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next()."; 378 } 379} 380 381bool ConcatenatingInputStream::Skip(int count) { 382 while (stream_count_ > 0) { 383 // Assume that ByteCount() can be used to find out how much we actually 384 // skipped when Skip() fails. 385 int64 target_byte_count = streams_[0]->ByteCount() + count; 386 if (streams_[0]->Skip(count)) return true; 387 388 // Hit the end of the stream. Figure out how many more bytes we still have 389 // to skip. 390 int64 final_byte_count = streams_[0]->ByteCount(); 391 GOOGLE_DCHECK_LT(final_byte_count, target_byte_count); 392 count = target_byte_count - final_byte_count; 393 394 // That stream is done. Advance to the next one. 
395 bytes_retired_ += final_byte_count; 396 ++streams_; 397 --stream_count_; 398 } 399 400 return false; 401} 402 403int64 ConcatenatingInputStream::ByteCount() const { 404 if (stream_count_ == 0) { 405 return bytes_retired_; 406 } else { 407 return bytes_retired_ + streams_[0]->ByteCount(); 408 } 409} 410 411 412// =================================================================== 413 414LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input, 415 int64 limit) 416 : input_(input), limit_(limit) {} 417 418LimitingInputStream::~LimitingInputStream() { 419 // If we overshot the limit, back up. 420 if (limit_ < 0) input_->BackUp(-limit_); 421} 422 423bool LimitingInputStream::Next(const void** data, int* size) { 424 if (limit_ <= 0) return false; 425 if (!input_->Next(data, size)) return false; 426 427 limit_ -= *size; 428 if (limit_ < 0) { 429 // We overshot the limit. Reduce *size to hide the rest of the buffer. 430 *size += limit_; 431 } 432 return true; 433} 434 435void LimitingInputStream::BackUp(int count) { 436 if (limit_ < 0) { 437 input_->BackUp(count - limit_); 438 limit_ = count; 439 } else { 440 input_->BackUp(count); 441 limit_ += count; 442 } 443} 444 445bool LimitingInputStream::Skip(int count) { 446 if (count > limit_) { 447 if (limit_ < 0) return false; 448 input_->Skip(limit_); 449 limit_ = 0; 450 return false; 451 } else { 452 if (!input_->Skip(count)) return false; 453 limit_ -= count; 454 return true; 455 } 456} 457 458int64 LimitingInputStream::ByteCount() const { 459 if (limit_ < 0) { 460 return input_->ByteCount() + limit_; 461 } else { 462 return input_->ByteCount(); 463 } 464} 465 466 467// =================================================================== 468 469} // namespace io 470} // namespace protobuf 471} // namespace google 472