1/*
2 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/file.h>
13#endif  // WEBRTC_POSIX
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <errno.h>
17#include <string>
18#include "webrtc/base/basictypes.h"
19#include "webrtc/base/common.h"
20#include "webrtc/base/logging.h"
21#include "webrtc/base/messagequeue.h"
22#include "webrtc/base/stream.h"
23#include "webrtc/base/stringencode.h"
24#include "webrtc/base/stringutils.h"
25#include "webrtc/base/thread.h"
26#include "webrtc/base/timeutils.h"
27
28#if defined(WEBRTC_WIN)
29#include "webrtc/base/win32.h"
30#define fileno _fileno
31#endif
32
33namespace rtc {
34
35///////////////////////////////////////////////////////////////////////////////
36// StreamInterface
37///////////////////////////////////////////////////////////////////////////////
38StreamInterface::~StreamInterface() {
39}
40
41StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
42                                       size_t* written, int* error) {
43  StreamResult result = SR_SUCCESS;
44  size_t total_written = 0, current_written;
45  while (total_written < data_len) {
46    result = Write(static_cast<const char*>(data) + total_written,
47                   data_len - total_written, &current_written, error);
48    if (result != SR_SUCCESS)
49      break;
50    total_written += current_written;
51  }
52  if (written)
53    *written = total_written;
54  return result;
55}
56
57StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
58                                      size_t* read, int* error) {
59  StreamResult result = SR_SUCCESS;
60  size_t total_read = 0, current_read;
61  while (total_read < buffer_len) {
62    result = Read(static_cast<char*>(buffer) + total_read,
63                  buffer_len - total_read, &current_read, error);
64    if (result != SR_SUCCESS)
65      break;
66    total_read += current_read;
67  }
68  if (read)
69    *read = total_read;
70  return result;
71}
72
73StreamResult StreamInterface::ReadLine(std::string* line) {
74  line->clear();
75  StreamResult result = SR_SUCCESS;
76  while (true) {
77    char ch;
78    result = Read(&ch, sizeof(ch), NULL, NULL);
79    if (result != SR_SUCCESS) {
80      break;
81    }
82    if (ch == '\n') {
83      break;
84    }
85    line->push_back(ch);
86  }
87  if (!line->empty()) {   // give back the line we've collected so far with
88    result = SR_SUCCESS;  // a success code.  Otherwise return the last code
89  }
90  return result;
91}
92
93void StreamInterface::PostEvent(Thread* t, int events, int err) {
94  t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
95}
96
97void StreamInterface::PostEvent(int events, int err) {
98  PostEvent(Thread::Current(), events, err);
99}
100
101StreamInterface::StreamInterface() {
102}
103
104void StreamInterface::OnMessage(Message* msg) {
105  if (MSG_POST_EVENT == msg->message_id) {
106    StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
107    SignalEvent(this, pe->events, pe->error);
108    delete msg->pdata;
109  }
110}
111
112///////////////////////////////////////////////////////////////////////////////
113// StreamAdapterInterface
114///////////////////////////////////////////////////////////////////////////////
115
116StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
117                                               bool owned)
118    : stream_(stream), owned_(owned) {
119  if (NULL != stream_)
120    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
121}
122
123void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
124  if (NULL != stream_)
125    stream_->SignalEvent.disconnect(this);
126  if (owned_)
127    delete stream_;
128  stream_ = stream;
129  owned_ = owned;
130  if (NULL != stream_)
131    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
132}
133
134StreamInterface* StreamAdapterInterface::Detach() {
135  if (NULL != stream_)
136    stream_->SignalEvent.disconnect(this);
137  StreamInterface* stream = stream_;
138  stream_ = NULL;
139  return stream;
140}
141
142StreamAdapterInterface::~StreamAdapterInterface() {
143  if (owned_)
144    delete stream_;
145}
146
147///////////////////////////////////////////////////////////////////////////////
148// StreamTap
149///////////////////////////////////////////////////////////////////////////////
150
151StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
152    : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
153        tap_error_(0) {
154  AttachTap(tap);
155}
156
157void StreamTap::AttachTap(StreamInterface* tap) {
158  tap_.reset(tap);
159}
160
161StreamInterface* StreamTap::DetachTap() {
162  return tap_.release();
163}
164
165StreamResult StreamTap::GetTapResult(int* error) {
166  if (error) {
167    *error = tap_error_;
168  }
169  return tap_result_;
170}
171
172StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
173                             size_t* read, int* error) {
174  size_t backup_read;
175  if (!read) {
176    read = &backup_read;
177  }
178  StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
179                                                  read, error);
180  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
181    tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
182  }
183  return res;
184}
185
186StreamResult StreamTap::Write(const void* data, size_t data_len,
187                              size_t* written, int* error) {
188  size_t backup_written;
189  if (!written) {
190    written = &backup_written;
191  }
192  StreamResult res = StreamAdapterInterface::Write(data, data_len,
193                                                   written, error);
194  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
195    tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
196  }
197  return res;
198}
199
200///////////////////////////////////////////////////////////////////////////////
201// StreamSegment
202///////////////////////////////////////////////////////////////////////////////
203
204StreamSegment::StreamSegment(StreamInterface* stream)
205    : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
206    length_(SIZE_UNKNOWN) {
207  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
208  stream->GetPosition(&start_);
209}
210
211StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
212    : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
213    length_(length) {
214  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
215  stream->GetPosition(&start_);
216}
217
218StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
219                                 size_t* read, int* error) {
220  if (SIZE_UNKNOWN != length_) {
221    if (pos_ >= length_)
222      return SR_EOS;
223    buffer_len = _min(buffer_len, length_ - pos_);
224  }
225  size_t backup_read;
226  if (!read) {
227    read = &backup_read;
228  }
229  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
230                                                     read, error);
231  if (SR_SUCCESS == result) {
232    pos_ += *read;
233  }
234  return result;
235}
236
237bool StreamSegment::SetPosition(size_t position) {
238  if (SIZE_UNKNOWN == start_)
239    return false;  // Not seekable
240  if ((SIZE_UNKNOWN != length_) && (position > length_))
241    return false;  // Seek past end of segment
242  if (!StreamAdapterInterface::SetPosition(start_ + position))
243    return false;
244  pos_ = position;
245  return true;
246}
247
248bool StreamSegment::GetPosition(size_t* position) const {
249  if (SIZE_UNKNOWN == start_)
250    return false;  // Not seekable
251  if (!StreamAdapterInterface::GetPosition(position))
252    return false;
253  if (position) {
254    ASSERT(*position >= start_);
255    *position -= start_;
256  }
257  return true;
258}
259
260bool StreamSegment::GetSize(size_t* size) const {
261  if (!StreamAdapterInterface::GetSize(size))
262    return false;
263  if (size) {
264    if (SIZE_UNKNOWN != start_) {
265      ASSERT(*size >= start_);
266      *size -= start_;
267    }
268    if (SIZE_UNKNOWN != length_) {
269      *size = _min(*size, length_);
270    }
271  }
272  return true;
273}
274
275bool StreamSegment::GetAvailable(size_t* size) const {
276  if (!StreamAdapterInterface::GetAvailable(size))
277    return false;
278  if (size && (SIZE_UNKNOWN != length_))
279    *size = _min(*size, length_ - pos_);
280  return true;
281}
282
283///////////////////////////////////////////////////////////////////////////////
284// NullStream
285///////////////////////////////////////////////////////////////////////////////
286
287NullStream::NullStream() {
288}
289
290NullStream::~NullStream() {
291}
292
293StreamState NullStream::GetState() const {
294  return SS_OPEN;
295}
296
297StreamResult NullStream::Read(void* buffer, size_t buffer_len,
298                              size_t* read, int* error) {
299  if (error) *error = -1;
300  return SR_ERROR;
301}
302
303StreamResult NullStream::Write(const void* data, size_t data_len,
304                               size_t* written, int* error) {
305  if (written) *written = data_len;
306  return SR_SUCCESS;
307}
308
309void NullStream::Close() {
310}
311
312///////////////////////////////////////////////////////////////////////////////
313// FileStream
314///////////////////////////////////////////////////////////////////////////////
315
316FileStream::FileStream() : file_(NULL) {
317}
318
319FileStream::~FileStream() {
320  FileStream::Close();
321}
322
323bool FileStream::Open(const std::string& filename, const char* mode,
324                      int* error) {
325  Close();
326#if defined(WEBRTC_WIN)
327  std::wstring wfilename;
328  if (Utf8ToWindowsFilename(filename, &wfilename)) {
329    file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
330  } else {
331    if (error) {
332      *error = -1;
333      return false;
334    }
335  }
336#else
337  file_ = fopen(filename.c_str(), mode);
338#endif
339  if (!file_ && error) {
340    *error = errno;
341  }
342  return (file_ != NULL);
343}
344
345bool FileStream::OpenShare(const std::string& filename, const char* mode,
346                           int shflag, int* error) {
347  Close();
348#if defined(WEBRTC_WIN)
349  std::wstring wfilename;
350  if (Utf8ToWindowsFilename(filename, &wfilename)) {
351    file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
352    if (!file_ && error) {
353      *error = errno;
354      return false;
355    }
356    return file_ != NULL;
357  } else {
358    if (error) {
359      *error = -1;
360    }
361    return false;
362  }
363#else
364  return Open(filename, mode, error);
365#endif
366}
367
368bool FileStream::DisableBuffering() {
369  if (!file_)
370    return false;
371  return (setvbuf(file_, NULL, _IONBF, 0) == 0);
372}
373
374StreamState FileStream::GetState() const {
375  return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
376}
377
378StreamResult FileStream::Read(void* buffer, size_t buffer_len,
379                              size_t* read, int* error) {
380  if (!file_)
381    return SR_EOS;
382  size_t result = fread(buffer, 1, buffer_len, file_);
383  if ((result == 0) && (buffer_len > 0)) {
384    if (feof(file_))
385      return SR_EOS;
386    if (error)
387      *error = errno;
388    return SR_ERROR;
389  }
390  if (read)
391    *read = result;
392  return SR_SUCCESS;
393}
394
395StreamResult FileStream::Write(const void* data, size_t data_len,
396                               size_t* written, int* error) {
397  if (!file_)
398    return SR_EOS;
399  size_t result = fwrite(data, 1, data_len, file_);
400  if ((result == 0) && (data_len > 0)) {
401    if (error)
402      *error = errno;
403    return SR_ERROR;
404  }
405  if (written)
406    *written = result;
407  return SR_SUCCESS;
408}
409
410void FileStream::Close() {
411  if (file_) {
412    DoClose();
413    file_ = NULL;
414  }
415}
416
417bool FileStream::SetPosition(size_t position) {
418  if (!file_)
419    return false;
420  return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
421}
422
423bool FileStream::GetPosition(size_t* position) const {
424  ASSERT(NULL != position);
425  if (!file_)
426    return false;
427  long result = ftell(file_);
428  if (result < 0)
429    return false;
430  if (position)
431    *position = result;
432  return true;
433}
434
435bool FileStream::GetSize(size_t* size) const {
436  ASSERT(NULL != size);
437  if (!file_)
438    return false;
439  struct stat file_stats;
440  if (fstat(fileno(file_), &file_stats) != 0)
441    return false;
442  if (size)
443    *size = file_stats.st_size;
444  return true;
445}
446
447bool FileStream::GetAvailable(size_t* size) const {
448  ASSERT(NULL != size);
449  if (!GetSize(size))
450    return false;
451  long result = ftell(file_);
452  if (result < 0)
453    return false;
454  if (size)
455    *size -= result;
456  return true;
457}
458
459bool FileStream::ReserveSize(size_t size) {
460  // TODO: extend the file to the proper length
461  return true;
462}
463
464bool FileStream::GetSize(const std::string& filename, size_t* size) {
465  struct stat file_stats;
466  if (stat(filename.c_str(), &file_stats) != 0)
467    return false;
468  *size = file_stats.st_size;
469  return true;
470}
471
472bool FileStream::Flush() {
473  if (file_) {
474    return (0 == fflush(file_));
475  }
476  // try to flush empty file?
477  ASSERT(false);
478  return false;
479}
480
481#if defined(WEBRTC_POSIX) && !defined(__native_client__)
482
483bool FileStream::TryLock() {
484  if (file_ == NULL) {
485    // Stream not open.
486    ASSERT(false);
487    return false;
488  }
489
490  return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
491}
492
493bool FileStream::Unlock() {
494  if (file_ == NULL) {
495    // Stream not open.
496    ASSERT(false);
497    return false;
498  }
499
500  return flock(fileno(file_), LOCK_UN) == 0;
501}
502
503#endif
504
505void FileStream::DoClose() {
506  fclose(file_);
507}
508
509CircularFileStream::CircularFileStream(size_t max_size)
510  : max_write_size_(max_size),
511    position_(0),
512    marked_position_(max_size / 2),
513    last_write_position_(0),
514    read_segment_(READ_LATEST),
515    read_segment_available_(0) {
516}
517
518bool CircularFileStream::Open(
519    const std::string& filename, const char* mode, int* error) {
520  if (!FileStream::Open(filename.c_str(), mode, error))
521    return false;
522
523  if (strchr(mode, "r") != NULL) {  // Opened in read mode.
524    // Check if the buffer has been overwritten and determine how to read the
525    // log in time sequence.
526    size_t file_size;
527    GetSize(&file_size);
528    if (file_size == position_) {
529      // The buffer has not been overwritten yet. Read 0 .. file_size
530      read_segment_ = READ_LATEST;
531      read_segment_available_ = file_size;
532    } else {
533      // The buffer has been over written. There are three segments: The first
534      // one is 0 .. marked_position_, which is the marked earliest log. The
535      // second one is position_ .. file_size, which is the middle log. The
536      // last one is marked_position_ .. position_, which is the latest log.
537      read_segment_ = READ_MARKED;
538      read_segment_available_ = marked_position_;
539      last_write_position_ = position_;
540    }
541
542    // Read from the beginning.
543    position_ = 0;
544    SetPosition(position_);
545  }
546
547  return true;
548}
549
550StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
551                                      size_t* read, int* error) {
552  if (read_segment_available_ == 0) {
553    size_t file_size;
554    switch (read_segment_) {
555      case READ_MARKED:  // Finished READ_MARKED and start READ_MIDDLE.
556        read_segment_ = READ_MIDDLE;
557        position_ = last_write_position_;
558        SetPosition(position_);
559        GetSize(&file_size);
560        read_segment_available_ = file_size - position_;
561        break;
562
563      case READ_MIDDLE:  // Finished READ_MIDDLE and start READ_LATEST.
564        read_segment_ = READ_LATEST;
565        position_ = marked_position_;
566        SetPosition(position_);
567        read_segment_available_ = last_write_position_ - position_;
568        break;
569
570      default:  // Finished READ_LATEST and return EOS.
571        return rtc::SR_EOS;
572    }
573  }
574
575  size_t local_read;
576  if (!read) read = &local_read;
577
578  size_t to_read = rtc::_min(buffer_len, read_segment_available_);
579  rtc::StreamResult result
580    = rtc::FileStream::Read(buffer, to_read, read, error);
581  if (result == rtc::SR_SUCCESS) {
582    read_segment_available_ -= *read;
583    position_ += *read;
584  }
585  return result;
586}
587
588StreamResult CircularFileStream::Write(const void* data, size_t data_len,
589                                       size_t* written, int* error) {
590  if (position_ >= max_write_size_) {
591    ASSERT(position_ == max_write_size_);
592    position_ = marked_position_;
593    SetPosition(position_);
594  }
595
596  size_t local_written;
597  if (!written) written = &local_written;
598
599  size_t to_eof = max_write_size_ - position_;
600  size_t to_write = rtc::_min(data_len, to_eof);
601  rtc::StreamResult result
602    = rtc::FileStream::Write(data, to_write, written, error);
603  if (result == rtc::SR_SUCCESS) {
604    position_ += *written;
605  }
606  return result;
607}
608
609AsyncWriteStream::~AsyncWriteStream() {
610  write_thread_->Clear(this, 0, NULL);
611  ClearBufferAndWrite();
612
613  CritScope cs(&crit_stream_);
614  stream_.reset();
615}
616
617// This is needed by some stream writers, such as RtpDumpWriter.
618bool AsyncWriteStream::GetPosition(size_t* position) const {
619  CritScope cs(&crit_stream_);
620  return stream_->GetPosition(position);
621}
622
623// This is needed by some stream writers, such as the plugin log writers.
624StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
625                                    size_t* read, int* error) {
626  CritScope cs(&crit_stream_);
627  return stream_->Read(buffer, buffer_len, read, error);
628}
629
630void AsyncWriteStream::Close() {
631  if (state_ == SS_CLOSED) {
632    return;
633  }
634
635  write_thread_->Clear(this, 0, NULL);
636  ClearBufferAndWrite();
637
638  CritScope cs(&crit_stream_);
639  stream_->Close();
640  state_ = SS_CLOSED;
641}
642
643StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
644                                     size_t* written, int* error) {
645  if (state_ == SS_CLOSED) {
646    return SR_ERROR;
647  }
648
649  size_t previous_buffer_length = 0;
650  {
651    CritScope cs(&crit_buffer_);
652    previous_buffer_length = buffer_.length();
653    buffer_.AppendData(data, data_len);
654  }
655
656  if (previous_buffer_length == 0) {
657    // If there's stuff already in the buffer, then we already called
658    // Post and the write_thread_ hasn't pulled it out yet, so we
659    // don't need to re-Post.
660    write_thread_->Post(this, 0, NULL);
661  }
662  // Return immediately, assuming that it works.
663  if (written) {
664    *written = data_len;
665  }
666  return SR_SUCCESS;
667}
668
669void AsyncWriteStream::OnMessage(rtc::Message* pmsg) {
670  ClearBufferAndWrite();
671}
672
673bool AsyncWriteStream::Flush() {
674  if (state_ == SS_CLOSED) {
675    return false;
676  }
677
678  ClearBufferAndWrite();
679
680  CritScope cs(&crit_stream_);
681  return stream_->Flush();
682}
683
684void AsyncWriteStream::ClearBufferAndWrite() {
685  Buffer to_write;
686  {
687    CritScope cs_buffer(&crit_buffer_);
688    buffer_.TransferTo(&to_write);
689  }
690
691  if (to_write.length() > 0) {
692    CritScope cs(&crit_stream_);
693    stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
694  }
695}
696
697#if defined(WEBRTC_POSIX) && !defined(__native_client__)
698
699// Have to identically rewrite the FileStream destructor or else it would call
700// the base class's Close() instead of the sub-class's.
701POpenStream::~POpenStream() {
702  POpenStream::Close();
703}
704
705bool POpenStream::Open(const std::string& subcommand,
706                       const char* mode,
707                       int* error) {
708  Close();
709  file_ = popen(subcommand.c_str(), mode);
710  if (file_ == NULL) {
711    if (error)
712      *error = errno;
713    return false;
714  }
715  return true;
716}
717
718bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
719                            int shflag, int* error) {
720  return Open(subcommand, mode, error);
721}
722
723void POpenStream::DoClose() {
724  wait_status_ = pclose(file_);
725}
726
727#endif
728
729///////////////////////////////////////////////////////////////////////////////
730// MemoryStream
731///////////////////////////////////////////////////////////////////////////////
732
733MemoryStreamBase::MemoryStreamBase()
734  : buffer_(NULL), buffer_length_(0), data_length_(0),
735    seek_position_(0) {
736}
737
738StreamState MemoryStreamBase::GetState() const {
739  return SS_OPEN;
740}
741
742StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
743                                    size_t* bytes_read, int* error) {
744  if (seek_position_ >= data_length_) {
745    return SR_EOS;
746  }
747  size_t available = data_length_ - seek_position_;
748  if (bytes > available) {
749    // Read partial buffer
750    bytes = available;
751  }
752  memcpy(buffer, &buffer_[seek_position_], bytes);
753  seek_position_ += bytes;
754  if (bytes_read) {
755    *bytes_read = bytes;
756  }
757  return SR_SUCCESS;
758}
759
760StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
761                                     size_t* bytes_written, int* error) {
762  size_t available = buffer_length_ - seek_position_;
763  if (0 == available) {
764    // Increase buffer size to the larger of:
765    // a) new position rounded up to next 256 bytes
766    // b) double the previous length
767    size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
768                                    buffer_length_ * 2);
769    StreamResult result = DoReserve(new_buffer_length, error);
770    if (SR_SUCCESS != result) {
771      return result;
772    }
773    ASSERT(buffer_length_ >= new_buffer_length);
774    available = buffer_length_ - seek_position_;
775  }
776
777  if (bytes > available) {
778    bytes = available;
779  }
780  memcpy(&buffer_[seek_position_], buffer, bytes);
781  seek_position_ += bytes;
782  if (data_length_ < seek_position_) {
783    data_length_ = seek_position_;
784  }
785  if (bytes_written) {
786    *bytes_written = bytes;
787  }
788  return SR_SUCCESS;
789}
790
791void MemoryStreamBase::Close() {
792  // nothing to do
793}
794
795bool MemoryStreamBase::SetPosition(size_t position) {
796  if (position > data_length_)
797    return false;
798  seek_position_ = position;
799  return true;
800}
801
802bool MemoryStreamBase::GetPosition(size_t* position) const {
803  if (position)
804    *position = seek_position_;
805  return true;
806}
807
808bool MemoryStreamBase::GetSize(size_t* size) const {
809  if (size)
810    *size = data_length_;
811  return true;
812}
813
814bool MemoryStreamBase::GetAvailable(size_t* size) const {
815  if (size)
816    *size = data_length_ - seek_position_;
817  return true;
818}
819
820bool MemoryStreamBase::ReserveSize(size_t size) {
821  return (SR_SUCCESS == DoReserve(size, NULL));
822}
823
824StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
825  return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
826}
827
828///////////////////////////////////////////////////////////////////////////////
829
830MemoryStream::MemoryStream()
831  : buffer_alloc_(NULL) {
832}
833
834MemoryStream::MemoryStream(const char* data)
835  : buffer_alloc_(NULL) {
836  SetData(data, strlen(data));
837}
838
839MemoryStream::MemoryStream(const void* data, size_t length)
840  : buffer_alloc_(NULL) {
841  SetData(data, length);
842}
843
844MemoryStream::~MemoryStream() {
845  delete [] buffer_alloc_;
846}
847
848void MemoryStream::SetData(const void* data, size_t length) {
849  data_length_ = buffer_length_ = length;
850  delete [] buffer_alloc_;
851  buffer_alloc_ = new char[buffer_length_ + kAlignment];
852  buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
853  memcpy(buffer_, data, data_length_);
854  seek_position_ = 0;
855}
856
857StreamResult MemoryStream::DoReserve(size_t size, int* error) {
858  if (buffer_length_ >= size)
859    return SR_SUCCESS;
860
861  if (char* new_buffer_alloc = new char[size + kAlignment]) {
862    char* new_buffer = reinterpret_cast<char*>(
863        ALIGNP(new_buffer_alloc, kAlignment));
864    memcpy(new_buffer, buffer_, data_length_);
865    delete [] buffer_alloc_;
866    buffer_alloc_ = new_buffer_alloc;
867    buffer_ = new_buffer;
868    buffer_length_ = size;
869    return SR_SUCCESS;
870  }
871
872  if (error) {
873    *error = ENOMEM;
874  }
875  return SR_ERROR;
876}
877
878///////////////////////////////////////////////////////////////////////////////
879
880ExternalMemoryStream::ExternalMemoryStream() {
881}
882
883ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
884  SetData(data, length);
885}
886
887ExternalMemoryStream::~ExternalMemoryStream() {
888}
889
890void ExternalMemoryStream::SetData(void* data, size_t length) {
891  data_length_ = buffer_length_ = length;
892  buffer_ = static_cast<char*>(data);
893  seek_position_ = 0;
894}
895
896///////////////////////////////////////////////////////////////////////////////
897// FifoBuffer
898///////////////////////////////////////////////////////////////////////////////
899
900FifoBuffer::FifoBuffer(size_t size)
901    : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
902      data_length_(0), read_position_(0), owner_(Thread::Current()) {
903  // all events are done on the owner_ thread
904}
905
906FifoBuffer::FifoBuffer(size_t size, Thread* owner)
907    : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
908      data_length_(0), read_position_(0), owner_(owner) {
909  // all events are done on the owner_ thread
910}
911
912FifoBuffer::~FifoBuffer() {
913}
914
915bool FifoBuffer::GetBuffered(size_t* size) const {
916  CritScope cs(&crit_);
917  *size = data_length_;
918  return true;
919}
920
921bool FifoBuffer::SetCapacity(size_t size) {
922  CritScope cs(&crit_);
923  if (data_length_ > size) {
924    return false;
925  }
926
927  if (size != buffer_length_) {
928    char* buffer = new char[size];
929    const size_t copy = data_length_;
930    const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
931    memcpy(buffer, &buffer_[read_position_], tail_copy);
932    memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
933    buffer_.reset(buffer);
934    read_position_ = 0;
935    buffer_length_ = size;
936  }
937  return true;
938}
939
940StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
941                                    size_t offset, size_t* bytes_read) {
942  CritScope cs(&crit_);
943  return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
944}
945
946StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
947                                     size_t offset, size_t* bytes_written) {
948  CritScope cs(&crit_);
949  return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
950}
951
952StreamState FifoBuffer::GetState() const {
953  return state_;
954}
955
956StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
957                              size_t* bytes_read, int* error) {
958  CritScope cs(&crit_);
959  const bool was_writable = data_length_ < buffer_length_;
960  size_t copy = 0;
961  StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
962
963  if (result == SR_SUCCESS) {
964    // If read was successful then adjust the read position and number of
965    // bytes buffered.
966    read_position_ = (read_position_ + copy) % buffer_length_;
967    data_length_ -= copy;
968    if (bytes_read) {
969      *bytes_read = copy;
970    }
971
972    // if we were full before, and now we're not, post an event
973    if (!was_writable && copy > 0) {
974      PostEvent(owner_, SE_WRITE, 0);
975    }
976  }
977  return result;
978}
979
980StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
981                               size_t* bytes_written, int* error) {
982  CritScope cs(&crit_);
983
984  const bool was_readable = (data_length_ > 0);
985  size_t copy = 0;
986  StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
987
988  if (result == SR_SUCCESS) {
989    // If write was successful then adjust the number of readable bytes.
990    data_length_ += copy;
991    if (bytes_written) {
992      *bytes_written = copy;
993    }
994
995    // if we didn't have any data to read before, and now we do, post an event
996    if (!was_readable && copy > 0) {
997      PostEvent(owner_, SE_READ, 0);
998    }
999  }
1000  return result;
1001}
1002
1003void FifoBuffer::Close() {
1004  CritScope cs(&crit_);
1005  state_ = SS_CLOSED;
1006}
1007
1008const void* FifoBuffer::GetReadData(size_t* size) {
1009  CritScope cs(&crit_);
1010  *size = (read_position_ + data_length_ <= buffer_length_) ?
1011      data_length_ : buffer_length_ - read_position_;
1012  return &buffer_[read_position_];
1013}
1014
1015void FifoBuffer::ConsumeReadData(size_t size) {
1016  CritScope cs(&crit_);
1017  ASSERT(size <= data_length_);
1018  const bool was_writable = data_length_ < buffer_length_;
1019  read_position_ = (read_position_ + size) % buffer_length_;
1020  data_length_ -= size;
1021  if (!was_writable && size > 0) {
1022    PostEvent(owner_, SE_WRITE, 0);
1023  }
1024}
1025
1026void* FifoBuffer::GetWriteBuffer(size_t* size) {
1027  CritScope cs(&crit_);
1028  if (state_ == SS_CLOSED) {
1029    return NULL;
1030  }
1031
1032  // if empty, reset the write position to the beginning, so we can get
1033  // the biggest possible block
1034  if (data_length_ == 0) {
1035    read_position_ = 0;
1036  }
1037
1038  const size_t write_position = (read_position_ + data_length_)
1039      % buffer_length_;
1040  *size = (write_position > read_position_ || data_length_ == 0) ?
1041      buffer_length_ - write_position : read_position_ - write_position;
1042  return &buffer_[write_position];
1043}
1044
1045void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1046  CritScope cs(&crit_);
1047  ASSERT(size <= buffer_length_ - data_length_);
1048  const bool was_readable = (data_length_ > 0);
1049  data_length_ += size;
1050  if (!was_readable && size > 0) {
1051    PostEvent(owner_, SE_READ, 0);
1052  }
1053}
1054
1055bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1056  CritScope cs(&crit_);
1057  *size = buffer_length_ - data_length_;
1058  return true;
1059}
1060
1061StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1062                                          size_t bytes,
1063                                          size_t offset,
1064                                          size_t* bytes_read) {
1065  if (offset >= data_length_) {
1066    return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1067  }
1068
1069  const size_t available = data_length_ - offset;
1070  const size_t read_position = (read_position_ + offset) % buffer_length_;
1071  const size_t copy = _min(bytes, available);
1072  const size_t tail_copy = _min(copy, buffer_length_ - read_position);
1073  char* const p = static_cast<char*>(buffer);
1074  memcpy(p, &buffer_[read_position], tail_copy);
1075  memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1076
1077  if (bytes_read) {
1078    *bytes_read = copy;
1079  }
1080  return SR_SUCCESS;
1081}
1082
1083StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1084                                           size_t bytes,
1085                                           size_t offset,
1086                                           size_t* bytes_written) {
1087  if (state_ == SS_CLOSED) {
1088    return SR_EOS;
1089  }
1090
1091  if (data_length_ + offset >= buffer_length_) {
1092    return SR_BLOCK;
1093  }
1094
1095  const size_t available = buffer_length_ - data_length_ - offset;
1096  const size_t write_position = (read_position_ + data_length_ + offset)
1097      % buffer_length_;
1098  const size_t copy = _min(bytes, available);
1099  const size_t tail_copy = _min(copy, buffer_length_ - write_position);
1100  const char* const p = static_cast<const char*>(buffer);
1101  memcpy(&buffer_[write_position], p, tail_copy);
1102  memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1103
1104  if (bytes_written) {
1105    *bytes_written = copy;
1106  }
1107  return SR_SUCCESS;
1108}
1109
1110
1111
1112///////////////////////////////////////////////////////////////////////////////
1113// LoggingAdapter
1114///////////////////////////////////////////////////////////////////////////////
1115
1116LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1117                               const std::string& label, bool hex_mode)
1118    : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1119  set_label(label);
1120}
1121
1122void LoggingAdapter::set_label(const std::string& label) {
1123  label_.assign("[");
1124  label_.append(label);
1125  label_.append("]");
1126}
1127
1128StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1129                                  size_t* read, int* error) {
1130  size_t local_read; if (!read) read = &local_read;
1131  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1132                                                     error);
1133  if (result == SR_SUCCESS) {
1134    LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1135  }
1136  return result;
1137}
1138
1139StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1140                                   size_t* written, int* error) {
1141  size_t local_written;
1142  if (!written) written = &local_written;
1143  StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1144                                                      error);
1145  if (result == SR_SUCCESS) {
1146    LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1147                 &lms_);
1148  }
1149  return result;
1150}
1151
1152void LoggingAdapter::Close() {
1153  LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1154  LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1155  LOG_V(level_) << label_ << " Closed locally";
1156  StreamAdapterInterface::Close();
1157}
1158
1159void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1160  if (events & SE_OPEN) {
1161    LOG_V(level_) << label_ << " Open";
1162  } else if (events & SE_CLOSE) {
1163    LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1164    LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1165    LOG_V(level_) << label_ << " Closed with error: " << err;
1166  }
1167  StreamAdapterInterface::OnEvent(stream, events, err);
1168}
1169
1170///////////////////////////////////////////////////////////////////////////////
1171// StringStream - Reads/Writes to an external std::string
1172///////////////////////////////////////////////////////////////////////////////
1173
1174StringStream::StringStream(std::string& str)
1175    : str_(str), read_pos_(0), read_only_(false) {
1176}
1177
1178StringStream::StringStream(const std::string& str)
1179    : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1180}
1181
1182StreamState StringStream::GetState() const {
1183  return SS_OPEN;
1184}
1185
1186StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1187                                      size_t* read, int* error) {
1188  size_t available = _min(buffer_len, str_.size() - read_pos_);
1189  if (!available)
1190    return SR_EOS;
1191  memcpy(buffer, str_.data() + read_pos_, available);
1192  read_pos_ += available;
1193  if (read)
1194    *read = available;
1195  return SR_SUCCESS;
1196}
1197
1198StreamResult StringStream::Write(const void* data, size_t data_len,
1199                                      size_t* written, int* error) {
1200  if (read_only_) {
1201    if (error) {
1202      *error = -1;
1203    }
1204    return SR_ERROR;
1205  }
1206  str_.append(static_cast<const char*>(data),
1207              static_cast<const char*>(data) + data_len);
1208  if (written)
1209    *written = data_len;
1210  return SR_SUCCESS;
1211}
1212
1213void StringStream::Close() {
1214}
1215
1216bool StringStream::SetPosition(size_t position) {
1217  if (position > str_.size())
1218    return false;
1219  read_pos_ = position;
1220  return true;
1221}
1222
1223bool StringStream::GetPosition(size_t* position) const {
1224  if (position)
1225    *position = read_pos_;
1226  return true;
1227}
1228
1229bool StringStream::GetSize(size_t* size) const {
1230  if (size)
1231    *size = str_.size();
1232  return true;
1233}
1234
1235bool StringStream::GetAvailable(size_t* size) const {
1236  if (size)
1237    *size = str_.size() - read_pos_;
1238  return true;
1239}
1240
1241bool StringStream::ReserveSize(size_t size) {
1242  if (read_only_)
1243    return false;
1244  str_.reserve(size);
1245  return true;
1246}
1247
1248///////////////////////////////////////////////////////////////////////////////
1249// StreamReference
1250///////////////////////////////////////////////////////////////////////////////
1251
1252StreamReference::StreamReference(StreamInterface* stream)
1253    : StreamAdapterInterface(stream, false) {
1254  // owner set to false so the destructor does not free the stream.
1255  stream_ref_count_ = new StreamRefCount(stream);
1256}
1257
1258StreamInterface* StreamReference::NewReference() {
1259  stream_ref_count_->AddReference();
1260  return new StreamReference(stream_ref_count_, stream());
1261}
1262
1263StreamReference::~StreamReference() {
1264  stream_ref_count_->Release();
1265}
1266
1267StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1268                                 StreamInterface* stream)
1269    : StreamAdapterInterface(stream, false),
1270      stream_ref_count_(stream_ref_count) {
1271}
1272
1273///////////////////////////////////////////////////////////////////////////////
1274
1275StreamResult Flow(StreamInterface* source,
1276                  char* buffer, size_t buffer_len,
1277                  StreamInterface* sink,
1278                  size_t* data_len /* = NULL */) {
1279  ASSERT(buffer_len > 0);
1280
1281  StreamResult result;
1282  size_t count, read_pos, write_pos;
1283  if (data_len) {
1284    read_pos = *data_len;
1285  } else {
1286    read_pos = 0;
1287  }
1288
1289  bool end_of_stream = false;
1290  do {
1291    // Read until buffer is full, end of stream, or error
1292    while (!end_of_stream && (read_pos < buffer_len)) {
1293      result = source->Read(buffer + read_pos, buffer_len - read_pos,
1294                            &count, NULL);
1295      if (result == SR_EOS) {
1296        end_of_stream = true;
1297      } else if (result != SR_SUCCESS) {
1298        if (data_len) {
1299          *data_len = read_pos;
1300        }
1301        return result;
1302      } else {
1303        read_pos += count;
1304      }
1305    }
1306
1307    // Write until buffer is empty, or error (including end of stream)
1308    write_pos = 0;
1309    while (write_pos < read_pos) {
1310      result = sink->Write(buffer + write_pos, read_pos - write_pos,
1311                           &count, NULL);
1312      if (result != SR_SUCCESS) {
1313        if (data_len) {
1314          *data_len = read_pos - write_pos;
1315          if (write_pos > 0) {
1316            memmove(buffer, buffer + write_pos, *data_len);
1317          }
1318        }
1319        return result;
1320      }
1321      write_pos += count;
1322    }
1323
1324    read_pos = 0;
1325  } while (!end_of_stream);
1326
1327  if (data_len) {
1328    *data_len = 0;
1329  }
1330  return SR_SUCCESS;
1331}
1332
1333///////////////////////////////////////////////////////////////////////////////
1334
1335}  // namespace rtc
1336