1/* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11#if defined(WEBRTC_POSIX) 12#include <sys/file.h> 13#endif // WEBRTC_POSIX 14#include <sys/types.h> 15#include <sys/stat.h> 16#include <errno.h> 17#include <string> 18#include "webrtc/base/basictypes.h" 19#include "webrtc/base/common.h" 20#include "webrtc/base/logging.h" 21#include "webrtc/base/messagequeue.h" 22#include "webrtc/base/stream.h" 23#include "webrtc/base/stringencode.h" 24#include "webrtc/base/stringutils.h" 25#include "webrtc/base/thread.h" 26#include "webrtc/base/timeutils.h" 27 28#if defined(WEBRTC_WIN) 29#include "webrtc/base/win32.h" 30#define fileno _fileno 31#endif 32 33namespace rtc { 34 35/////////////////////////////////////////////////////////////////////////////// 36// StreamInterface 37/////////////////////////////////////////////////////////////////////////////// 38StreamInterface::~StreamInterface() { 39} 40 41StreamResult StreamInterface::WriteAll(const void* data, size_t data_len, 42 size_t* written, int* error) { 43 StreamResult result = SR_SUCCESS; 44 size_t total_written = 0, current_written; 45 while (total_written < data_len) { 46 result = Write(static_cast<const char*>(data) + total_written, 47 data_len - total_written, ¤t_written, error); 48 if (result != SR_SUCCESS) 49 break; 50 total_written += current_written; 51 } 52 if (written) 53 *written = total_written; 54 return result; 55} 56 57StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len, 58 size_t* read, int* error) { 59 StreamResult result = SR_SUCCESS; 60 size_t total_read = 0, current_read; 61 while (total_read < buffer_len) { 62 result = Read(static_cast<char*>(buffer) + total_read, 63 buffer_len - total_read, ¤t_read, error); 64 if (result != SR_SUCCESS) 65 break; 66 total_read += current_read; 67 } 68 if (read) 69 *read = total_read; 70 return result; 71} 72 73StreamResult StreamInterface::ReadLine(std::string* line) { 74 line->clear(); 75 StreamResult result = SR_SUCCESS; 76 while (true) { 77 char ch; 78 result = Read(&ch, sizeof(ch), NULL, NULL); 79 if (result != SR_SUCCESS) { 80 break; 81 } 82 if (ch == '\n') { 83 break; 84 } 85 line->push_back(ch); 86 } 87 if (!line->empty()) { // give back the line we've collected so far with 88 result = SR_SUCCESS; // a success code. Otherwise return the last code 89 } 90 return result; 91} 92 93void StreamInterface::PostEvent(Thread* t, int events, int err) { 94 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err)); 95} 96 97void StreamInterface::PostEvent(int events, int err) { 98 PostEvent(Thread::Current(), events, err); 99} 100 101StreamInterface::StreamInterface() { 102} 103 104void StreamInterface::OnMessage(Message* msg) { 105 if (MSG_POST_EVENT == msg->message_id) { 106 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata); 107 SignalEvent(this, pe->events, pe->error); 108 delete msg->pdata; 109 } 110} 111 112/////////////////////////////////////////////////////////////////////////////// 113// StreamAdapterInterface 114/////////////////////////////////////////////////////////////////////////////// 115 116StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream, 117 bool owned) 118 : stream_(stream), owned_(owned) { 119 if (NULL != stream_) 120 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 121} 122 123void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) { 124 if (NULL != stream_) 125 stream_->SignalEvent.disconnect(this); 126 if (owned_) 127 delete stream_; 128 stream_ = stream; 129 owned_ = owned; 130 if (NULL != stream_) 131 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 132} 133 134StreamInterface* StreamAdapterInterface::Detach() { 135 if (NULL != stream_) 136 stream_->SignalEvent.disconnect(this); 137 StreamInterface* stream = stream_; 138 stream_ = NULL; 139 return stream; 140} 141 142StreamAdapterInterface::~StreamAdapterInterface() { 143 if (owned_) 144 delete stream_; 145} 146 147/////////////////////////////////////////////////////////////////////////////// 148// StreamTap 149/////////////////////////////////////////////////////////////////////////////// 150 151StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap) 152 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS), 153 tap_error_(0) { 154 AttachTap(tap); 155} 156 157void StreamTap::AttachTap(StreamInterface* tap) { 158 tap_.reset(tap); 159} 160 161StreamInterface* StreamTap::DetachTap() { 162 return tap_.release(); 163} 164 165StreamResult StreamTap::GetTapResult(int* error) { 166 if (error) { 167 *error = tap_error_; 168 } 169 return tap_result_; 170} 171 172StreamResult StreamTap::Read(void* buffer, size_t buffer_len, 173 size_t* read, int* error) { 174 size_t backup_read; 175 if (!read) { 176 read = &backup_read; 177 } 178 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len, 179 read, error); 180 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 181 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_); 182 } 183 return res; 184} 185 186StreamResult StreamTap::Write(const void* data, size_t data_len, 187 size_t* written, int* error) { 188 size_t backup_written; 189 if (!written) { 190 written = &backup_written; 191 } 192 StreamResult res = StreamAdapterInterface::Write(data, data_len, 193 written, error); 194 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 195 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_); 196 } 197 return res; 198} 199 200/////////////////////////////////////////////////////////////////////////////// 201// StreamSegment 202/////////////////////////////////////////////////////////////////////////////// 203 204StreamSegment::StreamSegment(StreamInterface* stream) 205 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 206 length_(SIZE_UNKNOWN) { 207 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 208 stream->GetPosition(&start_); 209} 210 211StreamSegment::StreamSegment(StreamInterface* stream, size_t length) 212 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 213 length_(length) { 214 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 215 stream->GetPosition(&start_); 216} 217 218StreamResult StreamSegment::Read(void* buffer, size_t buffer_len, 219 size_t* read, int* error) { 220 if (SIZE_UNKNOWN != length_) { 221 if (pos_ >= length_) 222 return SR_EOS; 223 buffer_len = _min(buffer_len, length_ - pos_); 224 } 225 size_t backup_read; 226 if (!read) { 227 read = &backup_read; 228 } 229 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, 230 read, error); 231 if (SR_SUCCESS == result) { 232 pos_ += *read; 233 } 234 return result; 235} 236 237bool StreamSegment::SetPosition(size_t position) { 238 if (SIZE_UNKNOWN == start_) 239 return false; // Not seekable 240 if ((SIZE_UNKNOWN != length_) && (position > length_)) 241 return false; // Seek past end of segment 242 if (!StreamAdapterInterface::SetPosition(start_ + position)) 243 return false; 244 pos_ = position; 245 return true; 246} 247 248bool StreamSegment::GetPosition(size_t* position) const { 249 if (SIZE_UNKNOWN == start_) 250 return false; // Not seekable 251 if (!StreamAdapterInterface::GetPosition(position)) 252 return false; 253 if (position) { 254 ASSERT(*position >= start_); 255 *position -= start_; 256 } 257 return true; 258} 259 260bool StreamSegment::GetSize(size_t* size) const { 261 if (!StreamAdapterInterface::GetSize(size)) 262 return false; 263 if (size) { 264 if (SIZE_UNKNOWN != start_) { 265 ASSERT(*size >= start_); 266 *size -= start_; 267 } 268 if (SIZE_UNKNOWN != length_) { 269 *size = _min(*size, length_); 270 } 271 } 272 return true; 273} 274 275bool StreamSegment::GetAvailable(size_t* size) const { 276 if (!StreamAdapterInterface::GetAvailable(size)) 277 return false; 278 if (size && (SIZE_UNKNOWN != length_)) 279 *size = _min(*size, length_ - pos_); 280 return true; 281} 282 283/////////////////////////////////////////////////////////////////////////////// 284// NullStream 285/////////////////////////////////////////////////////////////////////////////// 286 287NullStream::NullStream() { 288} 289 290NullStream::~NullStream() { 291} 292 293StreamState NullStream::GetState() const { 294 return SS_OPEN; 295} 296 297StreamResult NullStream::Read(void* buffer, size_t buffer_len, 298 size_t* read, int* error) { 299 if (error) *error = -1; 300 return SR_ERROR; 301} 302 303StreamResult NullStream::Write(const void* data, size_t data_len, 304 size_t* written, int* error) { 305 if (written) *written = data_len; 306 return SR_SUCCESS; 307} 308 309void NullStream::Close() { 310} 311 312/////////////////////////////////////////////////////////////////////////////// 313// FileStream 314/////////////////////////////////////////////////////////////////////////////// 315 316FileStream::FileStream() : file_(NULL) { 317} 318 319FileStream::~FileStream() { 320 FileStream::Close(); 321} 322 323bool FileStream::Open(const std::string& filename, const char* mode, 324 int* error) { 325 Close(); 326#if defined(WEBRTC_WIN) 327 std::wstring wfilename; 328 if (Utf8ToWindowsFilename(filename, &wfilename)) { 329 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str()); 330 } else { 331 if (error) { 332 *error = -1; 333 return false; 334 } 335 } 336#else 337 file_ = fopen(filename.c_str(), mode); 338#endif 339 if (!file_ && error) { 340 *error = errno; 341 } 342 return (file_ != NULL); 343} 344 345bool FileStream::OpenShare(const std::string& filename, const char* mode, 346 int shflag, int* error) { 347 Close(); 348#if defined(WEBRTC_WIN) 349 std::wstring wfilename; 350 if (Utf8ToWindowsFilename(filename, &wfilename)) { 351 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag); 352 if (!file_ && error) { 353 *error = errno; 354 return false; 355 } 356 return file_ != NULL; 357 } else { 358 if (error) { 359 *error = -1; 360 } 361 return false; 362 } 363#else 364 return Open(filename, mode, error); 365#endif 366} 367 368bool FileStream::DisableBuffering() { 369 if (!file_) 370 return false; 371 return (setvbuf(file_, NULL, _IONBF, 0) == 0); 372} 373 374StreamState FileStream::GetState() const { 375 return (file_ == NULL) ? SS_CLOSED : SS_OPEN; 376} 377 378StreamResult FileStream::Read(void* buffer, size_t buffer_len, 379 size_t* read, int* error) { 380 if (!file_) 381 return SR_EOS; 382 size_t result = fread(buffer, 1, buffer_len, file_); 383 if ((result == 0) && (buffer_len > 0)) { 384 if (feof(file_)) 385 return SR_EOS; 386 if (error) 387 *error = errno; 388 return SR_ERROR; 389 } 390 if (read) 391 *read = result; 392 return SR_SUCCESS; 393} 394 395StreamResult FileStream::Write(const void* data, size_t data_len, 396 size_t* written, int* error) { 397 if (!file_) 398 return SR_EOS; 399 size_t result = fwrite(data, 1, data_len, file_); 400 if ((result == 0) && (data_len > 0)) { 401 if (error) 402 *error = errno; 403 return SR_ERROR; 404 } 405 if (written) 406 *written = result; 407 return SR_SUCCESS; 408} 409 410void FileStream::Close() { 411 if (file_) { 412 DoClose(); 413 file_ = NULL; 414 } 415} 416 417bool FileStream::SetPosition(size_t position) { 418 if (!file_) 419 return false; 420 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0); 421} 422 423bool FileStream::GetPosition(size_t* position) const { 424 ASSERT(NULL != position); 425 if (!file_) 426 return false; 427 long result = ftell(file_); 428 if (result < 0) 429 return false; 430 if (position) 431 *position = result; 432 return true; 433} 434 435bool FileStream::GetSize(size_t* size) const { 436 ASSERT(NULL != size); 437 if (!file_) 438 return false; 439 struct stat file_stats; 440 if (fstat(fileno(file_), &file_stats) != 0) 441 return false; 442 if (size) 443 *size = file_stats.st_size; 444 return true; 445} 446 447bool FileStream::GetAvailable(size_t* size) const { 448 ASSERT(NULL != size); 449 if (!GetSize(size)) 450 return false; 451 long result = ftell(file_); 452 if (result < 0) 453 return false; 454 if (size) 455 *size -= result; 456 return true; 457} 458 459bool FileStream::ReserveSize(size_t size) { 460 // TODO: extend the file to the proper length 461 return true; 462} 463 464bool FileStream::GetSize(const std::string& filename, size_t* size) { 465 struct stat file_stats; 466 if (stat(filename.c_str(), &file_stats) != 0) 467 return false; 468 *size = file_stats.st_size; 469 return true; 470} 471 472bool FileStream::Flush() { 473 if (file_) { 474 return (0 == fflush(file_)); 475 } 476 // try to flush empty file? 477 ASSERT(false); 478 return false; 479} 480 481#if defined(WEBRTC_POSIX) && !defined(__native_client__) 482 483bool FileStream::TryLock() { 484 if (file_ == NULL) { 485 // Stream not open. 486 ASSERT(false); 487 return false; 488 } 489 490 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0; 491} 492 493bool FileStream::Unlock() { 494 if (file_ == NULL) { 495 // Stream not open. 496 ASSERT(false); 497 return false; 498 } 499 500 return flock(fileno(file_), LOCK_UN) == 0; 501} 502 503#endif 504 505void FileStream::DoClose() { 506 fclose(file_); 507} 508 509CircularFileStream::CircularFileStream(size_t max_size) 510 : max_write_size_(max_size), 511 position_(0), 512 marked_position_(max_size / 2), 513 last_write_position_(0), 514 read_segment_(READ_LATEST), 515 read_segment_available_(0) { 516} 517 518bool CircularFileStream::Open( 519 const std::string& filename, const char* mode, int* error) { 520 if (!FileStream::Open(filename.c_str(), mode, error)) 521 return false; 522 523 if (strchr(mode, "r") != NULL) { // Opened in read mode. 524 // Check if the buffer has been overwritten and determine how to read the 525 // log in time sequence. 526 size_t file_size; 527 GetSize(&file_size); 528 if (file_size == position_) { 529 // The buffer has not been overwritten yet. Read 0 .. file_size 530 read_segment_ = READ_LATEST; 531 read_segment_available_ = file_size; 532 } else { 533 // The buffer has been over written. There are three segments: The first 534 // one is 0 .. marked_position_, which is the marked earliest log. The 535 // second one is position_ .. file_size, which is the middle log. The 536 // last one is marked_position_ .. position_, which is the latest log. 537 read_segment_ = READ_MARKED; 538 read_segment_available_ = marked_position_; 539 last_write_position_ = position_; 540 } 541 542 // Read from the beginning. 543 position_ = 0; 544 SetPosition(position_); 545 } 546 547 return true; 548} 549 550StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len, 551 size_t* read, int* error) { 552 if (read_segment_available_ == 0) { 553 size_t file_size; 554 switch (read_segment_) { 555 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE. 556 read_segment_ = READ_MIDDLE; 557 position_ = last_write_position_; 558 SetPosition(position_); 559 GetSize(&file_size); 560 read_segment_available_ = file_size - position_; 561 break; 562 563 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST. 564 read_segment_ = READ_LATEST; 565 position_ = marked_position_; 566 SetPosition(position_); 567 read_segment_available_ = last_write_position_ - position_; 568 break; 569 570 default: // Finished READ_LATEST and return EOS. 571 return rtc::SR_EOS; 572 } 573 } 574 575 size_t local_read; 576 if (!read) read = &local_read; 577 578 size_t to_read = rtc::_min(buffer_len, read_segment_available_); 579 rtc::StreamResult result 580 = rtc::FileStream::Read(buffer, to_read, read, error); 581 if (result == rtc::SR_SUCCESS) { 582 read_segment_available_ -= *read; 583 position_ += *read; 584 } 585 return result; 586} 587 588StreamResult CircularFileStream::Write(const void* data, size_t data_len, 589 size_t* written, int* error) { 590 if (position_ >= max_write_size_) { 591 ASSERT(position_ == max_write_size_); 592 position_ = marked_position_; 593 SetPosition(position_); 594 } 595 596 size_t local_written; 597 if (!written) written = &local_written; 598 599 size_t to_eof = max_write_size_ - position_; 600 size_t to_write = rtc::_min(data_len, to_eof); 601 rtc::StreamResult result 602 = rtc::FileStream::Write(data, to_write, written, error); 603 if (result == rtc::SR_SUCCESS) { 604 position_ += *written; 605 } 606 return result; 607} 608 609AsyncWriteStream::~AsyncWriteStream() { 610 write_thread_->Clear(this, 0, NULL); 611 ClearBufferAndWrite(); 612 613 CritScope cs(&crit_stream_); 614 stream_.reset(); 615} 616 617// This is needed by some stream writers, such as RtpDumpWriter. 618bool AsyncWriteStream::GetPosition(size_t* position) const { 619 CritScope cs(&crit_stream_); 620 return stream_->GetPosition(position); 621} 622 623// This is needed by some stream writers, such as the plugin log writers. 624StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len, 625 size_t* read, int* error) { 626 CritScope cs(&crit_stream_); 627 return stream_->Read(buffer, buffer_len, read, error); 628} 629 630void AsyncWriteStream::Close() { 631 if (state_ == SS_CLOSED) { 632 return; 633 } 634 635 write_thread_->Clear(this, 0, NULL); 636 ClearBufferAndWrite(); 637 638 CritScope cs(&crit_stream_); 639 stream_->Close(); 640 state_ = SS_CLOSED; 641} 642 643StreamResult AsyncWriteStream::Write(const void* data, size_t data_len, 644 size_t* written, int* error) { 645 if (state_ == SS_CLOSED) { 646 return SR_ERROR; 647 } 648 649 size_t previous_buffer_length = 0; 650 { 651 CritScope cs(&crit_buffer_); 652 previous_buffer_length = buffer_.length(); 653 buffer_.AppendData(data, data_len); 654 } 655 656 if (previous_buffer_length == 0) { 657 // If there's stuff already in the buffer, then we already called 658 // Post and the write_thread_ hasn't pulled it out yet, so we 659 // don't need to re-Post. 660 write_thread_->Post(this, 0, NULL); 661 } 662 // Return immediately, assuming that it works. 663 if (written) { 664 *written = data_len; 665 } 666 return SR_SUCCESS; 667} 668 669void AsyncWriteStream::OnMessage(rtc::Message* pmsg) { 670 ClearBufferAndWrite(); 671} 672 673bool AsyncWriteStream::Flush() { 674 if (state_ == SS_CLOSED) { 675 return false; 676 } 677 678 ClearBufferAndWrite(); 679 680 CritScope cs(&crit_stream_); 681 return stream_->Flush(); 682} 683 684void AsyncWriteStream::ClearBufferAndWrite() { 685 Buffer to_write; 686 { 687 CritScope cs_buffer(&crit_buffer_); 688 buffer_.TransferTo(&to_write); 689 } 690 691 if (to_write.length() > 0) { 692 CritScope cs(&crit_stream_); 693 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL); 694 } 695} 696 697#if defined(WEBRTC_POSIX) && !defined(__native_client__) 698 699// Have to identically rewrite the FileStream destructor or else it would call 700// the base class's Close() instead of the sub-class's. 701POpenStream::~POpenStream() { 702 POpenStream::Close(); 703} 704 705bool POpenStream::Open(const std::string& subcommand, 706 const char* mode, 707 int* error) { 708 Close(); 709 file_ = popen(subcommand.c_str(), mode); 710 if (file_ == NULL) { 711 if (error) 712 *error = errno; 713 return false; 714 } 715 return true; 716} 717 718bool POpenStream::OpenShare(const std::string& subcommand, const char* mode, 719 int shflag, int* error) { 720 return Open(subcommand, mode, error); 721} 722 723void POpenStream::DoClose() { 724 wait_status_ = pclose(file_); 725} 726 727#endif 728 729/////////////////////////////////////////////////////////////////////////////// 730// MemoryStream 731/////////////////////////////////////////////////////////////////////////////// 732 733MemoryStreamBase::MemoryStreamBase() 734 : buffer_(NULL), buffer_length_(0), data_length_(0), 735 seek_position_(0) { 736} 737 738StreamState MemoryStreamBase::GetState() const { 739 return SS_OPEN; 740} 741 742StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes, 743 size_t* bytes_read, int* error) { 744 if (seek_position_ >= data_length_) { 745 return SR_EOS; 746 } 747 size_t available = data_length_ - seek_position_; 748 if (bytes > available) { 749 // Read partial buffer 750 bytes = available; 751 } 752 memcpy(buffer, &buffer_[seek_position_], bytes); 753 seek_position_ += bytes; 754 if (bytes_read) { 755 *bytes_read = bytes; 756 } 757 return SR_SUCCESS; 758} 759 760StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes, 761 size_t* bytes_written, int* error) { 762 size_t available = buffer_length_ - seek_position_; 763 if (0 == available) { 764 // Increase buffer size to the larger of: 765 // a) new position rounded up to next 256 bytes 766 // b) double the previous length 767 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1, 768 buffer_length_ * 2); 769 StreamResult result = DoReserve(new_buffer_length, error); 770 if (SR_SUCCESS != result) { 771 return result; 772 } 773 ASSERT(buffer_length_ >= new_buffer_length); 774 available = buffer_length_ - seek_position_; 775 } 776 777 if (bytes > available) { 778 bytes = available; 779 } 780 memcpy(&buffer_[seek_position_], buffer, bytes); 781 seek_position_ += bytes; 782 if (data_length_ < seek_position_) { 783 data_length_ = seek_position_; 784 } 785 if (bytes_written) { 786 *bytes_written = bytes; 787 } 788 return SR_SUCCESS; 789} 790 791void MemoryStreamBase::Close() { 792 // nothing to do 793} 794 795bool MemoryStreamBase::SetPosition(size_t position) { 796 if (position > data_length_) 797 return false; 798 seek_position_ = position; 799 return true; 800} 801 802bool MemoryStreamBase::GetPosition(size_t* position) const { 803 if (position) 804 *position = seek_position_; 805 return true; 806} 807 808bool MemoryStreamBase::GetSize(size_t* size) const { 809 if (size) 810 *size = data_length_; 811 return true; 812} 813 814bool MemoryStreamBase::GetAvailable(size_t* size) const { 815 if (size) 816 *size = data_length_ - seek_position_; 817 return true; 818} 819 820bool MemoryStreamBase::ReserveSize(size_t size) { 821 return (SR_SUCCESS == DoReserve(size, NULL)); 822} 823 824StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) { 825 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS; 826} 827 828/////////////////////////////////////////////////////////////////////////////// 829 830MemoryStream::MemoryStream() 831 : buffer_alloc_(NULL) { 832} 833 834MemoryStream::MemoryStream(const char* data) 835 : buffer_alloc_(NULL) { 836 SetData(data, strlen(data)); 837} 838 839MemoryStream::MemoryStream(const void* data, size_t length) 840 : buffer_alloc_(NULL) { 841 SetData(data, length); 842} 843 844MemoryStream::~MemoryStream() { 845 delete [] buffer_alloc_; 846} 847 848void MemoryStream::SetData(const void* data, size_t length) { 849 data_length_ = buffer_length_ = length; 850 delete [] buffer_alloc_; 851 buffer_alloc_ = new char[buffer_length_ + kAlignment]; 852 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment)); 853 memcpy(buffer_, data, data_length_); 854 seek_position_ = 0; 855} 856 857StreamResult MemoryStream::DoReserve(size_t size, int* error) { 858 if (buffer_length_ >= size) 859 return SR_SUCCESS; 860 861 if (char* new_buffer_alloc = new char[size + kAlignment]) { 862 char* new_buffer = reinterpret_cast<char*>( 863 ALIGNP(new_buffer_alloc, kAlignment)); 864 memcpy(new_buffer, buffer_, data_length_); 865 delete [] buffer_alloc_; 866 buffer_alloc_ = new_buffer_alloc; 867 buffer_ = new_buffer; 868 buffer_length_ = size; 869 return SR_SUCCESS; 870 } 871 872 if (error) { 873 *error = ENOMEM; 874 } 875 return SR_ERROR; 876} 877 878/////////////////////////////////////////////////////////////////////////////// 879 880ExternalMemoryStream::ExternalMemoryStream() { 881} 882 883ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) { 884 SetData(data, length); 885} 886 887ExternalMemoryStream::~ExternalMemoryStream() { 888} 889 890void ExternalMemoryStream::SetData(void* data, size_t length) { 891 data_length_ = buffer_length_ = length; 892 buffer_ = static_cast<char*>(data); 893 seek_position_ = 0; 894} 895 896/////////////////////////////////////////////////////////////////////////////// 897// FifoBuffer 898/////////////////////////////////////////////////////////////////////////////// 899 900FifoBuffer::FifoBuffer(size_t size) 901 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 902 data_length_(0), read_position_(0), owner_(Thread::Current()) { 903 // all events are done on the owner_ thread 904} 905 906FifoBuffer::FifoBuffer(size_t size, Thread* owner) 907 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 908 data_length_(0), read_position_(0), owner_(owner) { 909 // all events are done on the owner_ thread 910} 911 912FifoBuffer::~FifoBuffer() { 913} 914 915bool FifoBuffer::GetBuffered(size_t* size) const { 916 CritScope cs(&crit_); 917 *size = data_length_; 918 return true; 919} 920 921bool FifoBuffer::SetCapacity(size_t size) { 922 CritScope cs(&crit_); 923 if (data_length_ > size) { 924 return false; 925 } 926 927 if (size != buffer_length_) { 928 char* buffer = new char[size]; 929 const size_t copy = data_length_; 930 const size_t tail_copy = _min(copy, buffer_length_ - read_position_); 931 memcpy(buffer, &buffer_[read_position_], tail_copy); 932 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); 933 buffer_.reset(buffer); 934 read_position_ = 0; 935 buffer_length_ = size; 936 } 937 return true; 938} 939 940StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes, 941 size_t offset, size_t* bytes_read) { 942 CritScope cs(&crit_); 943 return ReadOffsetLocked(buffer, bytes, offset, bytes_read); 944} 945 946StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes, 947 size_t offset, size_t* bytes_written) { 948 CritScope cs(&crit_); 949 return WriteOffsetLocked(buffer, bytes, offset, bytes_written); 950} 951 952StreamState FifoBuffer::GetState() const { 953 return state_; 954} 955 956StreamResult FifoBuffer::Read(void* buffer, size_t bytes, 957 size_t* bytes_read, int* error) { 958 CritScope cs(&crit_); 959 const bool was_writable = data_length_ < buffer_length_; 960 size_t copy = 0; 961 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); 962 963 if (result == SR_SUCCESS) { 964 // If read was successful then adjust the read position and number of 965 // bytes buffered. 966 read_position_ = (read_position_ + copy) % buffer_length_; 967 data_length_ -= copy; 968 if (bytes_read) { 969 *bytes_read = copy; 970 } 971 972 // if we were full before, and now we're not, post an event 973 if (!was_writable && copy > 0) { 974 PostEvent(owner_, SE_WRITE, 0); 975 } 976 } 977 return result; 978} 979 980StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, 981 size_t* bytes_written, int* error) { 982 CritScope cs(&crit_); 983 984 const bool was_readable = (data_length_ > 0); 985 size_t copy = 0; 986 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©); 987 988 if (result == SR_SUCCESS) { 989 // If write was successful then adjust the number of readable bytes. 990 data_length_ += copy; 991 if (bytes_written) { 992 *bytes_written = copy; 993 } 994 995 // if we didn't have any data to read before, and now we do, post an event 996 if (!was_readable && copy > 0) { 997 PostEvent(owner_, SE_READ, 0); 998 } 999 } 1000 return result; 1001} 1002 1003void FifoBuffer::Close() { 1004 CritScope cs(&crit_); 1005 state_ = SS_CLOSED; 1006} 1007 1008const void* FifoBuffer::GetReadData(size_t* size) { 1009 CritScope cs(&crit_); 1010 *size = (read_position_ + data_length_ <= buffer_length_) ? 1011 data_length_ : buffer_length_ - read_position_; 1012 return &buffer_[read_position_]; 1013} 1014 1015void FifoBuffer::ConsumeReadData(size_t size) { 1016 CritScope cs(&crit_); 1017 ASSERT(size <= data_length_); 1018 const bool was_writable = data_length_ < buffer_length_; 1019 read_position_ = (read_position_ + size) % buffer_length_; 1020 data_length_ -= size; 1021 if (!was_writable && size > 0) { 1022 PostEvent(owner_, SE_WRITE, 0); 1023 } 1024} 1025 1026void* FifoBuffer::GetWriteBuffer(size_t* size) { 1027 CritScope cs(&crit_); 1028 if (state_ == SS_CLOSED) { 1029 return NULL; 1030 } 1031 1032 // if empty, reset the write position to the beginning, so we can get 1033 // the biggest possible block 1034 if (data_length_ == 0) { 1035 read_position_ = 0; 1036 } 1037 1038 const size_t write_position = (read_position_ + data_length_) 1039 % buffer_length_; 1040 *size = (write_position > read_position_ || data_length_ == 0) ? 1041 buffer_length_ - write_position : read_position_ - write_position; 1042 return &buffer_[write_position]; 1043} 1044 1045void FifoBuffer::ConsumeWriteBuffer(size_t size) { 1046 CritScope cs(&crit_); 1047 ASSERT(size <= buffer_length_ - data_length_); 1048 const bool was_readable = (data_length_ > 0); 1049 data_length_ += size; 1050 if (!was_readable && size > 0) { 1051 PostEvent(owner_, SE_READ, 0); 1052 } 1053} 1054 1055bool FifoBuffer::GetWriteRemaining(size_t* size) const { 1056 CritScope cs(&crit_); 1057 *size = buffer_length_ - data_length_; 1058 return true; 1059} 1060 1061StreamResult FifoBuffer::ReadOffsetLocked(void* buffer, 1062 size_t bytes, 1063 size_t offset, 1064 size_t* bytes_read) { 1065 if (offset >= data_length_) { 1066 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; 1067 } 1068 1069 const size_t available = data_length_ - offset; 1070 const size_t read_position = (read_position_ + offset) % buffer_length_; 1071 const size_t copy = _min(bytes, available); 1072 const size_t tail_copy = _min(copy, buffer_length_ - read_position); 1073 char* const p = static_cast<char*>(buffer); 1074 memcpy(p, &buffer_[read_position], tail_copy); 1075 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); 1076 1077 if (bytes_read) { 1078 *bytes_read = copy; 1079 } 1080 return SR_SUCCESS; 1081} 1082 1083StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer, 1084 size_t bytes, 1085 size_t offset, 1086 size_t* bytes_written) { 1087 if (state_ == SS_CLOSED) { 1088 return SR_EOS; 1089 } 1090 1091 if (data_length_ + offset >= buffer_length_) { 1092 return SR_BLOCK; 1093 } 1094 1095 const size_t available = buffer_length_ - data_length_ - offset; 1096 const size_t write_position = (read_position_ + data_length_ + offset) 1097 % buffer_length_; 1098 const size_t copy = _min(bytes, available); 1099 const size_t tail_copy = _min(copy, buffer_length_ - write_position); 1100 const char* const p = static_cast<const char*>(buffer); 1101 memcpy(&buffer_[write_position], p, tail_copy); 1102 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); 1103 1104 if (bytes_written) { 1105 *bytes_written = copy; 1106 } 1107 return SR_SUCCESS; 1108} 1109 1110 1111 1112/////////////////////////////////////////////////////////////////////////////// 1113// LoggingAdapter 1114/////////////////////////////////////////////////////////////////////////////// 1115 1116LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level, 1117 const std::string& label, bool hex_mode) 1118 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) { 1119 set_label(label); 1120} 1121 1122void LoggingAdapter::set_label(const std::string& label) { 1123 label_.assign("["); 1124 label_.append(label); 1125 label_.append("]"); 1126} 1127 1128StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len, 1129 size_t* read, int* error) { 1130 size_t local_read; if (!read) read = &local_read; 1131 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read, 1132 error); 1133 if (result == SR_SUCCESS) { 1134 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_); 1135 } 1136 return result; 1137} 1138 1139StreamResult LoggingAdapter::Write(const void* data, size_t data_len, 1140 size_t* written, int* error) { 1141 size_t local_written; 1142 if (!written) written = &local_written; 1143 StreamResult result = StreamAdapterInterface::Write(data, data_len, written, 1144 error); 1145 if (result == SR_SUCCESS) { 1146 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_, 1147 &lms_); 1148 } 1149 return result; 1150} 1151 1152void LoggingAdapter::Close() { 1153 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 1154 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 1155 LOG_V(level_) << label_ << " Closed locally"; 1156 StreamAdapterInterface::Close(); 1157} 1158 1159void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) { 1160 if (events & SE_OPEN) { 1161 LOG_V(level_) << label_ << " Open"; 1162 } else if (events & SE_CLOSE) { 1163 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 1164 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 1165 LOG_V(level_) << label_ << " Closed with error: " << err; 1166 } 1167 StreamAdapterInterface::OnEvent(stream, events, err); 1168} 1169 1170/////////////////////////////////////////////////////////////////////////////// 1171// StringStream - Reads/Writes to an external std::string 1172/////////////////////////////////////////////////////////////////////////////// 1173 1174StringStream::StringStream(std::string& str) 1175 : str_(str), read_pos_(0), read_only_(false) { 1176} 1177 1178StringStream::StringStream(const std::string& str) 1179 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) { 1180} 1181 1182StreamState StringStream::GetState() const { 1183 return SS_OPEN; 1184} 1185 1186StreamResult StringStream::Read(void* buffer, size_t buffer_len, 1187 size_t* read, int* error) { 1188 size_t available = _min(buffer_len, str_.size() - read_pos_); 1189 if (!available) 1190 return SR_EOS; 1191 memcpy(buffer, str_.data() + read_pos_, available); 1192 read_pos_ += available; 1193 if (read) 1194 *read = available; 1195 return SR_SUCCESS; 1196} 1197 1198StreamResult StringStream::Write(const void* data, size_t data_len, 1199 size_t* written, int* error) { 1200 if (read_only_) { 1201 if (error) { 1202 *error = -1; 1203 } 1204 return SR_ERROR; 1205 } 1206 str_.append(static_cast<const char*>(data), 1207 static_cast<const char*>(data) + data_len); 1208 if (written) 1209 *written = data_len; 1210 return SR_SUCCESS; 1211} 1212 1213void StringStream::Close() { 1214} 1215 1216bool StringStream::SetPosition(size_t position) { 1217 if (position > str_.size()) 1218 return false; 1219 read_pos_ = position; 1220 return true; 1221} 1222 1223bool StringStream::GetPosition(size_t* position) const { 1224 if (position) 1225 *position = read_pos_; 1226 return true; 1227} 1228 1229bool StringStream::GetSize(size_t* size) const { 1230 if (size) 1231 *size = str_.size(); 1232 return true; 1233} 1234 1235bool StringStream::GetAvailable(size_t* size) const { 1236 if (size) 1237 *size = str_.size() - read_pos_; 1238 return true; 1239} 1240 1241bool StringStream::ReserveSize(size_t size) { 1242 if (read_only_) 1243 return false; 1244 str_.reserve(size); 1245 return true; 1246} 1247 1248/////////////////////////////////////////////////////////////////////////////// 1249// StreamReference 1250/////////////////////////////////////////////////////////////////////////////// 1251 1252StreamReference::StreamReference(StreamInterface* stream) 1253 : StreamAdapterInterface(stream, false) { 1254 // owner set to false so the destructor does not free the stream. 1255 stream_ref_count_ = new StreamRefCount(stream); 1256} 1257 1258StreamInterface* StreamReference::NewReference() { 1259 stream_ref_count_->AddReference(); 1260 return new StreamReference(stream_ref_count_, stream()); 1261} 1262 1263StreamReference::~StreamReference() { 1264 stream_ref_count_->Release(); 1265} 1266 1267StreamReference::StreamReference(StreamRefCount* stream_ref_count, 1268 StreamInterface* stream) 1269 : StreamAdapterInterface(stream, false), 1270 stream_ref_count_(stream_ref_count) { 1271} 1272 1273/////////////////////////////////////////////////////////////////////////////// 1274 1275StreamResult Flow(StreamInterface* source, 1276 char* buffer, size_t buffer_len, 1277 StreamInterface* sink, 1278 size_t* data_len /* = NULL */) { 1279 ASSERT(buffer_len > 0); 1280 1281 StreamResult result; 1282 size_t count, read_pos, write_pos; 1283 if (data_len) { 1284 read_pos = *data_len; 1285 } else { 1286 read_pos = 0; 1287 } 1288 1289 bool end_of_stream = false; 1290 do { 1291 // Read until buffer is full, end of stream, or error 1292 while (!end_of_stream && (read_pos < buffer_len)) { 1293 result = source->Read(buffer + read_pos, buffer_len - read_pos, 1294 &count, NULL); 1295 if (result == SR_EOS) { 1296 end_of_stream = true; 1297 } else if (result != SR_SUCCESS) { 1298 if (data_len) { 1299 *data_len = read_pos; 1300 } 1301 return result; 1302 } else { 1303 read_pos += count; 1304 } 1305 } 1306 1307 // Write until buffer is empty, or error (including end of stream) 1308 write_pos = 0; 1309 while (write_pos < read_pos) { 1310 result = sink->Write(buffer + write_pos, read_pos - write_pos, 1311 &count, NULL); 1312 if (result != SR_SUCCESS) { 1313 if (data_len) { 1314 *data_len = read_pos - write_pos; 1315 if (write_pos > 0) { 1316 memmove(buffer, buffer + write_pos, *data_len); 1317 } 1318 } 1319 return result; 1320 } 1321 write_pos += count; 1322 } 1323 1324 read_pos = 0; 1325 } while (!end_of_stream); 1326 1327 if (data_len) { 1328 *data_len = 0; 1329 } 1330 return SR_SUCCESS; 1331} 1332 1333/////////////////////////////////////////////////////////////////////////////// 1334 1335} // namespace rtc 1336