1// Protocol Buffers - Google's data interchange format
2// Copyright 2008 Google Inc.  All rights reserved.
3// http://code.google.com/p/protobuf/
4//
5// Redistribution and use in source and binary forms, with or without
6// modification, are permitted provided that the following conditions are
7// met:
8//
9//     * Redistributions of source code must retain the above copyright
10// notice, this list of conditions and the following disclaimer.
11//     * Redistributions in binary form must reproduce the above
12// copyright notice, this list of conditions and the following disclaimer
13// in the documentation and/or other materials provided with the
14// distribution.
15//     * Neither the name of Google Inc. nor the names of its
16// contributors may be used to endorse or promote products derived from
17// this software without specific prior written permission.
18//
19// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31// Author: kenton@google.com (Kenton Varda)
32//  Based on original Protocol Buffers design by
33//  Sanjay Ghemawat, Jeff Dean, and others.
34
35#ifdef _MSC_VER
36#include <io.h>
37#else
38#include <unistd.h>
39#include <sys/types.h>
40#include <sys/stat.h>
41#include <fcntl.h>
42#endif
43#include <errno.h>
44#include <iostream>
45#include <algorithm>
46
47#include <google/protobuf/io/zero_copy_stream_impl.h>
48#include <google/protobuf/stubs/common.h>
49#include <google/protobuf/stubs/stl_util-inl.h>
50
51namespace google {
52namespace protobuf {
53namespace io {
54
#ifdef _WIN32
// Win32 lseek is broken:  If invoked on a non-seekable file descriptor, its
// return value is undefined.  We re-define it to always produce an error.
// The macro discards all three arguments and evaluates to (off_t)-1, so every
// lseek() call that follows in this file reports failure on Windows.  The
// file-descriptor Skip() implementation below therefore always takes its
// non-seeking fallback path on that platform.
#define lseek(fd, offset, origin) ((off_t)-1)
#endif
60
61namespace {
62
// Calls close(fd), retrying for as long as the call fails with EINTR.
// Returns the result of the final close() call: 0 on success, or a negative
// value with errno describing a failure other than EINTR.
// NOTE(review): some platforms release the descriptor even when close()
// reports EINTR, in which case a retry targets a stale descriptor number --
// confirm the retry is appropriate for every supported platform.
int close_no_eintr(int fd) {
  for (;;) {
    const int rc = close(fd);
    if (rc >= 0 || errno != EINTR) {
      return rc;
    }
    // Interrupted by a signal; try again.
  }
}
71
72}  // namespace
73
74
75// ===================================================================
76
// Builds an input stream over file_descriptor.  The descriptor is handed to
// copying_input_, and impl_ is constructed on top of copying_input_ using
// block_size.  Nothing is read from the descriptor here.
FileInputStream::FileInputStream(int file_descriptor, int block_size)
  : copying_input_(file_descriptor),
    impl_(&copying_input_, block_size) {
}
81
// No explicit work is done here; the members clean up after themselves.
FileInputStream::~FileInputStream() {}
83
84bool FileInputStream::Close() {
85  return copying_input_.Close();
86}
87
88bool FileInputStream::Next(const void** data, int* size) {
89  return impl_.Next(data, size);
90}
91
// Backs up by count bytes.  Pure delegation to impl_.BackUp(); no state is
// kept at this level.
void FileInputStream::BackUp(int count) {
  impl_.BackUp(count);
}
95
96bool FileInputStream::Skip(int count) {
97  return impl_.Skip(count);
98}
99
100int64 FileInputStream::ByteCount() const {
101  return impl_.ByteCount();
102}
103
// Records file_descriptor without performing any operation on it.  Initial
// state: the descriptor is not closed on destruction, the stream is open, no
// error has been recorded, and no seek has been observed to fail yet.
FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
    int file_descriptor)
  : file_(file_descriptor),
    close_on_delete_(false),
    is_closed_(false),
    errno_(0),
    previous_seek_failed_(false) {
}
112
113FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
114  if (close_on_delete_) {
115    if (!Close()) {
116      GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
117    }
118  }
119}
120
121bool FileInputStream::CopyingFileInputStream::Close() {
122  GOOGLE_CHECK(!is_closed_);
123
124  is_closed_ = true;
125  if (close_no_eintr(file_) != 0) {
126    // The docs on close() do not specify whether a file descriptor is still
127    // open after close() fails with EIO.  However, the glibc source code
128    // seems to indicate that it is not.
129    errno_ = errno;
130    return false;
131  }
132
133  return true;
134}
135
136int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
137  GOOGLE_CHECK(!is_closed_);
138
139  int result;
140  do {
141    result = read(file_, buffer, size);
142  } while (result < 0 && errno == EINTR);
143
144  if (result < 0) {
145    // Read error (not EOF).
146    errno_ = errno;
147  }
148
149  return result;
150}
151
152int FileInputStream::CopyingFileInputStream::Skip(int count) {
153  GOOGLE_CHECK(!is_closed_);
154
155  if (!previous_seek_failed_ &&
156      lseek(file_, count, SEEK_CUR) != (off_t)-1) {
157    // Seek succeeded.
158    return count;
159  } else {
160    // Failed to seek.
161
162    // Note to self:  Don't seek again.  This file descriptor doesn't
163    // support it.
164    previous_seek_failed_ = true;
165
166    // Use the default implementation.
167    return CopyingInputStream::Skip(count);
168  }
169}
170
171// ===================================================================
172
// Builds an output stream over file_descriptor.  The descriptor is handed to
// copying_output_, and impl_ is constructed on top of copying_output_ using
// block_size.  Nothing is written to the descriptor here.
FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
  : copying_output_(file_descriptor),
    impl_(&copying_output_, block_size) {
}
177
// Flushes impl_ before the members are destroyed.  The result of Flush() is
// discarded, so a flush failure at this point goes unreported; callers that
// need to detect it should call Flush() or Close() themselves first.
FileOutputStream::~FileOutputStream() {
  impl_.Flush();
}
181
182bool FileOutputStream::Close() {
183  bool flush_succeeded = impl_.Flush();
184  return copying_output_.Close() && flush_succeeded;
185}
186
187bool FileOutputStream::Flush() {
188  return impl_.Flush();
189}
190
191bool FileOutputStream::Next(void** data, int* size) {
192  return impl_.Next(data, size);
193}
194
// Backs up by count bytes.  Pure delegation to impl_.BackUp(); no state is
// kept at this level.
void FileOutputStream::BackUp(int count) {
  impl_.BackUp(count);
}
198
199int64 FileOutputStream::ByteCount() const {
200  return impl_.ByteCount();
201}
202
// Records file_descriptor without performing any operation on it.  Initial
// state: the descriptor is not closed on destruction, the stream is open, and
// no error has been recorded.
FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
    int file_descriptor)
  : file_(file_descriptor),
    close_on_delete_(false),
    is_closed_(false),
    errno_(0) {
}
210
211FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
212  if (close_on_delete_) {
213    if (!Close()) {
214      GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
215    }
216  }
217}
218
219bool FileOutputStream::CopyingFileOutputStream::Close() {
220  GOOGLE_CHECK(!is_closed_);
221
222  is_closed_ = true;
223  if (close_no_eintr(file_) != 0) {
224    // The docs on close() do not specify whether a file descriptor is still
225    // open after close() fails with EIO.  However, the glibc source code
226    // seems to indicate that it is not.
227    errno_ = errno;
228    return false;
229  }
230
231  return true;
232}
233
234bool FileOutputStream::CopyingFileOutputStream::Write(
235    const void* buffer, int size) {
236  GOOGLE_CHECK(!is_closed_);
237  int total_written = 0;
238
239  const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
240
241  while (total_written < size) {
242    int bytes;
243    do {
244      bytes = write(file_, buffer_base + total_written, size - total_written);
245    } while (bytes < 0 && errno == EINTR);
246
247    if (bytes <= 0) {
248      // Write error.
249
250      // FIXME(kenton):  According to the man page, if write() returns zero,
251      //   there was no error; write() simply did not write anything.  It's
252      //   unclear under what circumstances this might happen, but presumably
253      //   errno won't be set in this case.  I am confused as to how such an
254      //   event should be handled.  For now I'm treating it as an error, since
255      //   retrying seems like it could lead to an infinite loop.  I suspect
256      //   this never actually happens anyway.
257
258      if (bytes < 0) {
259        errno_ = errno;
260      }
261      return false;
262    }
263    total_written += bytes;
264  }
265
266  return true;
267}
268
269// ===================================================================
270
// Builds an input stream over the given istream.  The pointer is handed to
// copying_input_, and impl_ is constructed on top of copying_input_ using
// block_size.  Nothing is read from the istream here.
IstreamInputStream::IstreamInputStream(istream* input, int block_size)
  : copying_input_(input),
    impl_(&copying_input_, block_size) {
}
275
// No explicit work is done here; the members clean up after themselves.
IstreamInputStream::~IstreamInputStream() {}
277
278bool IstreamInputStream::Next(const void** data, int* size) {
279  return impl_.Next(data, size);
280}
281
// Backs up by count bytes.  Pure delegation to impl_.BackUp(); no state is
// kept at this level.
void IstreamInputStream::BackUp(int count) {
  impl_.BackUp(count);
}
285
286bool IstreamInputStream::Skip(int count) {
287  return impl_.Skip(count);
288}
289
290int64 IstreamInputStream::ByteCount() const {
291  return impl_.ByteCount();
292}
293
// Stores the istream pointer for later reads.  Only the pointer is kept; the
// stream itself is not touched here.
IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
    istream* input)
  : input_(input) {
}
298
// Nothing to release: the istream behind input_ is not deleted here.
IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
300
301int IstreamInputStream::CopyingIstreamInputStream::Read(
302    void* buffer, int size) {
303  input_->read(reinterpret_cast<char*>(buffer), size);
304  int result = input_->gcount();
305  if (result == 0 && input_->fail() && !input_->eof()) {
306    return -1;
307  }
308  return result;
309}
310
311// ===================================================================
312
// Builds an output stream over the given ostream.  The pointer is handed to
// copying_output_, and impl_ is constructed on top of copying_output_ using
// block_size.  Nothing is written to the ostream here.
OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
  : copying_output_(output),
    impl_(&copying_output_, block_size) {
}
317
// Flushes impl_ before the members are destroyed.  The result of Flush() is
// discarded, so a flush failure at this point goes unreported.
OstreamOutputStream::~OstreamOutputStream() {
  impl_.Flush();
}
321
322bool OstreamOutputStream::Next(void** data, int* size) {
323  return impl_.Next(data, size);
324}
325
// Backs up by count bytes.  Pure delegation to impl_.BackUp(); no state is
// kept at this level.
void OstreamOutputStream::BackUp(int count) {
  impl_.BackUp(count);
}
329
330int64 OstreamOutputStream::ByteCount() const {
331  return impl_.ByteCount();
332}
333
// Stores the ostream pointer for later writes.  Only the pointer is kept; the
// stream itself is not touched here.
OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
    ostream* output)
  : output_(output) {
}
338
// Nothing to release: the ostream behind output_ is not deleted here.
OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
}
341
342bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
343    const void* buffer, int size) {
344  output_->write(reinterpret_cast<const char*>(buffer), size);
345  return output_->good();
346}
347
348// ===================================================================
349
// Sets up concatenation over the first count entries of streams.  Only the
// array pointer is stored (the array is not copied), so the caller's array
// presumably has to stay valid for as long as this object is used -- confirm
// against the header's contract.  No bytes are counted as retired initially.
ConcatenatingInputStream::ConcatenatingInputStream(
    ZeroCopyInputStream* const streams[], int count)
  : streams_(streams), stream_count_(count), bytes_retired_(0) {
}
354
// Nothing to release: the underlying streams are not deleted here.
ConcatenatingInputStream::~ConcatenatingInputStream() {
}
357
358bool ConcatenatingInputStream::Next(const void** data, int* size) {
359  while (stream_count_ > 0) {
360    if (streams_[0]->Next(data, size)) return true;
361
362    // That stream is done.  Advance to the next one.
363    bytes_retired_ += streams_[0]->ByteCount();
364    ++streams_;
365    --stream_count_;
366  }
367
368  // No more streams.
369  return false;
370}
371
372void ConcatenatingInputStream::BackUp(int count) {
373  if (stream_count_ > 0) {
374    streams_[0]->BackUp(count);
375  } else {
376    GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
377  }
378}
379
380bool ConcatenatingInputStream::Skip(int count) {
381  while (stream_count_ > 0) {
382    // Assume that ByteCount() can be used to find out how much we actually
383    // skipped when Skip() fails.
384    int64 target_byte_count = streams_[0]->ByteCount() + count;
385    if (streams_[0]->Skip(count)) return true;
386
387    // Hit the end of the stream.  Figure out how many more bytes we still have
388    // to skip.
389    int64 final_byte_count = streams_[0]->ByteCount();
390    GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
391    count = target_byte_count - final_byte_count;
392
393    // That stream is done.  Advance to the next one.
394    bytes_retired_ += final_byte_count;
395    ++streams_;
396    --stream_count_;
397  }
398
399  return false;
400}
401
402int64 ConcatenatingInputStream::ByteCount() const {
403  if (stream_count_ == 0) {
404    return bytes_retired_;
405  } else {
406    return bytes_retired_ + streams_[0]->ByteCount();
407  }
408}
409
410
411// ===================================================================
412
// Wraps input so that reads through this object are capped by limit.  Only
// the pointer is stored; limit_ holds the remaining budget and is adjusted by
// Next(), BackUp() and Skip().
LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
                                         int64 limit)
  : input_(input), limit_(limit) {}
416
417LimitingInputStream::~LimitingInputStream() {
418  // If we overshot the limit, back up.
419  if (limit_ < 0) input_->BackUp(-limit_);
420}
421
422bool LimitingInputStream::Next(const void** data, int* size) {
423  if (limit_ <= 0) return false;
424  if (!input_->Next(data, size)) return false;
425
426  limit_ -= *size;
427  if (limit_ < 0) {
428    // We overshot the limit.  Reduce *size to hide the rest of the buffer.
429    *size += limit_;
430  }
431  return true;
432}
433
434void LimitingInputStream::BackUp(int count) {
435  if (limit_ < 0) {
436    input_->BackUp(count - limit_);
437    limit_ = count;
438  } else {
439    input_->BackUp(count);
440    limit_ += count;
441  }
442}
443
444bool LimitingInputStream::Skip(int count) {
445  if (count > limit_) {
446    if (limit_ < 0) return false;
447    input_->Skip(limit_);
448    limit_ = 0;
449    return false;
450  } else {
451    if (!input_->Skip(count)) return false;
452    limit_ -= count;
453    return true;
454  }
455}
456
457int64 LimitingInputStream::ByteCount() const {
458  if (limit_ < 0) {
459    return input_->ByteCount() + limit_;
460  } else {
461    return input_->ByteCount();
462  }
463}
464
465
466// ===================================================================
467
468}  // namespace io
469}  // namespace protobuf
470}  // namespace google
471