/*
 * libjingle
 * Copyright 2004 Google Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *  3. The name of the author may not be used to endorse or promote products
 *     derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */ 27 28#if defined(POSIX) 29#include <sys/file.h> 30#endif // POSIX 31#include <sys/types.h> 32#include <sys/stat.h> 33#include <errno.h> 34#include <string> 35#include "talk/base/basictypes.h" 36#include "talk/base/common.h" 37#include "talk/base/logging.h" 38#include "talk/base/messagequeue.h" 39#include "talk/base/stream.h" 40#include "talk/base/stringencode.h" 41#include "talk/base/stringutils.h" 42#include "talk/base/thread.h" 43#include "talk/base/timeutils.h" 44 45#ifdef WIN32 46#include "talk/base/win32.h" 47#define fileno _fileno 48#endif 49 50namespace talk_base { 51 52/////////////////////////////////////////////////////////////////////////////// 53// StreamInterface 54/////////////////////////////////////////////////////////////////////////////// 55StreamInterface::~StreamInterface() { 56} 57 58StreamResult StreamInterface::WriteAll(const void* data, size_t data_len, 59 size_t* written, int* error) { 60 StreamResult result = SR_SUCCESS; 61 size_t total_written = 0, current_written; 62 while (total_written < data_len) { 63 result = Write(static_cast<const char*>(data) + total_written, 64 data_len - total_written, ¤t_written, error); 65 if (result != SR_SUCCESS) 66 break; 67 total_written += current_written; 68 } 69 if (written) 70 *written = total_written; 71 return result; 72} 73 74StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len, 75 size_t* read, int* error) { 76 StreamResult result = SR_SUCCESS; 77 size_t total_read = 0, current_read; 78 while (total_read < buffer_len) { 79 result = Read(static_cast<char*>(buffer) + total_read, 80 buffer_len - total_read, ¤t_read, error); 81 if (result != SR_SUCCESS) 82 break; 83 total_read += current_read; 84 } 85 if (read) 86 *read = total_read; 87 return result; 88} 89 90StreamResult StreamInterface::ReadLine(std::string* line) { 91 line->clear(); 92 StreamResult result = SR_SUCCESS; 93 while (true) { 94 char ch; 95 result = Read(&ch, sizeof(ch), NULL, NULL); 96 if (result != SR_SUCCESS) { 
97 break; 98 } 99 if (ch == '\n') { 100 break; 101 } 102 line->push_back(ch); 103 } 104 if (!line->empty()) { // give back the line we've collected so far with 105 result = SR_SUCCESS; // a success code. Otherwise return the last code 106 } 107 return result; 108} 109 110void StreamInterface::PostEvent(Thread* t, int events, int err) { 111 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err)); 112} 113 114void StreamInterface::PostEvent(int events, int err) { 115 PostEvent(Thread::Current(), events, err); 116} 117 118StreamInterface::StreamInterface() { 119} 120 121void StreamInterface::OnMessage(Message* msg) { 122 if (MSG_POST_EVENT == msg->message_id) { 123 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata); 124 SignalEvent(this, pe->events, pe->error); 125 delete msg->pdata; 126 } 127} 128 129/////////////////////////////////////////////////////////////////////////////// 130// StreamAdapterInterface 131/////////////////////////////////////////////////////////////////////////////// 132 133StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream, 134 bool owned) 135 : stream_(stream), owned_(owned) { 136 if (NULL != stream_) 137 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 138} 139 140void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) { 141 if (NULL != stream_) 142 stream_->SignalEvent.disconnect(this); 143 if (owned_) 144 delete stream_; 145 stream_ = stream; 146 owned_ = owned; 147 if (NULL != stream_) 148 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 149} 150 151StreamInterface* StreamAdapterInterface::Detach() { 152 if (NULL != stream_) 153 stream_->SignalEvent.disconnect(this); 154 StreamInterface* stream = stream_; 155 stream_ = NULL; 156 return stream; 157} 158 159StreamAdapterInterface::~StreamAdapterInterface() { 160 if (owned_) 161 delete stream_; 162} 163 164/////////////////////////////////////////////////////////////////////////////// 165// 
StreamTap 166/////////////////////////////////////////////////////////////////////////////// 167 168StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap) 169 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS), 170 tap_error_(0) { 171 AttachTap(tap); 172} 173 174void StreamTap::AttachTap(StreamInterface* tap) { 175 tap_.reset(tap); 176} 177 178StreamInterface* StreamTap::DetachTap() { 179 return tap_.release(); 180} 181 182StreamResult StreamTap::GetTapResult(int* error) { 183 if (error) { 184 *error = tap_error_; 185 } 186 return tap_result_; 187} 188 189StreamResult StreamTap::Read(void* buffer, size_t buffer_len, 190 size_t* read, int* error) { 191 size_t backup_read; 192 if (!read) { 193 read = &backup_read; 194 } 195 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len, 196 read, error); 197 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 198 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_); 199 } 200 return res; 201} 202 203StreamResult StreamTap::Write(const void* data, size_t data_len, 204 size_t* written, int* error) { 205 size_t backup_written; 206 if (!written) { 207 written = &backup_written; 208 } 209 StreamResult res = StreamAdapterInterface::Write(data, data_len, 210 written, error); 211 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 212 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_); 213 } 214 return res; 215} 216 217/////////////////////////////////////////////////////////////////////////////// 218// StreamSegment 219/////////////////////////////////////////////////////////////////////////////// 220 221StreamSegment::StreamSegment(StreamInterface* stream) 222 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 223 length_(SIZE_UNKNOWN) { 224 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 
225 stream->GetPosition(&start_); 226} 227 228StreamSegment::StreamSegment(StreamInterface* stream, size_t length) 229 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 230 length_(length) { 231 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 232 stream->GetPosition(&start_); 233} 234 235StreamResult StreamSegment::Read(void* buffer, size_t buffer_len, 236 size_t* read, int* error) { 237 if (SIZE_UNKNOWN != length_) { 238 if (pos_ >= length_) 239 return SR_EOS; 240 buffer_len = _min(buffer_len, length_ - pos_); 241 } 242 size_t backup_read; 243 if (!read) { 244 read = &backup_read; 245 } 246 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, 247 read, error); 248 if (SR_SUCCESS == result) { 249 pos_ += *read; 250 } 251 return result; 252} 253 254bool StreamSegment::SetPosition(size_t position) { 255 if (SIZE_UNKNOWN == start_) 256 return false; // Not seekable 257 if ((SIZE_UNKNOWN != length_) && (position > length_)) 258 return false; // Seek past end of segment 259 if (!StreamAdapterInterface::SetPosition(start_ + position)) 260 return false; 261 pos_ = position; 262 return true; 263} 264 265bool StreamSegment::GetPosition(size_t* position) const { 266 if (SIZE_UNKNOWN == start_) 267 return false; // Not seekable 268 if (!StreamAdapterInterface::GetPosition(position)) 269 return false; 270 if (position) { 271 ASSERT(*position >= start_); 272 *position -= start_; 273 } 274 return true; 275} 276 277bool StreamSegment::GetSize(size_t* size) const { 278 if (!StreamAdapterInterface::GetSize(size)) 279 return false; 280 if (size) { 281 if (SIZE_UNKNOWN != start_) { 282 ASSERT(*size >= start_); 283 *size -= start_; 284 } 285 if (SIZE_UNKNOWN != length_) { 286 *size = _min(*size, length_); 287 } 288 } 289 return true; 290} 291 292bool StreamSegment::GetAvailable(size_t* size) const { 293 if (!StreamAdapterInterface::GetAvailable(size)) 294 return false; 295 if (size && (SIZE_UNKNOWN != length_)) 296 *size = 
_min(*size, length_ - pos_); 297 return true; 298} 299 300/////////////////////////////////////////////////////////////////////////////// 301// NullStream 302/////////////////////////////////////////////////////////////////////////////// 303 304NullStream::NullStream() { 305} 306 307NullStream::~NullStream() { 308} 309 310StreamState NullStream::GetState() const { 311 return SS_OPEN; 312} 313 314StreamResult NullStream::Read(void* buffer, size_t buffer_len, 315 size_t* read, int* error) { 316 if (error) *error = -1; 317 return SR_ERROR; 318} 319 320StreamResult NullStream::Write(const void* data, size_t data_len, 321 size_t* written, int* error) { 322 if (written) *written = data_len; 323 return SR_SUCCESS; 324} 325 326void NullStream::Close() { 327} 328 329/////////////////////////////////////////////////////////////////////////////// 330// FileStream 331/////////////////////////////////////////////////////////////////////////////// 332 333FileStream::FileStream() : file_(NULL) { 334} 335 336FileStream::~FileStream() { 337 FileStream::Close(); 338} 339 340bool FileStream::Open(const std::string& filename, const char* mode, 341 int* error) { 342 Close(); 343#ifdef WIN32 344 std::wstring wfilename; 345 if (Utf8ToWindowsFilename(filename, &wfilename)) { 346 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str()); 347 } else { 348 if (error) { 349 *error = -1; 350 return false; 351 } 352 } 353#else 354 file_ = fopen(filename.c_str(), mode); 355#endif 356 if (!file_ && error) { 357 *error = errno; 358 } 359 return (file_ != NULL); 360} 361 362bool FileStream::OpenShare(const std::string& filename, const char* mode, 363 int shflag, int* error) { 364 Close(); 365#ifdef WIN32 366 std::wstring wfilename; 367 if (Utf8ToWindowsFilename(filename, &wfilename)) { 368 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag); 369 if (!file_ && error) { 370 *error = errno; 371 return false; 372 } 373 return file_ != NULL; 374 } else { 375 if (error) { 376 *error = -1; 
377 } 378 return false; 379 } 380#else 381 return Open(filename, mode, error); 382#endif 383} 384 385bool FileStream::DisableBuffering() { 386 if (!file_) 387 return false; 388 return (setvbuf(file_, NULL, _IONBF, 0) == 0); 389} 390 391StreamState FileStream::GetState() const { 392 return (file_ == NULL) ? SS_CLOSED : SS_OPEN; 393} 394 395StreamResult FileStream::Read(void* buffer, size_t buffer_len, 396 size_t* read, int* error) { 397 if (!file_) 398 return SR_EOS; 399 size_t result = fread(buffer, 1, buffer_len, file_); 400 if ((result == 0) && (buffer_len > 0)) { 401 if (feof(file_)) 402 return SR_EOS; 403 if (error) 404 *error = errno; 405 return SR_ERROR; 406 } 407 if (read) 408 *read = result; 409 return SR_SUCCESS; 410} 411 412StreamResult FileStream::Write(const void* data, size_t data_len, 413 size_t* written, int* error) { 414 if (!file_) 415 return SR_EOS; 416 size_t result = fwrite(data, 1, data_len, file_); 417 if ((result == 0) && (data_len > 0)) { 418 if (error) 419 *error = errno; 420 return SR_ERROR; 421 } 422 if (written) 423 *written = result; 424 return SR_SUCCESS; 425} 426 427void FileStream::Close() { 428 if (file_) { 429 DoClose(); 430 file_ = NULL; 431 } 432} 433 434bool FileStream::SetPosition(size_t position) { 435 if (!file_) 436 return false; 437 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0); 438} 439 440bool FileStream::GetPosition(size_t* position) const { 441 ASSERT(NULL != position); 442 if (!file_) 443 return false; 444 long result = ftell(file_); 445 if (result < 0) 446 return false; 447 if (position) 448 *position = result; 449 return true; 450} 451 452bool FileStream::GetSize(size_t* size) const { 453 ASSERT(NULL != size); 454 if (!file_) 455 return false; 456 struct stat file_stats; 457 if (fstat(fileno(file_), &file_stats) != 0) 458 return false; 459 if (size) 460 *size = file_stats.st_size; 461 return true; 462} 463 464bool FileStream::GetAvailable(size_t* size) const { 465 ASSERT(NULL != size); 466 if 
(!GetSize(size)) 467 return false; 468 long result = ftell(file_); 469 if (result < 0) 470 return false; 471 if (size) 472 *size -= result; 473 return true; 474} 475 476bool FileStream::ReserveSize(size_t size) { 477 // TODO: extend the file to the proper length 478 return true; 479} 480 481bool FileStream::GetSize(const std::string& filename, size_t* size) { 482 struct stat file_stats; 483 if (stat(filename.c_str(), &file_stats) != 0) 484 return false; 485 *size = file_stats.st_size; 486 return true; 487} 488 489bool FileStream::Flush() { 490 if (file_) { 491 return (0 == fflush(file_)); 492 } 493 // try to flush empty file? 494 ASSERT(false); 495 return false; 496} 497 498#if defined(POSIX) 499 500bool FileStream::TryLock() { 501 if (file_ == NULL) { 502 // Stream not open. 503 ASSERT(false); 504 return false; 505 } 506 507 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0; 508} 509 510bool FileStream::Unlock() { 511 if (file_ == NULL) { 512 // Stream not open. 513 ASSERT(false); 514 return false; 515 } 516 517 return flock(fileno(file_), LOCK_UN) == 0; 518} 519 520#endif 521 522void FileStream::DoClose() { 523 fclose(file_); 524} 525 526CircularFileStream::CircularFileStream(size_t max_size) 527 : max_write_size_(max_size), 528 position_(0), 529 marked_position_(max_size / 2), 530 last_write_position_(0), 531 read_segment_(READ_LATEST), 532 read_segment_available_(0) { 533} 534 535bool CircularFileStream::Open( 536 const std::string& filename, const char* mode, int* error) { 537 if (!FileStream::Open(filename.c_str(), mode, error)) 538 return false; 539 540 if (strchr(mode, "r") != NULL) { // Opened in read mode. 541 // Check if the buffer has been overwritten and determine how to read the 542 // log in time sequence. 543 size_t file_size; 544 GetSize(&file_size); 545 if (file_size == position_) { 546 // The buffer has not been overwritten yet. Read 0 .. 
file_size 547 read_segment_ = READ_LATEST; 548 read_segment_available_ = file_size; 549 } else { 550 // The buffer has been over written. There are three segments: The first 551 // one is 0 .. marked_position_, which is the marked earliest log. The 552 // second one is position_ .. file_size, which is the middle log. The 553 // last one is marked_position_ .. position_, which is the latest log. 554 read_segment_ = READ_MARKED; 555 read_segment_available_ = marked_position_; 556 last_write_position_ = position_; 557 } 558 559 // Read from the beginning. 560 position_ = 0; 561 SetPosition(position_); 562 } 563 564 return true; 565} 566 567StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len, 568 size_t* read, int* error) { 569 if (read_segment_available_ == 0) { 570 size_t file_size; 571 switch (read_segment_) { 572 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE. 573 read_segment_ = READ_MIDDLE; 574 position_ = last_write_position_; 575 SetPosition(position_); 576 GetSize(&file_size); 577 read_segment_available_ = file_size - position_; 578 break; 579 580 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST. 581 read_segment_ = READ_LATEST; 582 position_ = marked_position_; 583 SetPosition(position_); 584 read_segment_available_ = last_write_position_ - position_; 585 break; 586 587 default: // Finished READ_LATEST and return EOS. 
588 return talk_base::SR_EOS; 589 } 590 } 591 592 size_t local_read; 593 if (!read) read = &local_read; 594 595 size_t to_read = talk_base::_min(buffer_len, read_segment_available_); 596 talk_base::StreamResult result 597 = talk_base::FileStream::Read(buffer, to_read, read, error); 598 if (result == talk_base::SR_SUCCESS) { 599 read_segment_available_ -= *read; 600 position_ += *read; 601 } 602 return result; 603} 604 605StreamResult CircularFileStream::Write(const void* data, size_t data_len, 606 size_t* written, int* error) { 607 if (position_ >= max_write_size_) { 608 ASSERT(position_ == max_write_size_); 609 position_ = marked_position_; 610 SetPosition(position_); 611 } 612 613 size_t local_written; 614 if (!written) written = &local_written; 615 616 size_t to_eof = max_write_size_ - position_; 617 size_t to_write = talk_base::_min(data_len, to_eof); 618 talk_base::StreamResult result 619 = talk_base::FileStream::Write(data, to_write, written, error); 620 if (result == talk_base::SR_SUCCESS) { 621 position_ += *written; 622 } 623 return result; 624} 625 626AsyncWriteStream::~AsyncWriteStream() { 627 write_thread_->Clear(this, 0, NULL); 628 ClearBufferAndWrite(); 629 630 CritScope cs(&crit_stream_); 631 stream_.reset(); 632} 633 634// This is needed by some stream writers, such as RtpDumpWriter. 635bool AsyncWriteStream::GetPosition(size_t* position) const { 636 CritScope cs(&crit_stream_); 637 return stream_->GetPosition(position); 638} 639 640// This is needed by some stream writers, such as the plugin log writers. 
641StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len, 642 size_t* read, int* error) { 643 CritScope cs(&crit_stream_); 644 return stream_->Read(buffer, buffer_len, read, error); 645} 646 647void AsyncWriteStream::Close() { 648 if (state_ == SS_CLOSED) { 649 return; 650 } 651 652 write_thread_->Clear(this, 0, NULL); 653 ClearBufferAndWrite(); 654 655 CritScope cs(&crit_stream_); 656 stream_->Close(); 657 state_ = SS_CLOSED; 658} 659 660StreamResult AsyncWriteStream::Write(const void* data, size_t data_len, 661 size_t* written, int* error) { 662 if (state_ == SS_CLOSED) { 663 return SR_ERROR; 664 } 665 666 size_t previous_buffer_length = 0; 667 { 668 CritScope cs(&crit_buffer_); 669 previous_buffer_length = buffer_.length(); 670 buffer_.AppendData(data, data_len); 671 } 672 673 if (previous_buffer_length == 0) { 674 // If there's stuff already in the buffer, then we already called 675 // Post and the write_thread_ hasn't pulled it out yet, so we 676 // don't need to re-Post. 677 write_thread_->Post(this, 0, NULL); 678 } 679 // Return immediately, assuming that it works. 680 if (written) { 681 *written = data_len; 682 } 683 return SR_SUCCESS; 684} 685 686void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) { 687 ClearBufferAndWrite(); 688} 689 690bool AsyncWriteStream::Flush() { 691 if (state_ == SS_CLOSED) { 692 return false; 693 } 694 695 ClearBufferAndWrite(); 696 697 CritScope cs(&crit_stream_); 698 return stream_->Flush(); 699} 700 701void AsyncWriteStream::ClearBufferAndWrite() { 702 Buffer to_write; 703 { 704 CritScope cs_buffer(&crit_buffer_); 705 buffer_.TransferTo(&to_write); 706 } 707 708 if (to_write.length() > 0) { 709 CritScope cs(&crit_stream_); 710 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL); 711 } 712} 713 714#ifdef POSIX 715 716// Have to identically rewrite the FileStream destructor or else it would call 717// the base class's Close() instead of the sub-class's. 
718POpenStream::~POpenStream() { 719 POpenStream::Close(); 720} 721 722bool POpenStream::Open(const std::string& subcommand, 723 const char* mode, 724 int* error) { 725 Close(); 726 file_ = popen(subcommand.c_str(), mode); 727 if (file_ == NULL) { 728 if (error) 729 *error = errno; 730 return false; 731 } 732 return true; 733} 734 735bool POpenStream::OpenShare(const std::string& subcommand, const char* mode, 736 int shflag, int* error) { 737 return Open(subcommand, mode, error); 738} 739 740void POpenStream::DoClose() { 741 wait_status_ = pclose(file_); 742} 743 744#endif 745 746/////////////////////////////////////////////////////////////////////////////// 747// MemoryStream 748/////////////////////////////////////////////////////////////////////////////// 749 750MemoryStreamBase::MemoryStreamBase() 751 : buffer_(NULL), buffer_length_(0), data_length_(0), 752 seek_position_(0) { 753} 754 755StreamState MemoryStreamBase::GetState() const { 756 return SS_OPEN; 757} 758 759StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes, 760 size_t* bytes_read, int* error) { 761 if (seek_position_ >= data_length_) { 762 return SR_EOS; 763 } 764 size_t available = data_length_ - seek_position_; 765 if (bytes > available) { 766 // Read partial buffer 767 bytes = available; 768 } 769 memcpy(buffer, &buffer_[seek_position_], bytes); 770 seek_position_ += bytes; 771 if (bytes_read) { 772 *bytes_read = bytes; 773 } 774 return SR_SUCCESS; 775} 776 777StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes, 778 size_t* bytes_written, int* error) { 779 size_t available = buffer_length_ - seek_position_; 780 if (0 == available) { 781 // Increase buffer size to the larger of: 782 // a) new position rounded up to next 256 bytes 783 // b) double the previous length 784 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1, 785 buffer_length_ * 2); 786 StreamResult result = DoReserve(new_buffer_length, error); 787 if (SR_SUCCESS != result) { 788 return 
result; 789 } 790 ASSERT(buffer_length_ >= new_buffer_length); 791 available = buffer_length_ - seek_position_; 792 } 793 794 if (bytes > available) { 795 bytes = available; 796 } 797 memcpy(&buffer_[seek_position_], buffer, bytes); 798 seek_position_ += bytes; 799 if (data_length_ < seek_position_) { 800 data_length_ = seek_position_; 801 } 802 if (bytes_written) { 803 *bytes_written = bytes; 804 } 805 return SR_SUCCESS; 806} 807 808void MemoryStreamBase::Close() { 809 // nothing to do 810} 811 812bool MemoryStreamBase::SetPosition(size_t position) { 813 if (position > data_length_) 814 return false; 815 seek_position_ = position; 816 return true; 817} 818 819bool MemoryStreamBase::GetPosition(size_t* position) const { 820 if (position) 821 *position = seek_position_; 822 return true; 823} 824 825bool MemoryStreamBase::GetSize(size_t* size) const { 826 if (size) 827 *size = data_length_; 828 return true; 829} 830 831bool MemoryStreamBase::GetAvailable(size_t* size) const { 832 if (size) 833 *size = data_length_ - seek_position_; 834 return true; 835} 836 837bool MemoryStreamBase::ReserveSize(size_t size) { 838 return (SR_SUCCESS == DoReserve(size, NULL)); 839} 840 841StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) { 842 return (buffer_length_ >= size) ? 
SR_SUCCESS : SR_EOS; 843} 844 845/////////////////////////////////////////////////////////////////////////////// 846 847MemoryStream::MemoryStream() 848 : buffer_alloc_(NULL) { 849} 850 851MemoryStream::MemoryStream(const char* data) 852 : buffer_alloc_(NULL) { 853 SetData(data, strlen(data)); 854} 855 856MemoryStream::MemoryStream(const void* data, size_t length) 857 : buffer_alloc_(NULL) { 858 SetData(data, length); 859} 860 861MemoryStream::~MemoryStream() { 862 delete [] buffer_alloc_; 863} 864 865void MemoryStream::SetData(const void* data, size_t length) { 866 data_length_ = buffer_length_ = length; 867 delete [] buffer_alloc_; 868 buffer_alloc_ = new char[buffer_length_ + kAlignment]; 869 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment)); 870 memcpy(buffer_, data, data_length_); 871 seek_position_ = 0; 872} 873 874StreamResult MemoryStream::DoReserve(size_t size, int* error) { 875 if (buffer_length_ >= size) 876 return SR_SUCCESS; 877 878 if (char* new_buffer_alloc = new char[size + kAlignment]) { 879 char* new_buffer = reinterpret_cast<char*>( 880 ALIGNP(new_buffer_alloc, kAlignment)); 881 memcpy(new_buffer, buffer_, data_length_); 882 delete [] buffer_alloc_; 883 buffer_alloc_ = new_buffer_alloc; 884 buffer_ = new_buffer; 885 buffer_length_ = size; 886 return SR_SUCCESS; 887 } 888 889 if (error) { 890 *error = ENOMEM; 891 } 892 return SR_ERROR; 893} 894 895/////////////////////////////////////////////////////////////////////////////// 896 897ExternalMemoryStream::ExternalMemoryStream() { 898} 899 900ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) { 901 SetData(data, length); 902} 903 904ExternalMemoryStream::~ExternalMemoryStream() { 905} 906 907void ExternalMemoryStream::SetData(void* data, size_t length) { 908 data_length_ = buffer_length_ = length; 909 buffer_ = static_cast<char*>(data); 910 seek_position_ = 0; 911} 912 913/////////////////////////////////////////////////////////////////////////////// 914// 
FifoBuffer 915/////////////////////////////////////////////////////////////////////////////// 916 917FifoBuffer::FifoBuffer(size_t size) 918 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 919 data_length_(0), read_position_(0), owner_(Thread::Current()) { 920 // all events are done on the owner_ thread 921} 922 923FifoBuffer::FifoBuffer(size_t size, Thread* owner) 924 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 925 data_length_(0), read_position_(0), owner_(owner) { 926 // all events are done on the owner_ thread 927} 928 929FifoBuffer::~FifoBuffer() { 930} 931 932bool FifoBuffer::GetBuffered(size_t* size) const { 933 CritScope cs(&crit_); 934 *size = data_length_; 935 return true; 936} 937 938bool FifoBuffer::SetCapacity(size_t size) { 939 CritScope cs(&crit_); 940 if (data_length_ > size) { 941 return false; 942 } 943 944 if (size != buffer_length_) { 945 char* buffer = new char[size]; 946 const size_t copy = data_length_; 947 const size_t tail_copy = _min(copy, buffer_length_ - read_position_); 948 memcpy(buffer, &buffer_[read_position_], tail_copy); 949 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); 950 buffer_.reset(buffer); 951 read_position_ = 0; 952 buffer_length_ = size; 953 } 954 return true; 955} 956 957StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes, 958 size_t offset, size_t* bytes_read) { 959 CritScope cs(&crit_); 960 return ReadOffsetLocked(buffer, bytes, offset, bytes_read); 961} 962 963StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes, 964 size_t offset, size_t* bytes_written) { 965 CritScope cs(&crit_); 966 return WriteOffsetLocked(buffer, bytes, offset, bytes_written); 967} 968 969StreamState FifoBuffer::GetState() const { 970 return state_; 971} 972 973StreamResult FifoBuffer::Read(void* buffer, size_t bytes, 974 size_t* bytes_read, int* error) { 975 CritScope cs(&crit_); 976 const bool was_writable = data_length_ < buffer_length_; 977 size_t copy = 0; 978 
StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); 979 980 if (result == SR_SUCCESS) { 981 // If read was successful then adjust the read position and number of 982 // bytes buffered. 983 read_position_ = (read_position_ + copy) % buffer_length_; 984 data_length_ -= copy; 985 if (bytes_read) { 986 *bytes_read = copy; 987 } 988 989 // if we were full before, and now we're not, post an event 990 if (!was_writable && copy > 0) { 991 PostEvent(owner_, SE_WRITE, 0); 992 } 993 } 994 return result; 995} 996 997StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, 998 size_t* bytes_written, int* error) { 999 CritScope cs(&crit_); 1000 1001 const bool was_readable = (data_length_ > 0); 1002 size_t copy = 0; 1003 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©); 1004 1005 if (result == SR_SUCCESS) { 1006 // If write was successful then adjust the number of readable bytes. 1007 data_length_ += copy; 1008 if (bytes_written) { 1009 *bytes_written = copy; 1010 } 1011 1012 // if we didn't have any data to read before, and now we do, post an event 1013 if (!was_readable && copy > 0) { 1014 PostEvent(owner_, SE_READ, 0); 1015 } 1016 } 1017 return result; 1018} 1019 1020void FifoBuffer::Close() { 1021 CritScope cs(&crit_); 1022 state_ = SS_CLOSED; 1023} 1024 1025const void* FifoBuffer::GetReadData(size_t* size) { 1026 CritScope cs(&crit_); 1027 *size = (read_position_ + data_length_ <= buffer_length_) ? 
1028 data_length_ : buffer_length_ - read_position_; 1029 return &buffer_[read_position_]; 1030} 1031 1032void FifoBuffer::ConsumeReadData(size_t size) { 1033 CritScope cs(&crit_); 1034 ASSERT(size <= data_length_); 1035 const bool was_writable = data_length_ < buffer_length_; 1036 read_position_ = (read_position_ + size) % buffer_length_; 1037 data_length_ -= size; 1038 if (!was_writable && size > 0) { 1039 PostEvent(owner_, SE_WRITE, 0); 1040 } 1041} 1042 1043void* FifoBuffer::GetWriteBuffer(size_t* size) { 1044 CritScope cs(&crit_); 1045 if (state_ == SS_CLOSED) { 1046 return NULL; 1047 } 1048 1049 // if empty, reset the write position to the beginning, so we can get 1050 // the biggest possible block 1051 if (data_length_ == 0) { 1052 read_position_ = 0; 1053 } 1054 1055 const size_t write_position = (read_position_ + data_length_) 1056 % buffer_length_; 1057 *size = (write_position > read_position_ || data_length_ == 0) ? 1058 buffer_length_ - write_position : read_position_ - write_position; 1059 return &buffer_[write_position]; 1060} 1061 1062void FifoBuffer::ConsumeWriteBuffer(size_t size) { 1063 CritScope cs(&crit_); 1064 ASSERT(size <= buffer_length_ - data_length_); 1065 const bool was_readable = (data_length_ > 0); 1066 data_length_ += size; 1067 if (!was_readable && size > 0) { 1068 PostEvent(owner_, SE_READ, 0); 1069 } 1070} 1071 1072bool FifoBuffer::GetWriteRemaining(size_t* size) const { 1073 CritScope cs(&crit_); 1074 *size = buffer_length_ - data_length_; 1075 return true; 1076} 1077 1078StreamResult FifoBuffer::ReadOffsetLocked(void* buffer, 1079 size_t bytes, 1080 size_t offset, 1081 size_t* bytes_read) { 1082 if (offset >= data_length_) { 1083 return (state_ != SS_CLOSED) ? 
SR_BLOCK : SR_EOS; 1084 } 1085 1086 const size_t available = data_length_ - offset; 1087 const size_t read_position = (read_position_ + offset) % buffer_length_; 1088 const size_t copy = _min(bytes, available); 1089 const size_t tail_copy = _min(copy, buffer_length_ - read_position); 1090 char* const p = static_cast<char*>(buffer); 1091 memcpy(p, &buffer_[read_position], tail_copy); 1092 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); 1093 1094 if (bytes_read) { 1095 *bytes_read = copy; 1096 } 1097 return SR_SUCCESS; 1098} 1099 1100StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer, 1101 size_t bytes, 1102 size_t offset, 1103 size_t* bytes_written) { 1104 if (state_ == SS_CLOSED) { 1105 return SR_EOS; 1106 } 1107 1108 if (data_length_ + offset >= buffer_length_) { 1109 return SR_BLOCK; 1110 } 1111 1112 const size_t available = buffer_length_ - data_length_ - offset; 1113 const size_t write_position = (read_position_ + data_length_ + offset) 1114 % buffer_length_; 1115 const size_t copy = _min(bytes, available); 1116 const size_t tail_copy = _min(copy, buffer_length_ - write_position); 1117 const char* const p = static_cast<const char*>(buffer); 1118 memcpy(&buffer_[write_position], p, tail_copy); 1119 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); 1120 1121 if (bytes_written) { 1122 *bytes_written = copy; 1123 } 1124 return SR_SUCCESS; 1125} 1126 1127 1128 1129/////////////////////////////////////////////////////////////////////////////// 1130// LoggingAdapter 1131/////////////////////////////////////////////////////////////////////////////// 1132 1133LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level, 1134 const std::string& label, bool hex_mode) 1135 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) { 1136 set_label(label); 1137} 1138 1139void LoggingAdapter::set_label(const std::string& label) { 1140 label_.assign("["); 1141 label_.append(label); 1142 label_.append("]"); 1143} 1144 
1145StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len, 1146 size_t* read, int* error) { 1147 size_t local_read; if (!read) read = &local_read; 1148 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read, 1149 error); 1150 if (result == SR_SUCCESS) { 1151 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_); 1152 } 1153 return result; 1154} 1155 1156StreamResult LoggingAdapter::Write(const void* data, size_t data_len, 1157 size_t* written, int* error) { 1158 size_t local_written; 1159 if (!written) written = &local_written; 1160 StreamResult result = StreamAdapterInterface::Write(data, data_len, written, 1161 error); 1162 if (result == SR_SUCCESS) { 1163 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_, 1164 &lms_); 1165 } 1166 return result; 1167} 1168 1169void LoggingAdapter::Close() { 1170 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 1171 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 1172 LOG_V(level_) << label_ << " Closed locally"; 1173 StreamAdapterInterface::Close(); 1174} 1175 1176void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) { 1177 if (events & SE_OPEN) { 1178 LOG_V(level_) << label_ << " Open"; 1179 } else if (events & SE_CLOSE) { 1180 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 1181 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 1182 LOG_V(level_) << label_ << " Closed with error: " << err; 1183 } 1184 StreamAdapterInterface::OnEvent(stream, events, err); 1185} 1186 1187/////////////////////////////////////////////////////////////////////////////// 1188// StringStream - Reads/Writes to an external std::string 1189/////////////////////////////////////////////////////////////////////////////// 1190 1191StringStream::StringStream(std::string& str) 1192 : str_(str), read_pos_(0), read_only_(false) { 1193} 1194 1195StringStream::StringStream(const 
std::string& str) 1196 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) { 1197} 1198 1199StreamState StringStream::GetState() const { 1200 return SS_OPEN; 1201} 1202 1203StreamResult StringStream::Read(void* buffer, size_t buffer_len, 1204 size_t* read, int* error) { 1205 size_t available = _min(buffer_len, str_.size() - read_pos_); 1206 if (!available) 1207 return SR_EOS; 1208 memcpy(buffer, str_.data() + read_pos_, available); 1209 read_pos_ += available; 1210 if (read) 1211 *read = available; 1212 return SR_SUCCESS; 1213} 1214 1215StreamResult StringStream::Write(const void* data, size_t data_len, 1216 size_t* written, int* error) { 1217 if (read_only_) { 1218 if (error) { 1219 *error = -1; 1220 } 1221 return SR_ERROR; 1222 } 1223 str_.append(static_cast<const char*>(data), 1224 static_cast<const char*>(data) + data_len); 1225 if (written) 1226 *written = data_len; 1227 return SR_SUCCESS; 1228} 1229 1230void StringStream::Close() { 1231} 1232 1233bool StringStream::SetPosition(size_t position) { 1234 if (position > str_.size()) 1235 return false; 1236 read_pos_ = position; 1237 return true; 1238} 1239 1240bool StringStream::GetPosition(size_t* position) const { 1241 if (position) 1242 *position = read_pos_; 1243 return true; 1244} 1245 1246bool StringStream::GetSize(size_t* size) const { 1247 if (size) 1248 *size = str_.size(); 1249 return true; 1250} 1251 1252bool StringStream::GetAvailable(size_t* size) const { 1253 if (size) 1254 *size = str_.size() - read_pos_; 1255 return true; 1256} 1257 1258bool StringStream::ReserveSize(size_t size) { 1259 if (read_only_) 1260 return false; 1261 str_.reserve(size); 1262 return true; 1263} 1264 1265/////////////////////////////////////////////////////////////////////////////// 1266// StreamReference 1267/////////////////////////////////////////////////////////////////////////////// 1268 1269StreamReference::StreamReference(StreamInterface* stream) 1270 : StreamAdapterInterface(stream, false) { 1271 // 
owner set to false so the destructor does not free the stream. 1272 stream_ref_count_ = new StreamRefCount(stream); 1273} 1274 1275StreamInterface* StreamReference::NewReference() { 1276 stream_ref_count_->AddReference(); 1277 return new StreamReference(stream_ref_count_, stream()); 1278} 1279 1280StreamReference::~StreamReference() { 1281 stream_ref_count_->Release(); 1282} 1283 1284StreamReference::StreamReference(StreamRefCount* stream_ref_count, 1285 StreamInterface* stream) 1286 : StreamAdapterInterface(stream, false), 1287 stream_ref_count_(stream_ref_count) { 1288} 1289 1290/////////////////////////////////////////////////////////////////////////////// 1291 1292StreamResult Flow(StreamInterface* source, 1293 char* buffer, size_t buffer_len, 1294 StreamInterface* sink, 1295 size_t* data_len /* = NULL */) { 1296 ASSERT(buffer_len > 0); 1297 1298 StreamResult result; 1299 size_t count, read_pos, write_pos; 1300 if (data_len) { 1301 read_pos = *data_len; 1302 } else { 1303 read_pos = 0; 1304 } 1305 1306 bool end_of_stream = false; 1307 do { 1308 // Read until buffer is full, end of stream, or error 1309 while (!end_of_stream && (read_pos < buffer_len)) { 1310 result = source->Read(buffer + read_pos, buffer_len - read_pos, 1311 &count, NULL); 1312 if (result == SR_EOS) { 1313 end_of_stream = true; 1314 } else if (result != SR_SUCCESS) { 1315 if (data_len) { 1316 *data_len = read_pos; 1317 } 1318 return result; 1319 } else { 1320 read_pos += count; 1321 } 1322 } 1323 1324 // Write until buffer is empty, or error (including end of stream) 1325 write_pos = 0; 1326 while (write_pos < read_pos) { 1327 result = sink->Write(buffer + write_pos, read_pos - write_pos, 1328 &count, NULL); 1329 if (result != SR_SUCCESS) { 1330 if (data_len) { 1331 *data_len = read_pos - write_pos; 1332 if (write_pos > 0) { 1333 memmove(buffer, buffer + write_pos, *data_len); 1334 } 1335 } 1336 return result; 1337 } 1338 write_pos += count; 1339 } 1340 1341 read_pos = 0; 1342 } while 
(!end_of_stream); 1343 1344 if (data_len) { 1345 *data_len = 0; 1346 } 1347 return SR_SUCCESS; 1348} 1349 1350/////////////////////////////////////////////////////////////////////////////// 1351 1352} // namespace talk_base 1353