1// Protocol Buffers - Google's data interchange format
2// Copyright 2008 Google Inc.  All rights reserved.
3// http://code.google.com/p/protobuf/
4//
5// Redistribution and use in source and binary forms, with or without
6// modification, are permitted provided that the following conditions are
7// met:
8//
9//     * Redistributions of source code must retain the above copyright
10// notice, this list of conditions and the following disclaimer.
11//     * Redistributions in binary form must reproduce the above
12// copyright notice, this list of conditions and the following disclaimer
13// in the documentation and/or other materials provided with the
14// distribution.
15//     * Neither the name of Google Inc. nor the names of its
16// contributors may be used to endorse or promote products derived from
17// this software without specific prior written permission.
18//
19// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31// Author: kenton@google.com (Kenton Varda)
32//  Based on original Protocol Buffers design by
33//  Sanjay Ghemawat, Jeff Dean, and others.
34
35#ifdef _MSC_VER
36#include <io.h>
37#else
38#include <unistd.h>
39#include <sys/types.h>
40#include <sys/stat.h>
41#include <fcntl.h>
42#endif
43#include <errno.h>
44#include <iostream>
45#include <algorithm>
46
47#include <google/protobuf/io/zero_copy_stream_impl.h>
48#include <google/protobuf/stubs/common.h>
49#include <google/protobuf/stubs/stl_util.h>
50
51
52namespace google {
53namespace protobuf {
54namespace io {
55
56#ifdef _WIN32
57// Win32 lseek is broken:  If invoked on a non-seekable file descriptor, its
58// return value is undefined.  We re-define it to always produce an error.
59#define lseek(fd, offset, origin) ((off_t)-1)
60#endif
61
62namespace {
63
64// EINTR sucks.
65int close_no_eintr(int fd) {
66  int result;
67  do {
68    result = close(fd);
69  } while (result < 0 && errno == EINTR);
70  return result;
71}
72
73}  // namespace
74
75
76// ===================================================================
77
78FileInputStream::FileInputStream(int file_descriptor, int block_size)
79  : copying_input_(file_descriptor),
80    impl_(&copying_input_, block_size) {
81}
82
83FileInputStream::~FileInputStream() {}
84
85bool FileInputStream::Close() {
86  return copying_input_.Close();
87}
88
89bool FileInputStream::Next(const void** data, int* size) {
90  return impl_.Next(data, size);
91}
92
93void FileInputStream::BackUp(int count) {
94  impl_.BackUp(count);
95}
96
97bool FileInputStream::Skip(int count) {
98  return impl_.Skip(count);
99}
100
101int64 FileInputStream::ByteCount() const {
102  return impl_.ByteCount();
103}
104
105FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
106    int file_descriptor)
107  : file_(file_descriptor),
108    close_on_delete_(false),
109    is_closed_(false),
110    errno_(0),
111    previous_seek_failed_(false) {
112}
113
114FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
115  if (close_on_delete_) {
116    if (!Close()) {
117      GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
118    }
119  }
120}
121
122bool FileInputStream::CopyingFileInputStream::Close() {
123  GOOGLE_CHECK(!is_closed_);
124
125  is_closed_ = true;
126  if (close_no_eintr(file_) != 0) {
127    // The docs on close() do not specify whether a file descriptor is still
128    // open after close() fails with EIO.  However, the glibc source code
129    // seems to indicate that it is not.
130    errno_ = errno;
131    return false;
132  }
133
134  return true;
135}
136
137int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
138  GOOGLE_CHECK(!is_closed_);
139
140  int result;
141  do {
142    result = read(file_, buffer, size);
143  } while (result < 0 && errno == EINTR);
144
145  if (result < 0) {
146    // Read error (not EOF).
147    errno_ = errno;
148  }
149
150  return result;
151}
152
153int FileInputStream::CopyingFileInputStream::Skip(int count) {
154  GOOGLE_CHECK(!is_closed_);
155
156  if (!previous_seek_failed_ &&
157      lseek(file_, count, SEEK_CUR) != (off_t)-1) {
158    // Seek succeeded.
159    return count;
160  } else {
161    // Failed to seek.
162
163    // Note to self:  Don't seek again.  This file descriptor doesn't
164    // support it.
165    previous_seek_failed_ = true;
166
167    // Use the default implementation.
168    return CopyingInputStream::Skip(count);
169  }
170}
171
172// ===================================================================
173
174FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
175  : copying_output_(file_descriptor),
176    impl_(&copying_output_, block_size) {
177}
178
179FileOutputStream::~FileOutputStream() {
180  impl_.Flush();
181}
182
183bool FileOutputStream::Close() {
184  bool flush_succeeded = impl_.Flush();
185  return copying_output_.Close() && flush_succeeded;
186}
187
188bool FileOutputStream::Flush() {
189  return impl_.Flush();
190}
191
192bool FileOutputStream::Next(void** data, int* size) {
193  return impl_.Next(data, size);
194}
195
196void FileOutputStream::BackUp(int count) {
197  impl_.BackUp(count);
198}
199
200int64 FileOutputStream::ByteCount() const {
201  return impl_.ByteCount();
202}
203
204FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
205    int file_descriptor)
206  : file_(file_descriptor),
207    close_on_delete_(false),
208    is_closed_(false),
209    errno_(0) {
210}
211
212FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
213  if (close_on_delete_) {
214    if (!Close()) {
215      GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
216    }
217  }
218}
219
220bool FileOutputStream::CopyingFileOutputStream::Close() {
221  GOOGLE_CHECK(!is_closed_);
222
223  is_closed_ = true;
224  if (close_no_eintr(file_) != 0) {
225    // The docs on close() do not specify whether a file descriptor is still
226    // open after close() fails with EIO.  However, the glibc source code
227    // seems to indicate that it is not.
228    errno_ = errno;
229    return false;
230  }
231
232  return true;
233}
234
235bool FileOutputStream::CopyingFileOutputStream::Write(
236    const void* buffer, int size) {
237  GOOGLE_CHECK(!is_closed_);
238  int total_written = 0;
239
240  const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
241
242  while (total_written < size) {
243    int bytes;
244    do {
245      bytes = write(file_, buffer_base + total_written, size - total_written);
246    } while (bytes < 0 && errno == EINTR);
247
248    if (bytes <= 0) {
249      // Write error.
250
251      // FIXME(kenton):  According to the man page, if write() returns zero,
252      //   there was no error; write() simply did not write anything.  It's
253      //   unclear under what circumstances this might happen, but presumably
254      //   errno won't be set in this case.  I am confused as to how such an
255      //   event should be handled.  For now I'm treating it as an error, since
256      //   retrying seems like it could lead to an infinite loop.  I suspect
257      //   this never actually happens anyway.
258
259      if (bytes < 0) {
260        errno_ = errno;
261      }
262      return false;
263    }
264    total_written += bytes;
265  }
266
267  return true;
268}
269
270// ===================================================================
271
272IstreamInputStream::IstreamInputStream(istream* input, int block_size)
273  : copying_input_(input),
274    impl_(&copying_input_, block_size) {
275}
276
277IstreamInputStream::~IstreamInputStream() {}
278
279bool IstreamInputStream::Next(const void** data, int* size) {
280  return impl_.Next(data, size);
281}
282
283void IstreamInputStream::BackUp(int count) {
284  impl_.BackUp(count);
285}
286
287bool IstreamInputStream::Skip(int count) {
288  return impl_.Skip(count);
289}
290
291int64 IstreamInputStream::ByteCount() const {
292  return impl_.ByteCount();
293}
294
295IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
296    istream* input)
297  : input_(input) {
298}
299
300IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
301
302int IstreamInputStream::CopyingIstreamInputStream::Read(
303    void* buffer, int size) {
304  input_->read(reinterpret_cast<char*>(buffer), size);
305  int result = input_->gcount();
306  if (result == 0 && input_->fail() && !input_->eof()) {
307    return -1;
308  }
309  return result;
310}
311
312// ===================================================================
313
314OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
315  : copying_output_(output),
316    impl_(&copying_output_, block_size) {
317}
318
319OstreamOutputStream::~OstreamOutputStream() {
320  impl_.Flush();
321}
322
323bool OstreamOutputStream::Next(void** data, int* size) {
324  return impl_.Next(data, size);
325}
326
327void OstreamOutputStream::BackUp(int count) {
328  impl_.BackUp(count);
329}
330
331int64 OstreamOutputStream::ByteCount() const {
332  return impl_.ByteCount();
333}
334
335OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
336    ostream* output)
337  : output_(output) {
338}
339
340OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
341}
342
343bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
344    const void* buffer, int size) {
345  output_->write(reinterpret_cast<const char*>(buffer), size);
346  return output_->good();
347}
348
349// ===================================================================
350
351ConcatenatingInputStream::ConcatenatingInputStream(
352    ZeroCopyInputStream* const streams[], int count)
353  : streams_(streams), stream_count_(count), bytes_retired_(0) {
354}
355
356ConcatenatingInputStream::~ConcatenatingInputStream() {
357}
358
359bool ConcatenatingInputStream::Next(const void** data, int* size) {
360  while (stream_count_ > 0) {
361    if (streams_[0]->Next(data, size)) return true;
362
363    // That stream is done.  Advance to the next one.
364    bytes_retired_ += streams_[0]->ByteCount();
365    ++streams_;
366    --stream_count_;
367  }
368
369  // No more streams.
370  return false;
371}
372
373void ConcatenatingInputStream::BackUp(int count) {
374  if (stream_count_ > 0) {
375    streams_[0]->BackUp(count);
376  } else {
377    GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
378  }
379}
380
381bool ConcatenatingInputStream::Skip(int count) {
382  while (stream_count_ > 0) {
383    // Assume that ByteCount() can be used to find out how much we actually
384    // skipped when Skip() fails.
385    int64 target_byte_count = streams_[0]->ByteCount() + count;
386    if (streams_[0]->Skip(count)) return true;
387
388    // Hit the end of the stream.  Figure out how many more bytes we still have
389    // to skip.
390    int64 final_byte_count = streams_[0]->ByteCount();
391    GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
392    count = target_byte_count - final_byte_count;
393
394    // That stream is done.  Advance to the next one.
395    bytes_retired_ += final_byte_count;
396    ++streams_;
397    --stream_count_;
398  }
399
400  return false;
401}
402
403int64 ConcatenatingInputStream::ByteCount() const {
404  if (stream_count_ == 0) {
405    return bytes_retired_;
406  } else {
407    return bytes_retired_ + streams_[0]->ByteCount();
408  }
409}
410
411
412// ===================================================================
413
414LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
415                                         int64 limit)
416  : input_(input), limit_(limit) {}
417
418LimitingInputStream::~LimitingInputStream() {
419  // If we overshot the limit, back up.
420  if (limit_ < 0) input_->BackUp(-limit_);
421}
422
423bool LimitingInputStream::Next(const void** data, int* size) {
424  if (limit_ <= 0) return false;
425  if (!input_->Next(data, size)) return false;
426
427  limit_ -= *size;
428  if (limit_ < 0) {
429    // We overshot the limit.  Reduce *size to hide the rest of the buffer.
430    *size += limit_;
431  }
432  return true;
433}
434
435void LimitingInputStream::BackUp(int count) {
436  if (limit_ < 0) {
437    input_->BackUp(count - limit_);
438    limit_ = count;
439  } else {
440    input_->BackUp(count);
441    limit_ += count;
442  }
443}
444
445bool LimitingInputStream::Skip(int count) {
446  if (count > limit_) {
447    if (limit_ < 0) return false;
448    input_->Skip(limit_);
449    limit_ = 0;
450    return false;
451  } else {
452    if (!input_->Skip(count)) return false;
453    limit_ -= count;
454    return true;
455  }
456}
457
458int64 LimitingInputStream::ByteCount() const {
459  if (limit_ < 0) {
460    return input_->ByteCount() + limit_;
461  } else {
462    return input_->ByteCount();
463  }
464}
465
466
467// ===================================================================
468
469}  // namespace io
470}  // namespace protobuf
471}  // namespace google
472