1/*
2 * libjingle
3 * Copyright 2004 Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#if defined(POSIX)
29#include <sys/file.h>
30#endif  // POSIX
31#include <sys/types.h>
32#include <sys/stat.h>
33#include <errno.h>
34#include <string>
35#include "talk/base/basictypes.h"
36#include "talk/base/common.h"
37#include "talk/base/logging.h"
38#include "talk/base/messagequeue.h"
39#include "talk/base/stream.h"
40#include "talk/base/stringencode.h"
41#include "talk/base/stringutils.h"
42#include "talk/base/thread.h"
43#include "talk/base/timeutils.h"
44
45#ifdef WIN32
46#include "talk/base/win32.h"
47#define fileno _fileno
48#endif
49
50namespace talk_base {
51
52///////////////////////////////////////////////////////////////////////////////
53// StreamInterface
54///////////////////////////////////////////////////////////////////////////////
55StreamInterface::~StreamInterface() {
56}
57
58StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
59                                       size_t* written, int* error) {
60  StreamResult result = SR_SUCCESS;
61  size_t total_written = 0, current_written;
62  while (total_written < data_len) {
63    result = Write(static_cast<const char*>(data) + total_written,
64                   data_len - total_written, &current_written, error);
65    if (result != SR_SUCCESS)
66      break;
67    total_written += current_written;
68  }
69  if (written)
70    *written = total_written;
71  return result;
72}
73
74StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
75                                      size_t* read, int* error) {
76  StreamResult result = SR_SUCCESS;
77  size_t total_read = 0, current_read;
78  while (total_read < buffer_len) {
79    result = Read(static_cast<char*>(buffer) + total_read,
80                  buffer_len - total_read, &current_read, error);
81    if (result != SR_SUCCESS)
82      break;
83    total_read += current_read;
84  }
85  if (read)
86    *read = total_read;
87  return result;
88}
89
90StreamResult StreamInterface::ReadLine(std::string* line) {
91  line->clear();
92  StreamResult result = SR_SUCCESS;
93  while (true) {
94    char ch;
95    result = Read(&ch, sizeof(ch), NULL, NULL);
96    if (result != SR_SUCCESS) {
97      break;
98    }
99    if (ch == '\n') {
100      break;
101    }
102    line->push_back(ch);
103  }
104  if (!line->empty()) {   // give back the line we've collected so far with
105    result = SR_SUCCESS;  // a success code.  Otherwise return the last code
106  }
107  return result;
108}
109
110void StreamInterface::PostEvent(Thread* t, int events, int err) {
111  t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
112}
113
114void StreamInterface::PostEvent(int events, int err) {
115  PostEvent(Thread::Current(), events, err);
116}
117
118StreamInterface::StreamInterface() {
119}
120
121void StreamInterface::OnMessage(Message* msg) {
122  if (MSG_POST_EVENT == msg->message_id) {
123    StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
124    SignalEvent(this, pe->events, pe->error);
125    delete msg->pdata;
126  }
127}
128
129///////////////////////////////////////////////////////////////////////////////
130// StreamAdapterInterface
131///////////////////////////////////////////////////////////////////////////////
132
133StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
134                                               bool owned)
135    : stream_(stream), owned_(owned) {
136  if (NULL != stream_)
137    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
138}
139
140void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
141  if (NULL != stream_)
142    stream_->SignalEvent.disconnect(this);
143  if (owned_)
144    delete stream_;
145  stream_ = stream;
146  owned_ = owned;
147  if (NULL != stream_)
148    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
149}
150
151StreamInterface* StreamAdapterInterface::Detach() {
152  if (NULL != stream_)
153    stream_->SignalEvent.disconnect(this);
154  StreamInterface* stream = stream_;
155  stream_ = NULL;
156  return stream;
157}
158
159StreamAdapterInterface::~StreamAdapterInterface() {
160  if (owned_)
161    delete stream_;
162}
163
164///////////////////////////////////////////////////////////////////////////////
165// StreamTap
166///////////////////////////////////////////////////////////////////////////////
167
168StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
169    : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
170        tap_error_(0) {
171  AttachTap(tap);
172}
173
174void StreamTap::AttachTap(StreamInterface* tap) {
175  tap_.reset(tap);
176}
177
178StreamInterface* StreamTap::DetachTap() {
179  return tap_.release();
180}
181
182StreamResult StreamTap::GetTapResult(int* error) {
183  if (error) {
184    *error = tap_error_;
185  }
186  return tap_result_;
187}
188
189StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
190                             size_t* read, int* error) {
191  size_t backup_read;
192  if (!read) {
193    read = &backup_read;
194  }
195  StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
196                                                  read, error);
197  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
198    tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
199  }
200  return res;
201}
202
203StreamResult StreamTap::Write(const void* data, size_t data_len,
204                              size_t* written, int* error) {
205  size_t backup_written;
206  if (!written) {
207    written = &backup_written;
208  }
209  StreamResult res = StreamAdapterInterface::Write(data, data_len,
210                                                   written, error);
211  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
212    tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
213  }
214  return res;
215}
216
217///////////////////////////////////////////////////////////////////////////////
218// StreamSegment
219///////////////////////////////////////////////////////////////////////////////
220
221StreamSegment::StreamSegment(StreamInterface* stream)
222    : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
223    length_(SIZE_UNKNOWN) {
224  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
225  stream->GetPosition(&start_);
226}
227
228StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
229    : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
230    length_(length) {
231  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
232  stream->GetPosition(&start_);
233}
234
235StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
236                                 size_t* read, int* error) {
237  if (SIZE_UNKNOWN != length_) {
238    if (pos_ >= length_)
239      return SR_EOS;
240    buffer_len = _min(buffer_len, length_ - pos_);
241  }
242  size_t backup_read;
243  if (!read) {
244    read = &backup_read;
245  }
246  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
247                                                     read, error);
248  if (SR_SUCCESS == result) {
249    pos_ += *read;
250  }
251  return result;
252}
253
254bool StreamSegment::SetPosition(size_t position) {
255  if (SIZE_UNKNOWN == start_)
256    return false;  // Not seekable
257  if ((SIZE_UNKNOWN != length_) && (position > length_))
258    return false;  // Seek past end of segment
259  if (!StreamAdapterInterface::SetPosition(start_ + position))
260    return false;
261  pos_ = position;
262  return true;
263}
264
265bool StreamSegment::GetPosition(size_t* position) const {
266  if (SIZE_UNKNOWN == start_)
267    return false;  // Not seekable
268  if (!StreamAdapterInterface::GetPosition(position))
269    return false;
270  if (position) {
271    ASSERT(*position >= start_);
272    *position -= start_;
273  }
274  return true;
275}
276
277bool StreamSegment::GetSize(size_t* size) const {
278  if (!StreamAdapterInterface::GetSize(size))
279    return false;
280  if (size) {
281    if (SIZE_UNKNOWN != start_) {
282      ASSERT(*size >= start_);
283      *size -= start_;
284    }
285    if (SIZE_UNKNOWN != length_) {
286      *size = _min(*size, length_);
287    }
288  }
289  return true;
290}
291
292bool StreamSegment::GetAvailable(size_t* size) const {
293  if (!StreamAdapterInterface::GetAvailable(size))
294    return false;
295  if (size && (SIZE_UNKNOWN != length_))
296    *size = _min(*size, length_ - pos_);
297  return true;
298}
299
300///////////////////////////////////////////////////////////////////////////////
301// NullStream
302///////////////////////////////////////////////////////////////////////////////
303
304NullStream::NullStream() {
305}
306
307NullStream::~NullStream() {
308}
309
310StreamState NullStream::GetState() const {
311  return SS_OPEN;
312}
313
314StreamResult NullStream::Read(void* buffer, size_t buffer_len,
315                              size_t* read, int* error) {
316  if (error) *error = -1;
317  return SR_ERROR;
318}
319
320StreamResult NullStream::Write(const void* data, size_t data_len,
321                               size_t* written, int* error) {
322  if (written) *written = data_len;
323  return SR_SUCCESS;
324}
325
326void NullStream::Close() {
327}
328
329///////////////////////////////////////////////////////////////////////////////
330// FileStream
331///////////////////////////////////////////////////////////////////////////////
332
333FileStream::FileStream() : file_(NULL) {
334}
335
336FileStream::~FileStream() {
337  FileStream::Close();
338}
339
340bool FileStream::Open(const std::string& filename, const char* mode,
341                      int* error) {
342  Close();
343#ifdef WIN32
344  std::wstring wfilename;
345  if (Utf8ToWindowsFilename(filename, &wfilename)) {
346    file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
347  } else {
348    if (error) {
349      *error = -1;
350      return false;
351    }
352  }
353#else
354  file_ = fopen(filename.c_str(), mode);
355#endif
356  if (!file_ && error) {
357    *error = errno;
358  }
359  return (file_ != NULL);
360}
361
362bool FileStream::OpenShare(const std::string& filename, const char* mode,
363                           int shflag, int* error) {
364  Close();
365#ifdef WIN32
366  std::wstring wfilename;
367  if (Utf8ToWindowsFilename(filename, &wfilename)) {
368    file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
369    if (!file_ && error) {
370      *error = errno;
371      return false;
372    }
373    return file_ != NULL;
374  } else {
375    if (error) {
376      *error = -1;
377    }
378    return false;
379  }
380#else
381  return Open(filename, mode, error);
382#endif
383}
384
385bool FileStream::DisableBuffering() {
386  if (!file_)
387    return false;
388  return (setvbuf(file_, NULL, _IONBF, 0) == 0);
389}
390
391StreamState FileStream::GetState() const {
392  return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
393}
394
395StreamResult FileStream::Read(void* buffer, size_t buffer_len,
396                              size_t* read, int* error) {
397  if (!file_)
398    return SR_EOS;
399  size_t result = fread(buffer, 1, buffer_len, file_);
400  if ((result == 0) && (buffer_len > 0)) {
401    if (feof(file_))
402      return SR_EOS;
403    if (error)
404      *error = errno;
405    return SR_ERROR;
406  }
407  if (read)
408    *read = result;
409  return SR_SUCCESS;
410}
411
412StreamResult FileStream::Write(const void* data, size_t data_len,
413                               size_t* written, int* error) {
414  if (!file_)
415    return SR_EOS;
416  size_t result = fwrite(data, 1, data_len, file_);
417  if ((result == 0) && (data_len > 0)) {
418    if (error)
419      *error = errno;
420    return SR_ERROR;
421  }
422  if (written)
423    *written = result;
424  return SR_SUCCESS;
425}
426
427void FileStream::Close() {
428  if (file_) {
429    DoClose();
430    file_ = NULL;
431  }
432}
433
434bool FileStream::SetPosition(size_t position) {
435  if (!file_)
436    return false;
437  return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
438}
439
440bool FileStream::GetPosition(size_t* position) const {
441  ASSERT(NULL != position);
442  if (!file_)
443    return false;
444  long result = ftell(file_);
445  if (result < 0)
446    return false;
447  if (position)
448    *position = result;
449  return true;
450}
451
452bool FileStream::GetSize(size_t* size) const {
453  ASSERT(NULL != size);
454  if (!file_)
455    return false;
456  struct stat file_stats;
457  if (fstat(fileno(file_), &file_stats) != 0)
458    return false;
459  if (size)
460    *size = file_stats.st_size;
461  return true;
462}
463
464bool FileStream::GetAvailable(size_t* size) const {
465  ASSERT(NULL != size);
466  if (!GetSize(size))
467    return false;
468  long result = ftell(file_);
469  if (result < 0)
470    return false;
471  if (size)
472    *size -= result;
473  return true;
474}
475
476bool FileStream::ReserveSize(size_t size) {
477  // TODO: extend the file to the proper length
478  return true;
479}
480
481bool FileStream::GetSize(const std::string& filename, size_t* size) {
482  struct stat file_stats;
483  if (stat(filename.c_str(), &file_stats) != 0)
484    return false;
485  *size = file_stats.st_size;
486  return true;
487}
488
489bool FileStream::Flush() {
490  if (file_) {
491    return (0 == fflush(file_));
492  }
493  // try to flush empty file?
494  ASSERT(false);
495  return false;
496}
497
498#if defined(POSIX)
499
500bool FileStream::TryLock() {
501  if (file_ == NULL) {
502    // Stream not open.
503    ASSERT(false);
504    return false;
505  }
506
507  return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
508}
509
510bool FileStream::Unlock() {
511  if (file_ == NULL) {
512    // Stream not open.
513    ASSERT(false);
514    return false;
515  }
516
517  return flock(fileno(file_), LOCK_UN) == 0;
518}
519
520#endif
521
522void FileStream::DoClose() {
523  fclose(file_);
524}
525
526CircularFileStream::CircularFileStream(size_t max_size)
527  : max_write_size_(max_size),
528    position_(0),
529    marked_position_(max_size / 2),
530    last_write_position_(0),
531    read_segment_(READ_LATEST),
532    read_segment_available_(0) {
533}
534
535bool CircularFileStream::Open(
536    const std::string& filename, const char* mode, int* error) {
537  if (!FileStream::Open(filename.c_str(), mode, error))
538    return false;
539
540  if (strchr(mode, "r") != NULL) {  // Opened in read mode.
541    // Check if the buffer has been overwritten and determine how to read the
542    // log in time sequence.
543    size_t file_size;
544    GetSize(&file_size);
545    if (file_size == position_) {
546      // The buffer has not been overwritten yet. Read 0 .. file_size
547      read_segment_ = READ_LATEST;
548      read_segment_available_ = file_size;
549    } else {
550      // The buffer has been over written. There are three segments: The first
551      // one is 0 .. marked_position_, which is the marked earliest log. The
552      // second one is position_ .. file_size, which is the middle log. The
553      // last one is marked_position_ .. position_, which is the latest log.
554      read_segment_ = READ_MARKED;
555      read_segment_available_ = marked_position_;
556      last_write_position_ = position_;
557    }
558
559    // Read from the beginning.
560    position_ = 0;
561    SetPosition(position_);
562  }
563
564  return true;
565}
566
567StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
568                                      size_t* read, int* error) {
569  if (read_segment_available_ == 0) {
570    size_t file_size;
571    switch (read_segment_) {
572      case READ_MARKED:  // Finished READ_MARKED and start READ_MIDDLE.
573        read_segment_ = READ_MIDDLE;
574        position_ = last_write_position_;
575        SetPosition(position_);
576        GetSize(&file_size);
577        read_segment_available_ = file_size - position_;
578        break;
579
580      case READ_MIDDLE:  // Finished READ_MIDDLE and start READ_LATEST.
581        read_segment_ = READ_LATEST;
582        position_ = marked_position_;
583        SetPosition(position_);
584        read_segment_available_ = last_write_position_ - position_;
585        break;
586
587      default:  // Finished READ_LATEST and return EOS.
588        return talk_base::SR_EOS;
589    }
590  }
591
592  size_t local_read;
593  if (!read) read = &local_read;
594
595  size_t to_read = talk_base::_min(buffer_len, read_segment_available_);
596  talk_base::StreamResult result
597    = talk_base::FileStream::Read(buffer, to_read, read, error);
598  if (result == talk_base::SR_SUCCESS) {
599    read_segment_available_ -= *read;
600    position_ += *read;
601  }
602  return result;
603}
604
605StreamResult CircularFileStream::Write(const void* data, size_t data_len,
606                                       size_t* written, int* error) {
607  if (position_ >= max_write_size_) {
608    ASSERT(position_ == max_write_size_);
609    position_ = marked_position_;
610    SetPosition(position_);
611  }
612
613  size_t local_written;
614  if (!written) written = &local_written;
615
616  size_t to_eof = max_write_size_ - position_;
617  size_t to_write = talk_base::_min(data_len, to_eof);
618  talk_base::StreamResult result
619    = talk_base::FileStream::Write(data, to_write, written, error);
620  if (result == talk_base::SR_SUCCESS) {
621    position_ += *written;
622  }
623  return result;
624}
625
626AsyncWriteStream::~AsyncWriteStream() {
627  write_thread_->Clear(this, 0, NULL);
628  ClearBufferAndWrite();
629
630  CritScope cs(&crit_stream_);
631  stream_.reset();
632}
633
634// This is needed by some stream writers, such as RtpDumpWriter.
635bool AsyncWriteStream::GetPosition(size_t* position) const {
636  CritScope cs(&crit_stream_);
637  return stream_->GetPosition(position);
638}
639
640// This is needed by some stream writers, such as the plugin log writers.
641StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
642                                    size_t* read, int* error) {
643  CritScope cs(&crit_stream_);
644  return stream_->Read(buffer, buffer_len, read, error);
645}
646
647void AsyncWriteStream::Close() {
648  if (state_ == SS_CLOSED) {
649    return;
650  }
651
652  write_thread_->Clear(this, 0, NULL);
653  ClearBufferAndWrite();
654
655  CritScope cs(&crit_stream_);
656  stream_->Close();
657  state_ = SS_CLOSED;
658}
659
660StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
661                                     size_t* written, int* error) {
662  if (state_ == SS_CLOSED) {
663    return SR_ERROR;
664  }
665
666  size_t previous_buffer_length = 0;
667  {
668    CritScope cs(&crit_buffer_);
669    previous_buffer_length = buffer_.length();
670    buffer_.AppendData(data, data_len);
671  }
672
673  if (previous_buffer_length == 0) {
674    // If there's stuff already in the buffer, then we already called
675    // Post and the write_thread_ hasn't pulled it out yet, so we
676    // don't need to re-Post.
677    write_thread_->Post(this, 0, NULL);
678  }
679  // Return immediately, assuming that it works.
680  if (written) {
681    *written = data_len;
682  }
683  return SR_SUCCESS;
684}
685
686void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) {
687  ClearBufferAndWrite();
688}
689
690bool AsyncWriteStream::Flush() {
691  if (state_ == SS_CLOSED) {
692    return false;
693  }
694
695  ClearBufferAndWrite();
696
697  CritScope cs(&crit_stream_);
698  return stream_->Flush();
699}
700
701void AsyncWriteStream::ClearBufferAndWrite() {
702  Buffer to_write;
703  {
704    CritScope cs_buffer(&crit_buffer_);
705    buffer_.TransferTo(&to_write);
706  }
707
708  if (to_write.length() > 0) {
709    CritScope cs(&crit_stream_);
710    stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
711  }
712}
713
714#ifdef POSIX
715
716// Have to identically rewrite the FileStream destructor or else it would call
717// the base class's Close() instead of the sub-class's.
718POpenStream::~POpenStream() {
719  POpenStream::Close();
720}
721
722bool POpenStream::Open(const std::string& subcommand,
723                       const char* mode,
724                       int* error) {
725  Close();
726  file_ = popen(subcommand.c_str(), mode);
727  if (file_ == NULL) {
728    if (error)
729      *error = errno;
730    return false;
731  }
732  return true;
733}
734
735bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
736                            int shflag, int* error) {
737  return Open(subcommand, mode, error);
738}
739
740void POpenStream::DoClose() {
741  wait_status_ = pclose(file_);
742}
743
744#endif
745
746///////////////////////////////////////////////////////////////////////////////
747// MemoryStream
748///////////////////////////////////////////////////////////////////////////////
749
750MemoryStreamBase::MemoryStreamBase()
751  : buffer_(NULL), buffer_length_(0), data_length_(0),
752    seek_position_(0) {
753}
754
755StreamState MemoryStreamBase::GetState() const {
756  return SS_OPEN;
757}
758
759StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
760                                    size_t* bytes_read, int* error) {
761  if (seek_position_ >= data_length_) {
762    return SR_EOS;
763  }
764  size_t available = data_length_ - seek_position_;
765  if (bytes > available) {
766    // Read partial buffer
767    bytes = available;
768  }
769  memcpy(buffer, &buffer_[seek_position_], bytes);
770  seek_position_ += bytes;
771  if (bytes_read) {
772    *bytes_read = bytes;
773  }
774  return SR_SUCCESS;
775}
776
777StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
778                                     size_t* bytes_written, int* error) {
779  size_t available = buffer_length_ - seek_position_;
780  if (0 == available) {
781    // Increase buffer size to the larger of:
782    // a) new position rounded up to next 256 bytes
783    // b) double the previous length
784    size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
785                                    buffer_length_ * 2);
786    StreamResult result = DoReserve(new_buffer_length, error);
787    if (SR_SUCCESS != result) {
788      return result;
789    }
790    ASSERT(buffer_length_ >= new_buffer_length);
791    available = buffer_length_ - seek_position_;
792  }
793
794  if (bytes > available) {
795    bytes = available;
796  }
797  memcpy(&buffer_[seek_position_], buffer, bytes);
798  seek_position_ += bytes;
799  if (data_length_ < seek_position_) {
800    data_length_ = seek_position_;
801  }
802  if (bytes_written) {
803    *bytes_written = bytes;
804  }
805  return SR_SUCCESS;
806}
807
808void MemoryStreamBase::Close() {
809  // nothing to do
810}
811
812bool MemoryStreamBase::SetPosition(size_t position) {
813  if (position > data_length_)
814    return false;
815  seek_position_ = position;
816  return true;
817}
818
819bool MemoryStreamBase::GetPosition(size_t* position) const {
820  if (position)
821    *position = seek_position_;
822  return true;
823}
824
825bool MemoryStreamBase::GetSize(size_t* size) const {
826  if (size)
827    *size = data_length_;
828  return true;
829}
830
831bool MemoryStreamBase::GetAvailable(size_t* size) const {
832  if (size)
833    *size = data_length_ - seek_position_;
834  return true;
835}
836
837bool MemoryStreamBase::ReserveSize(size_t size) {
838  return (SR_SUCCESS == DoReserve(size, NULL));
839}
840
841StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
842  return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
843}
844
845///////////////////////////////////////////////////////////////////////////////
846
847MemoryStream::MemoryStream()
848  : buffer_alloc_(NULL) {
849}
850
851MemoryStream::MemoryStream(const char* data)
852  : buffer_alloc_(NULL) {
853  SetData(data, strlen(data));
854}
855
856MemoryStream::MemoryStream(const void* data, size_t length)
857  : buffer_alloc_(NULL) {
858  SetData(data, length);
859}
860
861MemoryStream::~MemoryStream() {
862  delete [] buffer_alloc_;
863}
864
865void MemoryStream::SetData(const void* data, size_t length) {
866  data_length_ = buffer_length_ = length;
867  delete [] buffer_alloc_;
868  buffer_alloc_ = new char[buffer_length_ + kAlignment];
869  buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
870  memcpy(buffer_, data, data_length_);
871  seek_position_ = 0;
872}
873
874StreamResult MemoryStream::DoReserve(size_t size, int* error) {
875  if (buffer_length_ >= size)
876    return SR_SUCCESS;
877
878  if (char* new_buffer_alloc = new char[size + kAlignment]) {
879    char* new_buffer = reinterpret_cast<char*>(
880        ALIGNP(new_buffer_alloc, kAlignment));
881    memcpy(new_buffer, buffer_, data_length_);
882    delete [] buffer_alloc_;
883    buffer_alloc_ = new_buffer_alloc;
884    buffer_ = new_buffer;
885    buffer_length_ = size;
886    return SR_SUCCESS;
887  }
888
889  if (error) {
890    *error = ENOMEM;
891  }
892  return SR_ERROR;
893}
894
895///////////////////////////////////////////////////////////////////////////////
896
897ExternalMemoryStream::ExternalMemoryStream() {
898}
899
900ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
901  SetData(data, length);
902}
903
904ExternalMemoryStream::~ExternalMemoryStream() {
905}
906
907void ExternalMemoryStream::SetData(void* data, size_t length) {
908  data_length_ = buffer_length_ = length;
909  buffer_ = static_cast<char*>(data);
910  seek_position_ = 0;
911}
912
913///////////////////////////////////////////////////////////////////////////////
914// FifoBuffer
915///////////////////////////////////////////////////////////////////////////////
916
917FifoBuffer::FifoBuffer(size_t size)
918    : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
919      data_length_(0), read_position_(0), owner_(Thread::Current()) {
920  // all events are done on the owner_ thread
921}
922
923FifoBuffer::FifoBuffer(size_t size, Thread* owner)
924    : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
925      data_length_(0), read_position_(0), owner_(owner) {
926  // all events are done on the owner_ thread
927}
928
929FifoBuffer::~FifoBuffer() {
930}
931
932bool FifoBuffer::GetBuffered(size_t* size) const {
933  CritScope cs(&crit_);
934  *size = data_length_;
935  return true;
936}
937
938bool FifoBuffer::SetCapacity(size_t size) {
939  CritScope cs(&crit_);
940  if (data_length_ > size) {
941    return false;
942  }
943
944  if (size != buffer_length_) {
945    char* buffer = new char[size];
946    const size_t copy = data_length_;
947    const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
948    memcpy(buffer, &buffer_[read_position_], tail_copy);
949    memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
950    buffer_.reset(buffer);
951    read_position_ = 0;
952    buffer_length_ = size;
953  }
954  return true;
955}
956
957StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
958                                    size_t offset, size_t* bytes_read) {
959  CritScope cs(&crit_);
960  return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
961}
962
963StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
964                                     size_t offset, size_t* bytes_written) {
965  CritScope cs(&crit_);
966  return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
967}
968
969StreamState FifoBuffer::GetState() const {
970  return state_;
971}
972
973StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
974                              size_t* bytes_read, int* error) {
975  CritScope cs(&crit_);
976  const bool was_writable = data_length_ < buffer_length_;
977  size_t copy = 0;
978  StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
979
980  if (result == SR_SUCCESS) {
981    // If read was successful then adjust the read position and number of
982    // bytes buffered.
983    read_position_ = (read_position_ + copy) % buffer_length_;
984    data_length_ -= copy;
985    if (bytes_read) {
986      *bytes_read = copy;
987    }
988
989    // if we were full before, and now we're not, post an event
990    if (!was_writable && copy > 0) {
991      PostEvent(owner_, SE_WRITE, 0);
992    }
993  }
994  return result;
995}
996
997StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
998                               size_t* bytes_written, int* error) {
999  CritScope cs(&crit_);
1000
1001  const bool was_readable = (data_length_ > 0);
1002  size_t copy = 0;
1003  StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
1004
1005  if (result == SR_SUCCESS) {
1006    // If write was successful then adjust the number of readable bytes.
1007    data_length_ += copy;
1008    if (bytes_written) {
1009      *bytes_written = copy;
1010    }
1011
1012    // if we didn't have any data to read before, and now we do, post an event
1013    if (!was_readable && copy > 0) {
1014      PostEvent(owner_, SE_READ, 0);
1015    }
1016  }
1017  return result;
1018}
1019
1020void FifoBuffer::Close() {
1021  CritScope cs(&crit_);
1022  state_ = SS_CLOSED;
1023}
1024
1025const void* FifoBuffer::GetReadData(size_t* size) {
1026  CritScope cs(&crit_);
1027  *size = (read_position_ + data_length_ <= buffer_length_) ?
1028      data_length_ : buffer_length_ - read_position_;
1029  return &buffer_[read_position_];
1030}
1031
1032void FifoBuffer::ConsumeReadData(size_t size) {
1033  CritScope cs(&crit_);
1034  ASSERT(size <= data_length_);
1035  const bool was_writable = data_length_ < buffer_length_;
1036  read_position_ = (read_position_ + size) % buffer_length_;
1037  data_length_ -= size;
1038  if (!was_writable && size > 0) {
1039    PostEvent(owner_, SE_WRITE, 0);
1040  }
1041}
1042
1043void* FifoBuffer::GetWriteBuffer(size_t* size) {
1044  CritScope cs(&crit_);
1045  if (state_ == SS_CLOSED) {
1046    return NULL;
1047  }
1048
1049  // if empty, reset the write position to the beginning, so we can get
1050  // the biggest possible block
1051  if (data_length_ == 0) {
1052    read_position_ = 0;
1053  }
1054
1055  const size_t write_position = (read_position_ + data_length_)
1056      % buffer_length_;
1057  *size = (write_position > read_position_ || data_length_ == 0) ?
1058      buffer_length_ - write_position : read_position_ - write_position;
1059  return &buffer_[write_position];
1060}
1061
1062void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1063  CritScope cs(&crit_);
1064  ASSERT(size <= buffer_length_ - data_length_);
1065  const bool was_readable = (data_length_ > 0);
1066  data_length_ += size;
1067  if (!was_readable && size > 0) {
1068    PostEvent(owner_, SE_READ, 0);
1069  }
1070}
1071
1072bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1073  CritScope cs(&crit_);
1074  *size = buffer_length_ - data_length_;
1075  return true;
1076}
1077
1078StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1079                                          size_t bytes,
1080                                          size_t offset,
1081                                          size_t* bytes_read) {
1082  if (offset >= data_length_) {
1083    return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1084  }
1085
1086  const size_t available = data_length_ - offset;
1087  const size_t read_position = (read_position_ + offset) % buffer_length_;
1088  const size_t copy = _min(bytes, available);
1089  const size_t tail_copy = _min(copy, buffer_length_ - read_position);
1090  char* const p = static_cast<char*>(buffer);
1091  memcpy(p, &buffer_[read_position], tail_copy);
1092  memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1093
1094  if (bytes_read) {
1095    *bytes_read = copy;
1096  }
1097  return SR_SUCCESS;
1098}
1099
1100StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1101                                           size_t bytes,
1102                                           size_t offset,
1103                                           size_t* bytes_written) {
1104  if (state_ == SS_CLOSED) {
1105    return SR_EOS;
1106  }
1107
1108  if (data_length_ + offset >= buffer_length_) {
1109    return SR_BLOCK;
1110  }
1111
1112  const size_t available = buffer_length_ - data_length_ - offset;
1113  const size_t write_position = (read_position_ + data_length_ + offset)
1114      % buffer_length_;
1115  const size_t copy = _min(bytes, available);
1116  const size_t tail_copy = _min(copy, buffer_length_ - write_position);
1117  const char* const p = static_cast<const char*>(buffer);
1118  memcpy(&buffer_[write_position], p, tail_copy);
1119  memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1120
1121  if (bytes_written) {
1122    *bytes_written = copy;
1123  }
1124  return SR_SUCCESS;
1125}
1126
1127
1128
1129///////////////////////////////////////////////////////////////////////////////
1130// LoggingAdapter
1131///////////////////////////////////////////////////////////////////////////////
1132
1133LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1134                               const std::string& label, bool hex_mode)
1135    : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1136  set_label(label);
1137}
1138
1139void LoggingAdapter::set_label(const std::string& label) {
1140  label_.assign("[");
1141  label_.append(label);
1142  label_.append("]");
1143}
1144
1145StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1146                                  size_t* read, int* error) {
1147  size_t local_read; if (!read) read = &local_read;
1148  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1149                                                     error);
1150  if (result == SR_SUCCESS) {
1151    LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1152  }
1153  return result;
1154}
1155
1156StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1157                                   size_t* written, int* error) {
1158  size_t local_written;
1159  if (!written) written = &local_written;
1160  StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1161                                                      error);
1162  if (result == SR_SUCCESS) {
1163    LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1164                 &lms_);
1165  }
1166  return result;
1167}
1168
1169void LoggingAdapter::Close() {
1170  LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1171  LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1172  LOG_V(level_) << label_ << " Closed locally";
1173  StreamAdapterInterface::Close();
1174}
1175
1176void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1177  if (events & SE_OPEN) {
1178    LOG_V(level_) << label_ << " Open";
1179  } else if (events & SE_CLOSE) {
1180    LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1181    LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1182    LOG_V(level_) << label_ << " Closed with error: " << err;
1183  }
1184  StreamAdapterInterface::OnEvent(stream, events, err);
1185}
1186
1187///////////////////////////////////////////////////////////////////////////////
1188// StringStream - Reads/Writes to an external std::string
1189///////////////////////////////////////////////////////////////////////////////
1190
1191StringStream::StringStream(std::string& str)
1192    : str_(str), read_pos_(0), read_only_(false) {
1193}
1194
1195StringStream::StringStream(const std::string& str)
1196    : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1197}
1198
1199StreamState StringStream::GetState() const {
1200  return SS_OPEN;
1201}
1202
1203StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1204                                      size_t* read, int* error) {
1205  size_t available = _min(buffer_len, str_.size() - read_pos_);
1206  if (!available)
1207    return SR_EOS;
1208  memcpy(buffer, str_.data() + read_pos_, available);
1209  read_pos_ += available;
1210  if (read)
1211    *read = available;
1212  return SR_SUCCESS;
1213}
1214
1215StreamResult StringStream::Write(const void* data, size_t data_len,
1216                                      size_t* written, int* error) {
1217  if (read_only_) {
1218    if (error) {
1219      *error = -1;
1220    }
1221    return SR_ERROR;
1222  }
1223  str_.append(static_cast<const char*>(data),
1224              static_cast<const char*>(data) + data_len);
1225  if (written)
1226    *written = data_len;
1227  return SR_SUCCESS;
1228}
1229
1230void StringStream::Close() {
1231}
1232
1233bool StringStream::SetPosition(size_t position) {
1234  if (position > str_.size())
1235    return false;
1236  read_pos_ = position;
1237  return true;
1238}
1239
1240bool StringStream::GetPosition(size_t* position) const {
1241  if (position)
1242    *position = read_pos_;
1243  return true;
1244}
1245
1246bool StringStream::GetSize(size_t* size) const {
1247  if (size)
1248    *size = str_.size();
1249  return true;
1250}
1251
1252bool StringStream::GetAvailable(size_t* size) const {
1253  if (size)
1254    *size = str_.size() - read_pos_;
1255  return true;
1256}
1257
1258bool StringStream::ReserveSize(size_t size) {
1259  if (read_only_)
1260    return false;
1261  str_.reserve(size);
1262  return true;
1263}
1264
1265///////////////////////////////////////////////////////////////////////////////
1266// StreamReference
1267///////////////////////////////////////////////////////////////////////////////
1268
1269StreamReference::StreamReference(StreamInterface* stream)
1270    : StreamAdapterInterface(stream, false) {
1271  // owner set to false so the destructor does not free the stream.
1272  stream_ref_count_ = new StreamRefCount(stream);
1273}
1274
1275StreamInterface* StreamReference::NewReference() {
1276  stream_ref_count_->AddReference();
1277  return new StreamReference(stream_ref_count_, stream());
1278}
1279
1280StreamReference::~StreamReference() {
1281  stream_ref_count_->Release();
1282}
1283
1284StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1285                                 StreamInterface* stream)
1286    : StreamAdapterInterface(stream, false),
1287      stream_ref_count_(stream_ref_count) {
1288}
1289
1290///////////////////////////////////////////////////////////////////////////////
1291
1292StreamResult Flow(StreamInterface* source,
1293                  char* buffer, size_t buffer_len,
1294                  StreamInterface* sink,
1295                  size_t* data_len /* = NULL */) {
1296  ASSERT(buffer_len > 0);
1297
1298  StreamResult result;
1299  size_t count, read_pos, write_pos;
1300  if (data_len) {
1301    read_pos = *data_len;
1302  } else {
1303    read_pos = 0;
1304  }
1305
1306  bool end_of_stream = false;
1307  do {
1308    // Read until buffer is full, end of stream, or error
1309    while (!end_of_stream && (read_pos < buffer_len)) {
1310      result = source->Read(buffer + read_pos, buffer_len - read_pos,
1311                            &count, NULL);
1312      if (result == SR_EOS) {
1313        end_of_stream = true;
1314      } else if (result != SR_SUCCESS) {
1315        if (data_len) {
1316          *data_len = read_pos;
1317        }
1318        return result;
1319      } else {
1320        read_pos += count;
1321      }
1322    }
1323
1324    // Write until buffer is empty, or error (including end of stream)
1325    write_pos = 0;
1326    while (write_pos < read_pos) {
1327      result = sink->Write(buffer + write_pos, read_pos - write_pos,
1328                           &count, NULL);
1329      if (result != SR_SUCCESS) {
1330        if (data_len) {
1331          *data_len = read_pos - write_pos;
1332          if (write_pos > 0) {
1333            memmove(buffer, buffer + write_pos, *data_len);
1334          }
1335        }
1336        return result;
1337      }
1338      write_pos += count;
1339    }
1340
1341    read_pos = 0;
1342  } while (!end_of_stream);
1343
1344  if (data_len) {
1345    *data_len = 0;
1346  }
1347  return SR_SUCCESS;
1348}
1349
1350///////////////////////////////////////////////////////////////////////////////
1351
1352}  // namespace talk_base
1353