1/*
2 * libjingle
3 * Copyright 2011, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
#if defined(POSIX)
#include <sys/file.h>
#endif  // POSIX
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <new>
#include <string>
#include "talk/base/basictypes.h"
#include "talk/base/common.h"
#include "talk/base/messagequeue.h"
#include "talk/base/stream.h"
#include "talk/base/stringencode.h"
#include "talk/base/stringutils.h"
#include "talk/base/thread.h"
42
43#ifdef WIN32
44#include "talk/base/win32.h"
45#define fileno _fileno
46#endif
47
48namespace talk_base {
49
50///////////////////////////////////////////////////////////////////////////////
51// StreamInterface
52///////////////////////////////////////////////////////////////////////////////
53
// Message id that StreamInterface::PostEvent posts and
// StreamInterface::OnMessage recognizes.
enum { MSG_POST_EVENT = 0xF1F1 };
57
58StreamInterface::~StreamInterface() {
59}
60
61struct PostEventData : public MessageData {
62  int events, error;
63  PostEventData(int ev, int er) : events(ev), error(er) { }
64};
65
66StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
67                                       size_t* written, int* error) {
68  StreamResult result = SR_SUCCESS;
69  size_t total_written = 0, current_written;
70  while (total_written < data_len) {
71    result = Write(static_cast<const char*>(data) + total_written,
72                   data_len - total_written, &current_written, error);
73    if (result != SR_SUCCESS)
74      break;
75    total_written += current_written;
76  }
77  if (written)
78    *written = total_written;
79  return result;
80}
81
82StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
83                                      size_t* read, int* error) {
84  StreamResult result = SR_SUCCESS;
85  size_t total_read = 0, current_read;
86  while (total_read < buffer_len) {
87    result = Read(static_cast<char*>(buffer) + total_read,
88                  buffer_len - total_read, &current_read, error);
89    if (result != SR_SUCCESS)
90      break;
91    total_read += current_read;
92  }
93  if (read)
94    *read = total_read;
95  return result;
96}
97
98StreamResult StreamInterface::ReadLine(std::string* line) {
99  line->clear();
100  StreamResult result = SR_SUCCESS;
101  while (true) {
102    char ch;
103    result = Read(&ch, sizeof(ch), NULL, NULL);
104    if (result != SR_SUCCESS) {
105      break;
106    }
107    if (ch == '\n') {
108      break;
109    }
110    line->push_back(ch);
111  }
112  if (!line->empty()) {   // give back the line we've collected so far with
113    result = SR_SUCCESS;  // a success code.  Otherwise return the last code
114  }
115  return result;
116}
117
118void StreamInterface::PostEvent(Thread* t, int events, int err) {
119  t->Post(this, MSG_POST_EVENT, new PostEventData(events, err));
120}
121
122void StreamInterface::PostEvent(int events, int err) {
123  PostEvent(Thread::Current(), events, err);
124}
125
126StreamInterface::StreamInterface() {
127}
128
129void StreamInterface::OnMessage(Message* msg) {
130  if (MSG_POST_EVENT == msg->message_id) {
131    PostEventData* pe = static_cast<PostEventData*>(msg->pdata);
132    SignalEvent(this, pe->events, pe->error);
133    delete msg->pdata;
134  }
135}
136
137///////////////////////////////////////////////////////////////////////////////
138// StreamAdapterInterface
139///////////////////////////////////////////////////////////////////////////////
140
141StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
142                                               bool owned)
143    : stream_(stream), owned_(owned) {
144  if (NULL != stream_)
145    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
146}
147
148void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
149  if (NULL != stream_)
150    stream_->SignalEvent.disconnect(this);
151  if (owned_)
152    delete stream_;
153  stream_ = stream;
154  owned_ = owned;
155  if (NULL != stream_)
156    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
157}
158
159StreamInterface* StreamAdapterInterface::Detach() {
160  if (NULL != stream_)
161    stream_->SignalEvent.disconnect(this);
162  StreamInterface* stream = stream_;
163  stream_ = NULL;
164  return stream;
165}
166
167StreamAdapterInterface::~StreamAdapterInterface() {
168  if (owned_)
169    delete stream_;
170}
171
172///////////////////////////////////////////////////////////////////////////////
173// StreamTap
174///////////////////////////////////////////////////////////////////////////////
175
176StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
177: StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS),
178  tap_error_(0)
179{
180  AttachTap(tap);
181}
182
183void StreamTap::AttachTap(StreamInterface* tap) {
184  tap_.reset(tap);
185}
186
187StreamInterface* StreamTap::DetachTap() {
188  return tap_.release();
189}
190
191StreamResult StreamTap::GetTapResult(int* error) {
192  if (error) {
193    *error = tap_error_;
194  }
195  return tap_result_;
196}
197
198StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
199                             size_t* read, int* error) {
200  size_t backup_read;
201  if (!read) {
202    read = &backup_read;
203  }
204  StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
205                                                  read, error);
206  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
207    tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
208  }
209  return res;
210}
211
212StreamResult StreamTap::Write(const void* data, size_t data_len,
213                              size_t* written, int* error) {
214  size_t backup_written;
215  if (!written) {
216    written = &backup_written;
217  }
218  StreamResult res = StreamAdapterInterface::Write(data, data_len,
219                                                   written, error);
220  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
221    tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
222  }
223  return res;
224}
225
226///////////////////////////////////////////////////////////////////////////////
227// StreamSegment
228///////////////////////////////////////////////////////////////////////////////
229
230StreamSegment::StreamSegment(StreamInterface* stream)
231: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
232  length_(SIZE_UNKNOWN)
233{
234  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
235  stream->GetPosition(&start_);
236}
237
238StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
239: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
240  length_(length)
241{
242  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
243  stream->GetPosition(&start_);
244}
245
246StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
247                                 size_t* read, int* error)
248{
249  if (SIZE_UNKNOWN != length_) {
250    if (pos_ >= length_)
251      return SR_EOS;
252    buffer_len = _min(buffer_len, length_ - pos_);
253  }
254  size_t backup_read;
255  if (!read) {
256    read = &backup_read;
257  }
258  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
259                                                     read, error);
260  if (SR_SUCCESS == result) {
261    pos_ += *read;
262  }
263  return result;
264}
265
266bool StreamSegment::SetPosition(size_t position) {
267  if (SIZE_UNKNOWN == start_)
268    return false;  // Not seekable
269  if ((SIZE_UNKNOWN != length_) && (position > length_))
270    return false;  // Seek past end of segment
271  if (!StreamAdapterInterface::SetPosition(start_ + position))
272    return false;
273  pos_ = position;
274  return true;
275}
276
277bool StreamSegment::GetPosition(size_t* position) const {
278  if (SIZE_UNKNOWN == start_)
279    return false;  // Not seekable
280  if (!StreamAdapterInterface::GetPosition(position))
281    return false;
282  if (position) {
283    ASSERT(*position >= start_);
284    *position -= start_;
285  }
286  return true;
287}
288
289bool StreamSegment::GetSize(size_t* size) const {
290  if (!StreamAdapterInterface::GetSize(size))
291    return false;
292  if (size) {
293    if (SIZE_UNKNOWN != start_) {
294      ASSERT(*size >= start_);
295      *size -= start_;
296    }
297    if (SIZE_UNKNOWN != length_) {
298      *size = _min(*size, length_);
299    }
300  }
301  return true;
302}
303
304bool StreamSegment::GetAvailable(size_t* size) const {
305  if (!StreamAdapterInterface::GetAvailable(size))
306    return false;
307  if (size && (SIZE_UNKNOWN != length_))
308    *size = _min(*size, length_ - pos_);
309  return true;
310}
311
312///////////////////////////////////////////////////////////////////////////////
313// NullStream
314///////////////////////////////////////////////////////////////////////////////
315
316NullStream::NullStream() {
317}
318
319NullStream::~NullStream() {
320}
321
322StreamState NullStream::GetState() const {
323  return SS_OPEN;
324}
325
326StreamResult NullStream::Read(void* buffer, size_t buffer_len,
327                              size_t* read, int* error) {
328  if (error) *error = -1;
329  return SR_ERROR;
330}
331
332StreamResult NullStream::Write(const void* data, size_t data_len,
333                               size_t* written, int* error) {
334  if (written) *written = data_len;
335  return SR_SUCCESS;
336}
337
338void NullStream::Close() {
339}
340
341///////////////////////////////////////////////////////////////////////////////
342// FileStream
343///////////////////////////////////////////////////////////////////////////////
344
345FileStream::FileStream() : file_(NULL) {
346}
347
348FileStream::~FileStream() {
349  FileStream::Close();
350}
351
352bool FileStream::Open(const std::string& filename, const char* mode) {
353  Close();
354#ifdef WIN32
355  std::wstring wfilename;
356  if (Utf8ToWindowsFilename(filename, &wfilename)) {
357    file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
358  } else {
359    file_ = NULL;
360  }
361#else
362  file_ = fopen(filename.c_str(), mode);
363#endif
364  return (file_ != NULL);
365}
366
367bool FileStream::OpenShare(const std::string& filename, const char* mode,
368                           int shflag) {
369  Close();
370#ifdef WIN32
371  std::wstring wfilename;
372  if (Utf8ToWindowsFilename(filename, &wfilename)) {
373    file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
374  } else {
375    file_ = NULL;
376  }
377#else
378  return Open(filename, mode);
379#endif
380  return (file_ != NULL);
381}
382
383bool FileStream::DisableBuffering() {
384  if (!file_)
385    return false;
386  return (setvbuf(file_, NULL, _IONBF, 0) == 0);
387}
388
389StreamState FileStream::GetState() const {
390  return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
391}
392
393StreamResult FileStream::Read(void* buffer, size_t buffer_len,
394                              size_t* read, int* error) {
395  if (!file_)
396    return SR_EOS;
397  size_t result = fread(buffer, 1, buffer_len, file_);
398  if ((result == 0) && (buffer_len > 0)) {
399    if (feof(file_))
400      return SR_EOS;
401    if (error)
402      *error = errno;
403    return SR_ERROR;
404  }
405  if (read)
406    *read = result;
407  return SR_SUCCESS;
408}
409
410StreamResult FileStream::Write(const void* data, size_t data_len,
411                               size_t* written, int* error) {
412  if (!file_)
413    return SR_EOS;
414  size_t result = fwrite(data, 1, data_len, file_);
415  if ((result == 0) && (data_len > 0)) {
416    if (error)
417      *error = errno;
418    return SR_ERROR;
419  }
420  if (written)
421    *written = result;
422  return SR_SUCCESS;
423}
424
425void FileStream::Close() {
426  if (file_) {
427    DoClose();
428    file_ = NULL;
429  }
430}
431
432bool FileStream::SetPosition(size_t position) {
433  if (!file_)
434    return false;
435  return (fseek(file_, position, SEEK_SET) == 0);
436}
437
438bool FileStream::GetPosition(size_t* position) const {
439  ASSERT(NULL != position);
440  if (!file_)
441    return false;
442  long result = ftell(file_);
443  if (result < 0)
444    return false;
445  if (position)
446    *position = result;
447  return true;
448}
449
450bool FileStream::GetSize(size_t* size) const {
451  ASSERT(NULL != size);
452  if (!file_)
453    return false;
454  struct stat file_stats;
455  if (fstat(fileno(file_), &file_stats) != 0)
456    return false;
457  if (size)
458    *size = file_stats.st_size;
459  return true;
460}
461
462bool FileStream::GetAvailable(size_t* size) const {
463  ASSERT(NULL != size);
464  if (!GetSize(size))
465    return false;
466  long result = ftell(file_);
467  if (result < 0)
468    return false;
469  if (size)
470    *size -= result;
471  return true;
472}
473
474bool FileStream::ReserveSize(size_t size) {
475  // TODO: extend the file to the proper length
476  return true;
477}
478
479bool FileStream::GetSize(const std::string& filename, size_t* size) {
480  struct stat file_stats;
481  if (stat(filename.c_str(), &file_stats) != 0)
482    return false;
483  *size = file_stats.st_size;
484  return true;
485}
486
487bool FileStream::Flush() {
488  if (file_) {
489    return (0 == fflush(file_));
490  }
491  // try to flush empty file?
492  ASSERT(false);
493  return false;
494}
495
496#if defined(POSIX)
497
498bool FileStream::TryLock() {
499  if (file_ == NULL) {
500    // Stream not open.
501    ASSERT(false);
502    return false;
503  }
504
505  return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
506}
507
508bool FileStream::Unlock() {
509  if (file_ == NULL) {
510    // Stream not open.
511    ASSERT(false);
512    return false;
513  }
514
515  return flock(fileno(file_), LOCK_UN) == 0;
516}
517
518#endif
519
520void FileStream::DoClose() {
521  fclose(file_);
522}
523
524#ifdef POSIX
525
526// Have to identically rewrite the FileStream destructor or else it would call
527// the base class's Close() instead of the sub-class's.
528POpenStream::~POpenStream() {
529  POpenStream::Close();
530}
531
532bool POpenStream::Open(const std::string& subcommand, const char* mode) {
533  Close();
534  file_ = popen(subcommand.c_str(), mode);
535  return file_ != NULL;
536}
537
538bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
539                            int shflag) {
540  return Open(subcommand, mode);
541}
542
543void POpenStream::DoClose() {
544  wait_status_ = pclose(file_);
545}
546
547#endif
548
549///////////////////////////////////////////////////////////////////////////////
550// MemoryStream
551///////////////////////////////////////////////////////////////////////////////
552
553MemoryStreamBase::MemoryStreamBase()
554  : buffer_(NULL), buffer_length_(0), data_length_(0),
555    seek_position_(0) {
556}
557
558StreamState MemoryStreamBase::GetState() const {
559  return SS_OPEN;
560}
561
562StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
563                                    size_t* bytes_read, int* error) {
564  if (seek_position_ >= data_length_) {
565    return SR_EOS;
566  }
567  size_t available = data_length_ - seek_position_;
568  if (bytes > available) {
569    // Read partial buffer
570    bytes = available;
571  }
572  memcpy(buffer, &buffer_[seek_position_], bytes);
573  seek_position_ += bytes;
574  if (bytes_read) {
575    *bytes_read = bytes;
576  }
577  return SR_SUCCESS;
578}
579
580StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
581                                     size_t* bytes_written, int* error) {
582  size_t available = buffer_length_ - seek_position_;
583  if (0 == available) {
584    // Increase buffer size to the larger of:
585    // a) new position rounded up to next 256 bytes
586    // b) double the previous length
587    size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
588                                    buffer_length_ * 2);
589    StreamResult result = DoReserve(new_buffer_length, error);
590    if (SR_SUCCESS != result) {
591      return result;
592    }
593    ASSERT(buffer_length_ >= new_buffer_length);
594    available = buffer_length_ - seek_position_;
595  }
596
597  if (bytes > available) {
598    bytes = available;
599  }
600  memcpy(&buffer_[seek_position_], buffer, bytes);
601  seek_position_ += bytes;
602  if (data_length_ < seek_position_) {
603    data_length_ = seek_position_;
604  }
605  if (bytes_written) {
606    *bytes_written = bytes;
607  }
608  return SR_SUCCESS;
609}
610
611void MemoryStreamBase::Close() {
612  // nothing to do
613}
614
615bool MemoryStreamBase::SetPosition(size_t position) {
616  if (position > data_length_)
617    return false;
618  seek_position_ = position;
619  return true;
620}
621
622bool MemoryStreamBase::GetPosition(size_t *position) const {
623  if (position)
624    *position = seek_position_;
625  return true;
626}
627
628bool MemoryStreamBase::GetSize(size_t *size) const {
629  if (size)
630    *size = data_length_;
631  return true;
632}
633
634bool MemoryStreamBase::GetAvailable(size_t *size) const {
635  if (size)
636    *size = data_length_ - seek_position_;
637  return true;
638}
639
640bool MemoryStreamBase::ReserveSize(size_t size) {
641  return (SR_SUCCESS == DoReserve(size, NULL));
642}
643
644StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
645  return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
646}
647
648///////////////////////////////////////////////////////////////////////////////
649
650MemoryStream::MemoryStream()
651  : buffer_alloc_(NULL) {
652}
653
654MemoryStream::MemoryStream(const char* data)
655  : buffer_alloc_(NULL) {
656  SetData(data, strlen(data));
657}
658
659MemoryStream::MemoryStream(const void* data, size_t length)
660  : buffer_alloc_(NULL) {
661  SetData(data, length);
662}
663
664MemoryStream::~MemoryStream() {
665  delete [] buffer_alloc_;
666}
667
668void MemoryStream::SetData(const void* data, size_t length) {
669  data_length_ = buffer_length_ = length;
670  delete [] buffer_alloc_;
671  buffer_alloc_ = new char[buffer_length_ + kAlignment];
672  buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
673  memcpy(buffer_, data, data_length_);
674  seek_position_ = 0;
675}
676
677StreamResult MemoryStream::DoReserve(size_t size, int* error) {
678  if (buffer_length_ >= size)
679    return SR_SUCCESS;
680
681  if (char* new_buffer_alloc = new char[size + kAlignment]) {
682    char* new_buffer = reinterpret_cast<char*>(
683        ALIGNP(new_buffer_alloc, kAlignment));
684    memcpy(new_buffer, buffer_, data_length_);
685    delete [] buffer_alloc_;
686    buffer_alloc_ = new_buffer_alloc;
687    buffer_ = new_buffer;
688    buffer_length_ = size;
689    return SR_SUCCESS;
690  }
691
692  if (error) {
693    *error = ENOMEM;
694  }
695  return SR_ERROR;
696}
697
698///////////////////////////////////////////////////////////////////////////////
699
700ExternalMemoryStream::ExternalMemoryStream() {
701}
702
703ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
704  SetData(data, length);
705}
706
707ExternalMemoryStream::~ExternalMemoryStream() {
708}
709
710void ExternalMemoryStream::SetData(void* data, size_t length) {
711  data_length_ = buffer_length_ = length;
712  buffer_ = static_cast<char*>(data);
713  seek_position_ = 0;
714}
715
716///////////////////////////////////////////////////////////////////////////////
717// FifoBuffer
718///////////////////////////////////////////////////////////////////////////////
719
720FifoBuffer::FifoBuffer(size_t size)
721    : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
722      data_length_(0), read_position_(0), owner_(Thread::Current()) {
723  // all events are done on the owner_ thread
724}
725
726FifoBuffer::~FifoBuffer() {
727}
728
729bool FifoBuffer::GetBuffered(size_t* size) const {
730  CritScope cs(&crit_);
731  *size = data_length_;
732  return true;
733}
734
735bool FifoBuffer::SetCapacity(size_t size) {
736  CritScope cs(&crit_);
737  if (data_length_ > size) {
738    return false;
739  }
740
741  if (size != buffer_length_) {
742    char* buffer = new char[size];
743    const size_t copy = data_length_;
744    const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
745    memcpy(buffer, &buffer_[read_position_], tail_copy);
746    memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
747    buffer_.reset(buffer);
748    read_position_ = 0;
749    buffer_length_ = size;
750  }
751  return true;
752}
753
754StreamState FifoBuffer::GetState() const {
755  return state_;
756}
757
758StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
759                              size_t* bytes_read, int* error) {
760  CritScope cs(&crit_);
761  const size_t available = data_length_;
762  if (0 == available) {
763    return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
764  }
765
766  const bool was_writable = data_length_ < buffer_length_;
767  const size_t copy = _min(bytes, available);
768  const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
769  char* const p = static_cast<char*>(buffer);
770  memcpy(p, &buffer_[read_position_], tail_copy);
771  memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
772  read_position_ = (read_position_ + copy) % buffer_length_;
773  data_length_ -= copy;
774  if (bytes_read) {
775    *bytes_read = copy;
776  }
777  // if we were full before, and now we're not, post an event
778  if (!was_writable && copy > 0) {
779    PostEvent(owner_, SE_WRITE, 0);
780  }
781
782  return SR_SUCCESS;
783}
784
785StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
786                               size_t* bytes_written, int* error) {
787  CritScope cs(&crit_);
788  if (state_ == SS_CLOSED) {
789    return SR_EOS;
790  }
791
792  const size_t available = buffer_length_ - data_length_;
793  if (0 == available) {
794    return SR_BLOCK;
795  }
796
797  const bool was_readable = (data_length_ > 0);
798  const size_t write_position = (read_position_ + data_length_)
799      % buffer_length_;
800  const size_t copy = _min(bytes, available);
801  const size_t tail_copy = _min(copy, buffer_length_ - write_position);
802  const char* const p = static_cast<const char*>(buffer);
803  memcpy(&buffer_[write_position], p, tail_copy);
804  memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
805  data_length_ += copy;
806  if (bytes_written) {
807    *bytes_written = copy;
808  }
809  // if we didn't have any data to read before, and now we do, post an event
810  if (!was_readable && copy > 0) {
811    PostEvent(owner_, SE_READ, 0);
812  }
813
814  return SR_SUCCESS;
815}
816
817void FifoBuffer::Close() {
818  CritScope cs(&crit_);
819  state_ = SS_CLOSED;
820}
821
822const void* FifoBuffer::GetReadData(size_t* size) {
823  CritScope cs(&crit_);
824  *size = (read_position_ + data_length_ <= buffer_length_) ?
825      data_length_ : buffer_length_ - read_position_;
826  return &buffer_[read_position_];
827}
828
829void FifoBuffer::ConsumeReadData(size_t size) {
830  CritScope cs(&crit_);
831  ASSERT(size <= data_length_);
832  const bool was_writable = data_length_ < buffer_length_;
833  read_position_ = (read_position_ + size) % buffer_length_;
834  data_length_ -= size;
835  if (!was_writable && size > 0) {
836    PostEvent(owner_, SE_WRITE, 0);
837  }
838}
839
840void* FifoBuffer::GetWriteBuffer(size_t* size) {
841  CritScope cs(&crit_);
842  if (state_ == SS_CLOSED) {
843    return NULL;
844  }
845
846  // if empty, reset the write position to the beginning, so we can get
847  // the biggest possible block
848  if (data_length_ == 0) {
849    read_position_ = 0;
850  }
851
852  const size_t write_position = (read_position_ + data_length_)
853      % buffer_length_;
854  *size = (write_position >= read_position_) ?
855      buffer_length_ - write_position : read_position_ - write_position;
856  return &buffer_[write_position];
857}
858
859void FifoBuffer::ConsumeWriteBuffer(size_t size) {
860  CritScope cs(&crit_);
861  ASSERT(size <= buffer_length_ - data_length_);
862  const bool was_readable = (data_length_ > 0);
863  data_length_ += size;
864  if (!was_readable && size > 0) {
865    PostEvent(owner_, SE_READ, 0);
866  }
867}
868
869///////////////////////////////////////////////////////////////////////////////
870// LoggingAdapter
871///////////////////////////////////////////////////////////////////////////////
872
873LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
874                               const std::string& label, bool hex_mode)
875: StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode)
876{
877  set_label(label);
878}
879
880void LoggingAdapter::set_label(const std::string& label) {
881  label_.assign("[");
882  label_.append(label);
883  label_.append("]");
884}
885
886StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
887                                  size_t* read, int* error) {
888  size_t local_read; if (!read) read = &local_read;
889  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
890                                                     error);
891  if (result == SR_SUCCESS) {
892    LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
893  }
894  return result;
895}
896
897StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
898                                   size_t* written, int* error) {
899  size_t local_written; if (!written) written = &local_written;
900  StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
901                                                      error);
902  if (result == SR_SUCCESS) {
903    LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
904                 &lms_);
905  }
906  return result;
907}
908
909void LoggingAdapter::Close() {
910  LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
911  LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
912  LOG_V(level_) << label_ << " Closed locally";
913  StreamAdapterInterface::Close();
914}
915
916void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
917  if (events & SE_OPEN) {
918    LOG_V(level_) << label_ << " Open";
919  } else if (events & SE_CLOSE) {
920    LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
921    LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
922    LOG_V(level_) << label_ << " Closed with error: " << err;
923  }
924  StreamAdapterInterface::OnEvent(stream, events, err);
925}
926
927///////////////////////////////////////////////////////////////////////////////
928// StringStream - Reads/Writes to an external std::string
929///////////////////////////////////////////////////////////////////////////////
930
931StringStream::StringStream(std::string& str)
932: str_(str), read_pos_(0), read_only_(false)
933{
934}
935
936StringStream::StringStream(const std::string& str)
937: str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true)
938{
939}
940
941StreamState StringStream::GetState() const {
942  return SS_OPEN;
943}
944
945StreamResult StringStream::Read(void* buffer, size_t buffer_len,
946                                      size_t* read, int* error) {
947  size_t available = _min(buffer_len, str_.size() - read_pos_);
948  if (!available)
949    return SR_EOS;
950  memcpy(buffer, str_.data() + read_pos_, available);
951  read_pos_ += available;
952  if (read)
953    *read = available;
954  return SR_SUCCESS;
955}
956
957StreamResult StringStream::Write(const void* data, size_t data_len,
958                                      size_t* written, int* error) {
959  if (read_only_) {
960    if (error) {
961      *error = -1;
962    }
963    return SR_ERROR;
964  }
965  str_.append(static_cast<const char*>(data),
966              static_cast<const char*>(data) + data_len);
967  if (written)
968    *written = data_len;
969  return SR_SUCCESS;
970}
971
972void StringStream::Close() {
973}
974
975bool StringStream::SetPosition(size_t position) {
976  if (position > str_.size())
977    return false;
978  read_pos_ = position;
979  return true;
980}
981
982bool StringStream::GetPosition(size_t* position) const {
983  if (position)
984    *position = read_pos_;
985  return true;
986}
987
988bool StringStream::GetSize(size_t* size) const {
989  if (size)
990    *size = str_.size();
991  return true;
992}
993
994bool StringStream::GetAvailable(size_t* size) const {
995  if (size)
996    *size = str_.size() - read_pos_;
997  return true;
998}
999
1000bool StringStream::ReserveSize(size_t size) {
1001  if (read_only_)
1002    return false;
1003  str_.reserve(size);
1004  return true;
1005}
1006
1007///////////////////////////////////////////////////////////////////////////////
1008// StreamReference
1009///////////////////////////////////////////////////////////////////////////////
1010
1011StreamReference::StreamReference(StreamInterface* stream)
1012    : StreamAdapterInterface(stream, false) {
1013  // owner set to false so the destructor does not free the stream.
1014  stream_ref_count_ = new StreamRefCount(stream);
1015}
1016
1017StreamInterface* StreamReference::NewReference() {
1018  stream_ref_count_->AddReference();
1019  return new StreamReference(stream_ref_count_, stream());
1020}
1021
1022StreamReference::~StreamReference() {
1023  stream_ref_count_->Release();
1024}
1025
1026StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1027                                 StreamInterface* stream)
1028    : StreamAdapterInterface(stream, false),
1029      stream_ref_count_(stream_ref_count) {
1030}
1031
1032///////////////////////////////////////////////////////////////////////////////
1033
1034StreamResult Flow(StreamInterface* source,
1035                  char* buffer, size_t buffer_len,
1036                  StreamInterface* sink,
1037                  size_t* data_len /* = NULL */) {
1038  ASSERT(buffer_len > 0);
1039
1040  StreamResult result;
1041  size_t count, read_pos, write_pos;
1042  if (data_len) {
1043    read_pos = *data_len;
1044  } else {
1045    read_pos = 0;
1046  }
1047
1048  bool end_of_stream = false;
1049  do {
1050    // Read until buffer is full, end of stream, or error
1051    while (!end_of_stream && (read_pos < buffer_len)) {
1052      result = source->Read(buffer + read_pos, buffer_len - read_pos,
1053                            &count, NULL);
1054      if (result == SR_EOS) {
1055        end_of_stream = true;
1056      } else if (result != SR_SUCCESS) {
1057        if (data_len) {
1058          *data_len = read_pos;
1059        }
1060        return result;
1061      } else {
1062        read_pos += count;
1063      }
1064    }
1065
1066    // Write until buffer is empty, or error (including end of stream)
1067    write_pos = 0;
1068    while (write_pos < read_pos) {
1069      result = sink->Write(buffer + write_pos, read_pos - write_pos,
1070                           &count, NULL);
1071      if (result != SR_SUCCESS) {
1072        if (data_len) {
1073          *data_len = read_pos - write_pos;
1074          if (write_pos > 0) {
1075            memmove(buffer, buffer + write_pos, *data_len);
1076          }
1077        }
1078        return result;
1079      }
1080      write_pos += count;
1081    }
1082
1083    read_pos = 0;
1084  } while (!end_of_stream);
1085
1086  if (data_len) {
1087    *data_len = 0;
1088  }
1089  return SR_SUCCESS;
1090}
1091
1092///////////////////////////////////////////////////////////////////////////////
1093
1094} // namespace talk_base
1095