// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc.  All rights reserved.
// http://code.google.com/p/protobuf/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// Author: kenton@google.com (Kenton Varda)
//  Based on original Protocol Buffers design by
//  Sanjay Ghemawat, Jeff Dean, and others.
34 35#ifdef _MSC_VER 36#include <io.h> 37#else 38#include <unistd.h> 39#include <sys/types.h> 40#include <sys/stat.h> 41#include <fcntl.h> 42#endif 43#include <errno.h> 44#include <iostream> 45#include <algorithm> 46 47#include <google/protobuf/io/zero_copy_stream_impl.h> 48#include <google/protobuf/stubs/common.h> 49#include <google/protobuf/stubs/stl_util-inl.h> 50 51namespace google { 52namespace protobuf { 53namespace io { 54 55#ifdef _WIN32 56// Win32 lseek is broken: If invoked on a non-seekable file descriptor, its 57// return value is undefined. We re-define it to always produce an error. 58#define lseek(fd, offset, origin) ((off_t)-1) 59#endif 60 61namespace { 62 63// EINTR sucks. 64int close_no_eintr(int fd) { 65 int result; 66 do { 67 result = close(fd); 68 } while (result < 0 && errno == EINTR); 69 return result; 70} 71 72} // namespace 73 74 75// =================================================================== 76 77FileInputStream::FileInputStream(int file_descriptor, int block_size) 78 : copying_input_(file_descriptor), 79 impl_(©ing_input_, block_size) { 80} 81 82FileInputStream::~FileInputStream() {} 83 84bool FileInputStream::Close() { 85 return copying_input_.Close(); 86} 87 88bool FileInputStream::Next(const void** data, int* size) { 89 return impl_.Next(data, size); 90} 91 92void FileInputStream::BackUp(int count) { 93 impl_.BackUp(count); 94} 95 96bool FileInputStream::Skip(int count) { 97 return impl_.Skip(count); 98} 99 100int64 FileInputStream::ByteCount() const { 101 return impl_.ByteCount(); 102} 103 104FileInputStream::CopyingFileInputStream::CopyingFileInputStream( 105 int file_descriptor) 106 : file_(file_descriptor), 107 close_on_delete_(false), 108 is_closed_(false), 109 errno_(0), 110 previous_seek_failed_(false) { 111} 112 113FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() { 114 if (close_on_delete_) { 115 if (!Close()) { 116 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_); 117 } 118 } 119} 120 
121bool FileInputStream::CopyingFileInputStream::Close() { 122 GOOGLE_CHECK(!is_closed_); 123 124 is_closed_ = true; 125 if (close_no_eintr(file_) != 0) { 126 // The docs on close() do not specify whether a file descriptor is still 127 // open after close() fails with EIO. However, the glibc source code 128 // seems to indicate that it is not. 129 errno_ = errno; 130 return false; 131 } 132 133 return true; 134} 135 136int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) { 137 GOOGLE_CHECK(!is_closed_); 138 139 int result; 140 do { 141 result = read(file_, buffer, size); 142 } while (result < 0 && errno == EINTR); 143 144 if (result < 0) { 145 // Read error (not EOF). 146 errno_ = errno; 147 } 148 149 return result; 150} 151 152int FileInputStream::CopyingFileInputStream::Skip(int count) { 153 GOOGLE_CHECK(!is_closed_); 154 155 if (!previous_seek_failed_ && 156 lseek(file_, count, SEEK_CUR) != (off_t)-1) { 157 // Seek succeeded. 158 return count; 159 } else { 160 // Failed to seek. 161 162 // Note to self: Don't seek again. This file descriptor doesn't 163 // support it. 164 previous_seek_failed_ = true; 165 166 // Use the default implementation. 
167 return CopyingInputStream::Skip(count); 168 } 169} 170 171// =================================================================== 172 173FileOutputStream::FileOutputStream(int file_descriptor, int block_size) 174 : copying_output_(file_descriptor), 175 impl_(©ing_output_, block_size) { 176} 177 178FileOutputStream::~FileOutputStream() { 179 impl_.Flush(); 180} 181 182bool FileOutputStream::Close() { 183 bool flush_succeeded = impl_.Flush(); 184 return copying_output_.Close() && flush_succeeded; 185} 186 187bool FileOutputStream::Flush() { 188 return impl_.Flush(); 189} 190 191bool FileOutputStream::Next(void** data, int* size) { 192 return impl_.Next(data, size); 193} 194 195void FileOutputStream::BackUp(int count) { 196 impl_.BackUp(count); 197} 198 199int64 FileOutputStream::ByteCount() const { 200 return impl_.ByteCount(); 201} 202 203FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream( 204 int file_descriptor) 205 : file_(file_descriptor), 206 close_on_delete_(false), 207 is_closed_(false), 208 errno_(0) { 209} 210 211FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() { 212 if (close_on_delete_) { 213 if (!Close()) { 214 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_); 215 } 216 } 217} 218 219bool FileOutputStream::CopyingFileOutputStream::Close() { 220 GOOGLE_CHECK(!is_closed_); 221 222 is_closed_ = true; 223 if (close_no_eintr(file_) != 0) { 224 // The docs on close() do not specify whether a file descriptor is still 225 // open after close() fails with EIO. However, the glibc source code 226 // seems to indicate that it is not. 
227 errno_ = errno; 228 return false; 229 } 230 231 return true; 232} 233 234bool FileOutputStream::CopyingFileOutputStream::Write( 235 const void* buffer, int size) { 236 GOOGLE_CHECK(!is_closed_); 237 int total_written = 0; 238 239 const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer); 240 241 while (total_written < size) { 242 int bytes; 243 do { 244 bytes = write(file_, buffer_base + total_written, size - total_written); 245 } while (bytes < 0 && errno == EINTR); 246 247 if (bytes <= 0) { 248 // Write error. 249 250 // FIXME(kenton): According to the man page, if write() returns zero, 251 // there was no error; write() simply did not write anything. It's 252 // unclear under what circumstances this might happen, but presumably 253 // errno won't be set in this case. I am confused as to how such an 254 // event should be handled. For now I'm treating it as an error, since 255 // retrying seems like it could lead to an infinite loop. I suspect 256 // this never actually happens anyway. 
257 258 if (bytes < 0) { 259 errno_ = errno; 260 } 261 return false; 262 } 263 total_written += bytes; 264 } 265 266 return true; 267} 268 269// =================================================================== 270 271IstreamInputStream::IstreamInputStream(istream* input, int block_size) 272 : copying_input_(input), 273 impl_(©ing_input_, block_size) { 274} 275 276IstreamInputStream::~IstreamInputStream() {} 277 278bool IstreamInputStream::Next(const void** data, int* size) { 279 return impl_.Next(data, size); 280} 281 282void IstreamInputStream::BackUp(int count) { 283 impl_.BackUp(count); 284} 285 286bool IstreamInputStream::Skip(int count) { 287 return impl_.Skip(count); 288} 289 290int64 IstreamInputStream::ByteCount() const { 291 return impl_.ByteCount(); 292} 293 294IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream( 295 istream* input) 296 : input_(input) { 297} 298 299IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {} 300 301int IstreamInputStream::CopyingIstreamInputStream::Read( 302 void* buffer, int size) { 303 input_->read(reinterpret_cast<char*>(buffer), size); 304 int result = input_->gcount(); 305 if (result == 0 && input_->fail() && !input_->eof()) { 306 return -1; 307 } 308 return result; 309} 310 311// =================================================================== 312 313OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size) 314 : copying_output_(output), 315 impl_(©ing_output_, block_size) { 316} 317 318OstreamOutputStream::~OstreamOutputStream() { 319 impl_.Flush(); 320} 321 322bool OstreamOutputStream::Next(void** data, int* size) { 323 return impl_.Next(data, size); 324} 325 326void OstreamOutputStream::BackUp(int count) { 327 impl_.BackUp(count); 328} 329 330int64 OstreamOutputStream::ByteCount() const { 331 return impl_.ByteCount(); 332} 333 334OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream( 335 ostream* output) 336 : output_(output) { 
337} 338 339OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() { 340} 341 342bool OstreamOutputStream::CopyingOstreamOutputStream::Write( 343 const void* buffer, int size) { 344 output_->write(reinterpret_cast<const char*>(buffer), size); 345 return output_->good(); 346} 347 348// =================================================================== 349 350ConcatenatingInputStream::ConcatenatingInputStream( 351 ZeroCopyInputStream* const streams[], int count) 352 : streams_(streams), stream_count_(count), bytes_retired_(0) { 353} 354 355ConcatenatingInputStream::~ConcatenatingInputStream() { 356} 357 358bool ConcatenatingInputStream::Next(const void** data, int* size) { 359 while (stream_count_ > 0) { 360 if (streams_[0]->Next(data, size)) return true; 361 362 // That stream is done. Advance to the next one. 363 bytes_retired_ += streams_[0]->ByteCount(); 364 ++streams_; 365 --stream_count_; 366 } 367 368 // No more streams. 369 return false; 370} 371 372void ConcatenatingInputStream::BackUp(int count) { 373 if (stream_count_ > 0) { 374 streams_[0]->BackUp(count); 375 } else { 376 GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next()."; 377 } 378} 379 380bool ConcatenatingInputStream::Skip(int count) { 381 while (stream_count_ > 0) { 382 // Assume that ByteCount() can be used to find out how much we actually 383 // skipped when Skip() fails. 384 int64 target_byte_count = streams_[0]->ByteCount() + count; 385 if (streams_[0]->Skip(count)) return true; 386 387 // Hit the end of the stream. Figure out how many more bytes we still have 388 // to skip. 389 int64 final_byte_count = streams_[0]->ByteCount(); 390 GOOGLE_DCHECK_LT(final_byte_count, target_byte_count); 391 count = target_byte_count - final_byte_count; 392 393 // That stream is done. Advance to the next one. 
394 bytes_retired_ += final_byte_count; 395 ++streams_; 396 --stream_count_; 397 } 398 399 return false; 400} 401 402int64 ConcatenatingInputStream::ByteCount() const { 403 if (stream_count_ == 0) { 404 return bytes_retired_; 405 } else { 406 return bytes_retired_ + streams_[0]->ByteCount(); 407 } 408} 409 410 411// =================================================================== 412 413LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input, 414 int64 limit) 415 : input_(input), limit_(limit) {} 416 417LimitingInputStream::~LimitingInputStream() { 418 // If we overshot the limit, back up. 419 if (limit_ < 0) input_->BackUp(-limit_); 420} 421 422bool LimitingInputStream::Next(const void** data, int* size) { 423 if (limit_ <= 0) return false; 424 if (!input_->Next(data, size)) return false; 425 426 limit_ -= *size; 427 if (limit_ < 0) { 428 // We overshot the limit. Reduce *size to hide the rest of the buffer. 429 *size += limit_; 430 } 431 return true; 432} 433 434void LimitingInputStream::BackUp(int count) { 435 if (limit_ < 0) { 436 input_->BackUp(count - limit_); 437 limit_ = count; 438 } else { 439 input_->BackUp(count); 440 limit_ += count; 441 } 442} 443 444bool LimitingInputStream::Skip(int count) { 445 if (count > limit_) { 446 if (limit_ < 0) return false; 447 input_->Skip(limit_); 448 limit_ = 0; 449 return false; 450 } else { 451 if (!input_->Skip(count)) return false; 452 limit_ -= count; 453 return true; 454 } 455} 456 457int64 LimitingInputStream::ByteCount() const { 458 if (limit_ < 0) { 459 return input_->ByteCount() + limit_; 460 } else { 461 return input_->ByteCount(); 462 } 463} 464 465 466// =================================================================== 467 468} // namespace io 469} // namespace protobuf 470} // namespace google 471