1/* 2 * libjingle 3 * Copyright 2011, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28#if defined(POSIX) 29#include <sys/file.h> 30#endif // POSIX 31#include <sys/types.h> 32#include <sys/stat.h> 33#include <errno.h> 34#include <string> 35#include "talk/base/basictypes.h" 36#include "talk/base/common.h" 37#include "talk/base/messagequeue.h" 38#include "talk/base/stream.h" 39#include "talk/base/stringencode.h" 40#include "talk/base/stringutils.h" 41#include "talk/base/thread.h" 42 43#ifdef WIN32 44#include "talk/base/win32.h" 45#define fileno _fileno 46#endif 47 48namespace talk_base { 49 50/////////////////////////////////////////////////////////////////////////////// 51// StreamInterface 52/////////////////////////////////////////////////////////////////////////////// 53 54enum { 55 MSG_POST_EVENT = 0xF1F1 56}; 57 58StreamInterface::~StreamInterface() { 59} 60 61struct PostEventData : public MessageData { 62 int events, error; 63 PostEventData(int ev, int er) : events(ev), error(er) { } 64}; 65 66StreamResult StreamInterface::WriteAll(const void* data, size_t data_len, 67 size_t* written, int* error) { 68 StreamResult result = SR_SUCCESS; 69 size_t total_written = 0, current_written; 70 while (total_written < data_len) { 71 result = Write(static_cast<const char*>(data) + total_written, 72 data_len - total_written, ¤t_written, error); 73 if (result != SR_SUCCESS) 74 break; 75 total_written += current_written; 76 } 77 if (written) 78 *written = total_written; 79 return result; 80} 81 82StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len, 83 size_t* read, int* error) { 84 StreamResult result = SR_SUCCESS; 85 size_t total_read = 0, current_read; 86 while (total_read < buffer_len) { 87 result = Read(static_cast<char*>(buffer) + total_read, 88 buffer_len - total_read, ¤t_read, error); 89 if (result != SR_SUCCESS) 90 break; 91 total_read += current_read; 92 } 93 if (read) 94 *read = total_read; 95 return result; 96} 97 98StreamResult StreamInterface::ReadLine(std::string* line) { 99 line->clear(); 100 StreamResult result = SR_SUCCESS; 101 while (true) { 102 char ch; 103 result = Read(&ch, sizeof(ch), NULL, NULL); 104 if (result != SR_SUCCESS) { 105 break; 106 } 107 if (ch == '\n') { 108 break; 109 } 110 line->push_back(ch); 111 } 112 if (!line->empty()) { // give back the line we've collected so far with 113 result = SR_SUCCESS; // a success code. Otherwise return the last code 114 } 115 return result; 116} 117 118void StreamInterface::PostEvent(Thread* t, int events, int err) { 119 t->Post(this, MSG_POST_EVENT, new PostEventData(events, err)); 120} 121 122void StreamInterface::PostEvent(int events, int err) { 123 PostEvent(Thread::Current(), events, err); 124} 125 126StreamInterface::StreamInterface() { 127} 128 129void StreamInterface::OnMessage(Message* msg) { 130 if (MSG_POST_EVENT == msg->message_id) { 131 PostEventData* pe = static_cast<PostEventData*>(msg->pdata); 132 SignalEvent(this, pe->events, pe->error); 133 delete msg->pdata; 134 } 135} 136 137/////////////////////////////////////////////////////////////////////////////// 138// StreamAdapterInterface 139/////////////////////////////////////////////////////////////////////////////// 140 141StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream, 142 bool owned) 143 : stream_(stream), owned_(owned) { 144 if (NULL != stream_) 145 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 146} 147 148void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) { 149 if (NULL != stream_) 150 stream_->SignalEvent.disconnect(this); 151 if (owned_) 152 delete stream_; 153 stream_ = stream; 154 owned_ = owned; 155 if (NULL != stream_) 156 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 157} 158 159StreamInterface* StreamAdapterInterface::Detach() { 160 if (NULL != stream_) 161 stream_->SignalEvent.disconnect(this); 162 StreamInterface* stream = stream_; 163 stream_ = NULL; 164 return stream; 165} 166 167StreamAdapterInterface::~StreamAdapterInterface() { 168 if (owned_) 169 delete stream_; 170} 171 172/////////////////////////////////////////////////////////////////////////////// 173// StreamTap 174/////////////////////////////////////////////////////////////////////////////// 175 176StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap) 177: StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS), 178 tap_error_(0) 179{ 180 AttachTap(tap); 181} 182 183void StreamTap::AttachTap(StreamInterface* tap) { 184 tap_.reset(tap); 185} 186 187StreamInterface* StreamTap::DetachTap() { 188 return tap_.release(); 189} 190 191StreamResult StreamTap::GetTapResult(int* error) { 192 if (error) { 193 *error = tap_error_; 194 } 195 return tap_result_; 196} 197 198StreamResult StreamTap::Read(void* buffer, size_t buffer_len, 199 size_t* read, int* error) { 200 size_t backup_read; 201 if (!read) { 202 read = &backup_read; 203 } 204 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len, 205 read, error); 206 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 207 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_); 208 } 209 return res; 210} 211 212StreamResult StreamTap::Write(const void* data, size_t data_len, 213 size_t* written, int* error) { 214 size_t backup_written; 215 if (!written) { 216 written = &backup_written; 217 } 218 StreamResult res = StreamAdapterInterface::Write(data, data_len, 219 written, error); 220 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 221 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_); 222 } 223 return res; 224} 225 226/////////////////////////////////////////////////////////////////////////////// 227// StreamSegment 228/////////////////////////////////////////////////////////////////////////////// 229 230StreamSegment::StreamSegment(StreamInterface* stream) 231: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 232 length_(SIZE_UNKNOWN) 233{ 234 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 235 stream->GetPosition(&start_); 236} 237 238StreamSegment::StreamSegment(StreamInterface* stream, size_t length) 239: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 240 length_(length) 241{ 242 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 243 stream->GetPosition(&start_); 244} 245 246StreamResult StreamSegment::Read(void* buffer, size_t buffer_len, 247 size_t* read, int* error) 248{ 249 if (SIZE_UNKNOWN != length_) { 250 if (pos_ >= length_) 251 return SR_EOS; 252 buffer_len = _min(buffer_len, length_ - pos_); 253 } 254 size_t backup_read; 255 if (!read) { 256 read = &backup_read; 257 } 258 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, 259 read, error); 260 if (SR_SUCCESS == result) { 261 pos_ += *read; 262 } 263 return result; 264} 265 266bool StreamSegment::SetPosition(size_t position) { 267 if (SIZE_UNKNOWN == start_) 268 return false; // Not seekable 269 if ((SIZE_UNKNOWN != length_) && (position > length_)) 270 return false; // Seek past end of segment 271 if (!StreamAdapterInterface::SetPosition(start_ + position)) 272 return false; 273 pos_ = position; 274 return true; 275} 276 277bool StreamSegment::GetPosition(size_t* position) const { 278 if (SIZE_UNKNOWN == start_) 279 return false; // Not seekable 280 if (!StreamAdapterInterface::GetPosition(position)) 281 return false; 282 if (position) { 283 ASSERT(*position >= start_); 284 *position -= start_; 285 } 286 return true; 287} 288 289bool StreamSegment::GetSize(size_t* size) const { 290 if (!StreamAdapterInterface::GetSize(size)) 291 return false; 292 if (size) { 293 if (SIZE_UNKNOWN != start_) { 294 ASSERT(*size >= start_); 295 *size -= start_; 296 } 297 if (SIZE_UNKNOWN != length_) { 298 *size = _min(*size, length_); 299 } 300 } 301 return true; 302} 303 304bool StreamSegment::GetAvailable(size_t* size) const { 305 if (!StreamAdapterInterface::GetAvailable(size)) 306 return false; 307 if (size && (SIZE_UNKNOWN != length_)) 308 *size = _min(*size, length_ - pos_); 309 return true; 310} 311 312/////////////////////////////////////////////////////////////////////////////// 313// NullStream 314/////////////////////////////////////////////////////////////////////////////// 315 316NullStream::NullStream() { 317} 318 319NullStream::~NullStream() { 320} 321 322StreamState NullStream::GetState() const { 323 return SS_OPEN; 324} 325 326StreamResult NullStream::Read(void* buffer, size_t buffer_len, 327 size_t* read, int* error) { 328 if (error) *error = -1; 329 return SR_ERROR; 330} 331 332StreamResult NullStream::Write(const void* data, size_t data_len, 333 size_t* written, int* error) { 334 if (written) *written = data_len; 335 return SR_SUCCESS; 336} 337 338void NullStream::Close() { 339} 340 341/////////////////////////////////////////////////////////////////////////////// 342// FileStream 343/////////////////////////////////////////////////////////////////////////////// 344 345FileStream::FileStream() : file_(NULL) { 346} 347 348FileStream::~FileStream() { 349 FileStream::Close(); 350} 351 352bool FileStream::Open(const std::string& filename, const char* mode) { 353 Close(); 354#ifdef WIN32 355 std::wstring wfilename; 356 if (Utf8ToWindowsFilename(filename, &wfilename)) { 357 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str()); 358 } else { 359 file_ = NULL; 360 } 361#else 362 file_ = fopen(filename.c_str(), mode); 363#endif 364 return (file_ != NULL); 365} 366 367bool FileStream::OpenShare(const std::string& filename, const char* mode, 368 int shflag) { 369 Close(); 370#ifdef WIN32 371 std::wstring wfilename; 372 if (Utf8ToWindowsFilename(filename, &wfilename)) { 373 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag); 374 } else { 375 file_ = NULL; 376 } 377#else 378 return Open(filename, mode); 379#endif 380 return (file_ != NULL); 381} 382 383bool FileStream::DisableBuffering() { 384 if (!file_) 385 return false; 386 return (setvbuf(file_, NULL, _IONBF, 0) == 0); 387} 388 389StreamState FileStream::GetState() const { 390 return (file_ == NULL) ? SS_CLOSED : SS_OPEN; 391} 392 393StreamResult FileStream::Read(void* buffer, size_t buffer_len, 394 size_t* read, int* error) { 395 if (!file_) 396 return SR_EOS; 397 size_t result = fread(buffer, 1, buffer_len, file_); 398 if ((result == 0) && (buffer_len > 0)) { 399 if (feof(file_)) 400 return SR_EOS; 401 if (error) 402 *error = errno; 403 return SR_ERROR; 404 } 405 if (read) 406 *read = result; 407 return SR_SUCCESS; 408} 409 410StreamResult FileStream::Write(const void* data, size_t data_len, 411 size_t* written, int* error) { 412 if (!file_) 413 return SR_EOS; 414 size_t result = fwrite(data, 1, data_len, file_); 415 if ((result == 0) && (data_len > 0)) { 416 if (error) 417 *error = errno; 418 return SR_ERROR; 419 } 420 if (written) 421 *written = result; 422 return SR_SUCCESS; 423} 424 425void FileStream::Close() { 426 if (file_) { 427 DoClose(); 428 file_ = NULL; 429 } 430} 431 432bool FileStream::SetPosition(size_t position) { 433 if (!file_) 434 return false; 435 return (fseek(file_, position, SEEK_SET) == 0); 436} 437 438bool FileStream::GetPosition(size_t* position) const { 439 ASSERT(NULL != position); 440 if (!file_) 441 return false; 442 long result = ftell(file_); 443 if (result < 0) 444 return false; 445 if (position) 446 *position = result; 447 return true; 448} 449 450bool FileStream::GetSize(size_t* size) const { 451 ASSERT(NULL != size); 452 if (!file_) 453 return false; 454 struct stat file_stats; 455 if (fstat(fileno(file_), &file_stats) != 0) 456 return false; 457 if (size) 458 *size = file_stats.st_size; 459 return true; 460} 461 462bool FileStream::GetAvailable(size_t* size) const { 463 ASSERT(NULL != size); 464 if (!GetSize(size)) 465 return false; 466 long result = ftell(file_); 467 if (result < 0) 468 return false; 469 if (size) 470 *size -= result; 471 return true; 472} 473 474bool FileStream::ReserveSize(size_t size) { 475 // TODO: extend the file to the proper length 476 return true; 477} 478 479bool FileStream::GetSize(const std::string& filename, size_t* size) { 480 struct stat file_stats; 481 if (stat(filename.c_str(), &file_stats) != 0) 482 return false; 483 *size = file_stats.st_size; 484 return true; 485} 486 487bool FileStream::Flush() { 488 if (file_) { 489 return (0 == fflush(file_)); 490 } 491 // try to flush empty file? 492 ASSERT(false); 493 return false; 494} 495 496#if defined(POSIX) 497 498bool FileStream::TryLock() { 499 if (file_ == NULL) { 500 // Stream not open. 501 ASSERT(false); 502 return false; 503 } 504 505 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0; 506} 507 508bool FileStream::Unlock() { 509 if (file_ == NULL) { 510 // Stream not open. 511 ASSERT(false); 512 return false; 513 } 514 515 return flock(fileno(file_), LOCK_UN) == 0; 516} 517 518#endif 519 520void FileStream::DoClose() { 521 fclose(file_); 522} 523 524#ifdef POSIX 525 526// Have to identically rewrite the FileStream destructor or else it would call 527// the base class's Close() instead of the sub-class's. 528POpenStream::~POpenStream() { 529 POpenStream::Close(); 530} 531 532bool POpenStream::Open(const std::string& subcommand, const char* mode) { 533 Close(); 534 file_ = popen(subcommand.c_str(), mode); 535 return file_ != NULL; 536} 537 538bool POpenStream::OpenShare(const std::string& subcommand, const char* mode, 539 int shflag) { 540 return Open(subcommand, mode); 541} 542 543void POpenStream::DoClose() { 544 wait_status_ = pclose(file_); 545} 546 547#endif 548 549/////////////////////////////////////////////////////////////////////////////// 550// MemoryStream 551/////////////////////////////////////////////////////////////////////////////// 552 553MemoryStreamBase::MemoryStreamBase() 554 : buffer_(NULL), buffer_length_(0), data_length_(0), 555 seek_position_(0) { 556} 557 558StreamState MemoryStreamBase::GetState() const { 559 return SS_OPEN; 560} 561 562StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes, 563 size_t* bytes_read, int* error) { 564 if (seek_position_ >= data_length_) { 565 return SR_EOS; 566 } 567 size_t available = data_length_ - seek_position_; 568 if (bytes > available) { 569 // Read partial buffer 570 bytes = available; 571 } 572 memcpy(buffer, &buffer_[seek_position_], bytes); 573 seek_position_ += bytes; 574 if (bytes_read) { 575 *bytes_read = bytes; 576 } 577 return SR_SUCCESS; 578} 579 580StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes, 581 size_t* bytes_written, int* error) { 582 size_t available = buffer_length_ - seek_position_; 583 if (0 == available) { 584 // Increase buffer size to the larger of: 585 // a) new position rounded up to next 256 bytes 586 // b) double the previous length 587 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1, 588 buffer_length_ * 2); 589 StreamResult result = DoReserve(new_buffer_length, error); 590 if (SR_SUCCESS != result) { 591 return result; 592 } 593 ASSERT(buffer_length_ >= new_buffer_length); 594 available = buffer_length_ - seek_position_; 595 } 596 597 if (bytes > available) { 598 bytes = available; 599 } 600 memcpy(&buffer_[seek_position_], buffer, bytes); 601 seek_position_ += bytes; 602 if (data_length_ < seek_position_) { 603 data_length_ = seek_position_; 604 } 605 if (bytes_written) { 606 *bytes_written = bytes; 607 } 608 return SR_SUCCESS; 609} 610 611void MemoryStreamBase::Close() { 612 // nothing to do 613} 614 615bool MemoryStreamBase::SetPosition(size_t position) { 616 if (position > data_length_) 617 return false; 618 seek_position_ = position; 619 return true; 620} 621 622bool MemoryStreamBase::GetPosition(size_t *position) const { 623 if (position) 624 *position = seek_position_; 625 return true; 626} 627 628bool MemoryStreamBase::GetSize(size_t *size) const { 629 if (size) 630 *size = data_length_; 631 return true; 632} 633 634bool MemoryStreamBase::GetAvailable(size_t *size) const { 635 if (size) 636 *size = data_length_ - seek_position_; 637 return true; 638} 639 640bool MemoryStreamBase::ReserveSize(size_t size) { 641 return (SR_SUCCESS == DoReserve(size, NULL)); 642} 643 644StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) { 645 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS; 646} 647 648/////////////////////////////////////////////////////////////////////////////// 649 650MemoryStream::MemoryStream() 651 : buffer_alloc_(NULL) { 652} 653 654MemoryStream::MemoryStream(const char* data) 655 : buffer_alloc_(NULL) { 656 SetData(data, strlen(data)); 657} 658 659MemoryStream::MemoryStream(const void* data, size_t length) 660 : buffer_alloc_(NULL) { 661 SetData(data, length); 662} 663 664MemoryStream::~MemoryStream() { 665 delete [] buffer_alloc_; 666} 667 668void MemoryStream::SetData(const void* data, size_t length) { 669 data_length_ = buffer_length_ = length; 670 delete [] buffer_alloc_; 671 buffer_alloc_ = new char[buffer_length_ + kAlignment]; 672 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment)); 673 memcpy(buffer_, data, data_length_); 674 seek_position_ = 0; 675} 676 677StreamResult MemoryStream::DoReserve(size_t size, int* error) { 678 if (buffer_length_ >= size) 679 return SR_SUCCESS; 680 681 if (char* new_buffer_alloc = new char[size + kAlignment]) { 682 char* new_buffer = reinterpret_cast<char*>( 683 ALIGNP(new_buffer_alloc, kAlignment)); 684 memcpy(new_buffer, buffer_, data_length_); 685 delete [] buffer_alloc_; 686 buffer_alloc_ = new_buffer_alloc; 687 buffer_ = new_buffer; 688 buffer_length_ = size; 689 return SR_SUCCESS; 690 } 691 692 if (error) { 693 *error = ENOMEM; 694 } 695 return SR_ERROR; 696} 697 698/////////////////////////////////////////////////////////////////////////////// 699 700ExternalMemoryStream::ExternalMemoryStream() { 701} 702 703ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) { 704 SetData(data, length); 705} 706 707ExternalMemoryStream::~ExternalMemoryStream() { 708} 709 710void ExternalMemoryStream::SetData(void* data, size_t length) { 711 data_length_ = buffer_length_ = length; 712 buffer_ = static_cast<char*>(data); 713 seek_position_ = 0; 714} 715 716/////////////////////////////////////////////////////////////////////////////// 717// FifoBuffer 718/////////////////////////////////////////////////////////////////////////////// 719 720FifoBuffer::FifoBuffer(size_t size) 721 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 722 data_length_(0), read_position_(0), owner_(Thread::Current()) { 723 // all events are done on the owner_ thread 724} 725 726FifoBuffer::~FifoBuffer() { 727} 728 729bool FifoBuffer::GetBuffered(size_t* size) const { 730 CritScope cs(&crit_); 731 *size = data_length_; 732 return true; 733} 734 735bool FifoBuffer::SetCapacity(size_t size) { 736 CritScope cs(&crit_); 737 if (data_length_ > size) { 738 return false; 739 } 740 741 if (size != buffer_length_) { 742 char* buffer = new char[size]; 743 const size_t copy = data_length_; 744 const size_t tail_copy = _min(copy, buffer_length_ - read_position_); 745 memcpy(buffer, &buffer_[read_position_], tail_copy); 746 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); 747 buffer_.reset(buffer); 748 read_position_ = 0; 749 buffer_length_ = size; 750 } 751 return true; 752} 753 754StreamState FifoBuffer::GetState() const { 755 return state_; 756} 757 758StreamResult FifoBuffer::Read(void* buffer, size_t bytes, 759 size_t* bytes_read, int* error) { 760 CritScope cs(&crit_); 761 const size_t available = data_length_; 762 if (0 == available) { 763 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; 764 } 765 766 const bool was_writable = data_length_ < buffer_length_; 767 const size_t copy = _min(bytes, available); 768 const size_t tail_copy = _min(copy, buffer_length_ - read_position_); 769 char* const p = static_cast<char*>(buffer); 770 memcpy(p, &buffer_[read_position_], tail_copy); 771 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); 772 read_position_ = (read_position_ + copy) % buffer_length_; 773 data_length_ -= copy; 774 if (bytes_read) { 775 *bytes_read = copy; 776 } 777 // if we were full before, and now we're not, post an event 778 if (!was_writable && copy > 0) { 779 PostEvent(owner_, SE_WRITE, 0); 780 } 781 782 return SR_SUCCESS; 783} 784 785StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, 786 size_t* bytes_written, int* error) { 787 CritScope cs(&crit_); 788 if (state_ == SS_CLOSED) { 789 return SR_EOS; 790 } 791 792 const size_t available = buffer_length_ - data_length_; 793 if (0 == available) { 794 return SR_BLOCK; 795 } 796 797 const bool was_readable = (data_length_ > 0); 798 const size_t write_position = (read_position_ + data_length_) 799 % buffer_length_; 800 const size_t copy = _min(bytes, available); 801 const size_t tail_copy = _min(copy, buffer_length_ - write_position); 802 const char* const p = static_cast<const char*>(buffer); 803 memcpy(&buffer_[write_position], p, tail_copy); 804 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); 805 data_length_ += copy; 806 if (bytes_written) { 807 *bytes_written = copy; 808 } 809 // if we didn't have any data to read before, and now we do, post an event 810 if (!was_readable && copy > 0) { 811 PostEvent(owner_, SE_READ, 0); 812 } 813 814 return SR_SUCCESS; 815} 816 817void FifoBuffer::Close() { 818 CritScope cs(&crit_); 819 state_ = SS_CLOSED; 820} 821 822const void* FifoBuffer::GetReadData(size_t* size) { 823 CritScope cs(&crit_); 824 *size = (read_position_ + data_length_ <= buffer_length_) ? 825 data_length_ : buffer_length_ - read_position_; 826 return &buffer_[read_position_]; 827} 828 829void FifoBuffer::ConsumeReadData(size_t size) { 830 CritScope cs(&crit_); 831 ASSERT(size <= data_length_); 832 const bool was_writable = data_length_ < buffer_length_; 833 read_position_ = (read_position_ + size) % buffer_length_; 834 data_length_ -= size; 835 if (!was_writable && size > 0) { 836 PostEvent(owner_, SE_WRITE, 0); 837 } 838} 839 840void* FifoBuffer::GetWriteBuffer(size_t* size) { 841 CritScope cs(&crit_); 842 if (state_ == SS_CLOSED) { 843 return NULL; 844 } 845 846 // if empty, reset the write position to the beginning, so we can get 847 // the biggest possible block 848 if (data_length_ == 0) { 849 read_position_ = 0; 850 } 851 852 const size_t write_position = (read_position_ + data_length_) 853 % buffer_length_; 854 *size = (write_position >= read_position_) ? 855 buffer_length_ - write_position : read_position_ - write_position; 856 return &buffer_[write_position]; 857} 858 859void FifoBuffer::ConsumeWriteBuffer(size_t size) { 860 CritScope cs(&crit_); 861 ASSERT(size <= buffer_length_ - data_length_); 862 const bool was_readable = (data_length_ > 0); 863 data_length_ += size; 864 if (!was_readable && size > 0) { 865 PostEvent(owner_, SE_READ, 0); 866 } 867} 868 869/////////////////////////////////////////////////////////////////////////////// 870// LoggingAdapter 871/////////////////////////////////////////////////////////////////////////////// 872 873LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level, 874 const std::string& label, bool hex_mode) 875: StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) 876{ 877 set_label(label); 878} 879 880void LoggingAdapter::set_label(const std::string& label) { 881 label_.assign("["); 882 label_.append(label); 883 label_.append("]"); 884} 885 886StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len, 887 size_t* read, int* error) { 888 size_t local_read; if (!read) read = &local_read; 889 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read, 890 error); 891 if (result == SR_SUCCESS) { 892 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_); 893 } 894 return result; 895} 896 897StreamResult LoggingAdapter::Write(const void* data, size_t data_len, 898 size_t* written, int* error) { 899 size_t local_written; if (!written) written = &local_written; 900 StreamResult result = StreamAdapterInterface::Write(data, data_len, written, 901 error); 902 if (result == SR_SUCCESS) { 903 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_, 904 &lms_); 905 } 906 return result; 907} 908 909void LoggingAdapter::Close() { 910 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 911 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 912 LOG_V(level_) << label_ << " Closed locally"; 913 StreamAdapterInterface::Close(); 914} 915 916void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) { 917 if (events & SE_OPEN) { 918 LOG_V(level_) << label_ << " Open"; 919 } else if (events & SE_CLOSE) { 920 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 921 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 922 LOG_V(level_) << label_ << " Closed with error: " << err; 923 } 924 StreamAdapterInterface::OnEvent(stream, events, err); 925} 926 927/////////////////////////////////////////////////////////////////////////////// 928// StringStream - Reads/Writes to an external std::string 929/////////////////////////////////////////////////////////////////////////////// 930 931StringStream::StringStream(std::string& str) 932: str_(str), read_pos_(0), read_only_(false) 933{ 934} 935 936StringStream::StringStream(const std::string& str) 937: str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) 938{ 939} 940 941StreamState StringStream::GetState() const { 942 return SS_OPEN; 943} 944 945StreamResult StringStream::Read(void* buffer, size_t buffer_len, 946 size_t* read, int* error) { 947 size_t available = _min(buffer_len, str_.size() - read_pos_); 948 if (!available) 949 return SR_EOS; 950 memcpy(buffer, str_.data() + read_pos_, available); 951 read_pos_ += available; 952 if (read) 953 *read = available; 954 return SR_SUCCESS; 955} 956 957StreamResult StringStream::Write(const void* data, size_t data_len, 958 size_t* written, int* error) { 959 if (read_only_) { 960 if (error) { 961 *error = -1; 962 } 963 return SR_ERROR; 964 } 965 str_.append(static_cast<const char*>(data), 966 static_cast<const char*>(data) + data_len); 967 if (written) 968 *written = data_len; 969 return SR_SUCCESS; 970} 971 972void StringStream::Close() { 973} 974 975bool StringStream::SetPosition(size_t position) { 976 if (position > str_.size()) 977 return false; 978 read_pos_ = position; 979 return true; 980} 981 982bool StringStream::GetPosition(size_t* position) const { 983 if (position) 984 *position = read_pos_; 985 return true; 986} 987 988bool StringStream::GetSize(size_t* size) const { 989 if (size) 990 *size = str_.size(); 991 return true; 992} 993 994bool StringStream::GetAvailable(size_t* size) const { 995 if (size) 996 *size = str_.size() - read_pos_; 997 return true; 998} 999 1000bool StringStream::ReserveSize(size_t size) { 1001 if (read_only_) 1002 return false; 1003 str_.reserve(size); 1004 return true; 1005} 1006 1007/////////////////////////////////////////////////////////////////////////////// 1008// StreamReference 1009/////////////////////////////////////////////////////////////////////////////// 1010 1011StreamReference::StreamReference(StreamInterface* stream) 1012 : StreamAdapterInterface(stream, false) { 1013 // owner set to false so the destructor does not free the stream. 1014 stream_ref_count_ = new StreamRefCount(stream); 1015} 1016 1017StreamInterface* StreamReference::NewReference() { 1018 stream_ref_count_->AddReference(); 1019 return new StreamReference(stream_ref_count_, stream()); 1020} 1021 1022StreamReference::~StreamReference() { 1023 stream_ref_count_->Release(); 1024} 1025 1026StreamReference::StreamReference(StreamRefCount* stream_ref_count, 1027 StreamInterface* stream) 1028 : StreamAdapterInterface(stream, false), 1029 stream_ref_count_(stream_ref_count) { 1030} 1031 1032/////////////////////////////////////////////////////////////////////////////// 1033 1034StreamResult Flow(StreamInterface* source, 1035 char* buffer, size_t buffer_len, 1036 StreamInterface* sink, 1037 size_t* data_len /* = NULL */) { 1038 ASSERT(buffer_len > 0); 1039 1040 StreamResult result; 1041 size_t count, read_pos, write_pos; 1042 if (data_len) { 1043 read_pos = *data_len; 1044 } else { 1045 read_pos = 0; 1046 } 1047 1048 bool end_of_stream = false; 1049 do { 1050 // Read until buffer is full, end of stream, or error 1051 while (!end_of_stream && (read_pos < buffer_len)) { 1052 result = source->Read(buffer + read_pos, buffer_len - read_pos, 1053 &count, NULL); 1054 if (result == SR_EOS) { 1055 end_of_stream = true; 1056 } else if (result != SR_SUCCESS) { 1057 if (data_len) { 1058 *data_len = read_pos; 1059 } 1060 return result; 1061 } else { 1062 read_pos += count; 1063 } 1064 } 1065 1066 // Write until buffer is empty, or error (including end of stream) 1067 write_pos = 0; 1068 while (write_pos < read_pos) { 1069 result = sink->Write(buffer + write_pos, read_pos - write_pos, 1070 &count, NULL); 1071 if (result != SR_SUCCESS) { 1072 if (data_len) { 1073 *data_len = read_pos - write_pos; 1074 if (write_pos > 0) { 1075 memmove(buffer, buffer + write_pos, *data_len); 1076 } 1077 } 1078 return result; 1079 } 1080 write_pos += count; 1081 } 1082 1083 read_pos = 0; 1084 } while (!end_of_stream); 1085 1086 if (data_len) { 1087 *data_len = 0; 1088 } 1089 return SR_SUCCESS; 1090} 1091 1092/////////////////////////////////////////////////////////////////////////////// 1093 1094} // namespace talk_base 1095