stream.h revision f74420b3285b9fe04a7e00aa3b8c0ab07ea344bc
1/* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
26 */ 27 28#ifndef TALK_BASE_STREAM_H__ 29#define TALK_BASE_STREAM_H__ 30 31#include "talk/base/basictypes.h" 32#include "talk/base/criticalsection.h" 33#include "talk/base/logging.h" 34#include "talk/base/messagehandler.h" 35#include "talk/base/scoped_ptr.h" 36#include "talk/base/sigslot.h" 37 38namespace talk_base { 39 40/////////////////////////////////////////////////////////////////////////////// 41// StreamInterface is a generic asynchronous stream interface, supporting read, 42// write, and close operations, and asynchronous signalling of state changes. 43// The interface is designed with file, memory, and socket implementations in 44// mind. Some implementations offer extended operations, such as seeking. 45/////////////////////////////////////////////////////////////////////////////// 46 47// The following enumerations are declared outside of the StreamInterface 48// class for brevity in use. 49 50// The SS_OPENING state indicates that the stream will signal open or closed 51// in the future. 52enum StreamState { SS_CLOSED, SS_OPENING, SS_OPEN }; 53 54// Stream read/write methods return this value to indicate various success 55// and failure conditions described below. 56enum StreamResult { SR_ERROR, SR_SUCCESS, SR_BLOCK, SR_EOS }; 57 58// StreamEvents are used to asynchronously signal state transitionss. The flags 59// may be combined. 60// SE_OPEN: The stream has transitioned to the SS_OPEN state 61// SE_CLOSE: The stream has transitioned to the SS_CLOSED state 62// SE_READ: Data is available, so Read is likely to not return SR_BLOCK 63// SE_WRITE: Data can be written, so Write is likely to not return SR_BLOCK 64enum StreamEvent { SE_OPEN = 1, SE_READ = 2, SE_WRITE = 4, SE_CLOSE = 8 }; 65 66class Thread; 67 68class StreamInterface : public MessageHandler { 69 public: 70 virtual ~StreamInterface() { } 71 72 virtual StreamState GetState() const = 0; 73 74 // Read attempts to fill buffer of size buffer_len. 
Write attempts to send 75 // data_len bytes stored in data. The variables read and write are set only 76 // on SR_SUCCESS (see below). Likewise, error is only set on SR_ERROR. 77 // Read and Write return a value indicating: 78 // SR_ERROR: an error occurred, which is returned in a non-null error 79 // argument. Interpretation of the error requires knowledge of the 80 // stream's concrete type, which limits its usefulness. 81 // SR_SUCCESS: some number of bytes were successfully written, which is 82 // returned in a non-null read/write argument. 83 // SR_BLOCK: the stream is in non-blocking mode, and the operation would 84 // block, or the stream is in SS_OPENING state. 85 // SR_EOS: the end-of-stream has been reached, or the stream is in the 86 // SS_CLOSED state. 87 virtual StreamResult Read(void* buffer, size_t buffer_len, 88 size_t* read, int* error) = 0; 89 virtual StreamResult Write(const void* data, size_t data_len, 90 size_t* written, int* error) = 0; 91 // Attempt to transition to the SS_CLOSED state. SE_CLOSE will not be 92 // signalled as a result of this call. 93 virtual void Close() = 0; 94 95 // Streams may signal one or more StreamEvents to indicate state changes. 96 // The first argument identifies the stream on which the state change occured. 97 // The second argument is a bit-wise combination of StreamEvents. 98 // If SE_CLOSE is signalled, then the third argument is the associated error 99 // code. Otherwise, the value is undefined. 100 // Note: Not all streams will support asynchronous event signalling. However, 101 // SS_OPENING and SR_BLOCK returned from stream member functions imply that 102 // certain events will be raised in the future. 103 sigslot::signal3<StreamInterface*, int, int> SignalEvent; 104 105 // Like calling SignalEvent, but posts a message to the specified thread, 106 // which will call SignalEvent. This helps unroll the stack and prevent 107 // re-entrancy. 
108 void PostEvent(Thread* t, int events, int err); 109 // Like the aforementioned method, but posts to the current thread. 110 void PostEvent(int events, int err); 111 112 // 113 // OPTIONAL OPERATIONS 114 // 115 // Not all implementations will support the following operations. In general, 116 // a stream will only support an operation if it reasonably efficient to do 117 // so. For example, while a socket could buffer incoming data to support 118 // seeking, it will not do so. Instead, a buffering stream adapter should 119 // be used. 120 // 121 // Even though several of these operations are related, you should 122 // always use whichever operation is most relevant. For example, you may 123 // be tempted to use GetSize() and GetPosition() to deduce the result of 124 // GetAvailable(). However, a stream which is read-once may support the 125 // latter operation but not the former. 126 // 127 128 // The following four methods are used to avoid coping data multiple times. 129 130 // GetReadData returns a pointer to a buffer which is owned by the stream. 131 // The buffer contains data_len bytes. NULL is returned if no data is 132 // available, or if the method fails. If the caller processes the data, it 133 // must call ConsumeReadData with the number of processed bytes. GetReadData 134 // does not require a matching call to ConsumeReadData if the data is not 135 // processed. Read and ConsumeReadData invalidate the buffer returned by 136 // GetReadData. 137 virtual const void* GetReadData(size_t* data_len) { return NULL; } 138 virtual void ConsumeReadData(size_t used) {} 139 140 // GetWriteBuffer returns a pointer to a buffer which is owned by the stream. 141 // The buffer has a capacity of buf_len bytes. NULL is returned if there is 142 // no buffer available, or if the method fails. The call may write data to 143 // the buffer, and then call ConsumeWriteBuffer with the number of bytes 144 // written. 
GetWriteBuffer does not require a matching call to 145 // ConsumeWriteData if no data is written. Write, ForceWrite, and 146 // ConsumeWriteData invalidate the buffer returned by GetWriteBuffer. 147 // TODO: Allow the caller to specify a minimum buffer size. If the specified 148 // amount of buffer is not yet available, return NULL and Signal SE_WRITE 149 // when it is available. If the requested amount is too large, return an 150 // error. 151 virtual void* GetWriteBuffer(size_t* buf_len) { return NULL; } 152 virtual void ConsumeWriteBuffer(size_t used) {} 153 154 // Write data_len bytes found in data, circumventing any throttling which 155 // would could cause SR_BLOCK to be returned. Returns true if all the data 156 // was written. Otherwise, the method is unsupported, or an unrecoverable 157 // error occurred, and the error value is set. This method should be used 158 // sparingly to write critical data which should not be throttled. A stream 159 // which cannot circumvent its blocking constraints should not implement this 160 // method. 161 // NOTE: This interface is being considered experimentally at the moment. It 162 // would be used by JUDP and BandwidthStream as a way to circumvent certain 163 // soft limits in writing. 164 //virtual bool ForceWrite(const void* data, size_t data_len, int* error) { 165 // if (error) *error = -1; 166 // return false; 167 //} 168 169 // Seek to a byte offset from the beginning of the stream. Returns false if 170 // the stream does not support seeking, or cannot seek to the specified 171 // position. 172 virtual bool SetPosition(size_t position) { return false; } 173 174 // Get the byte offset of the current position from the start of the stream. 175 // Returns false if the position is not known. 176 virtual bool GetPosition(size_t* position) const { return false; } 177 178 // Get the byte length of the entire stream. Returns false if the length 179 // is not known. 
180 virtual bool GetSize(size_t* size) const { return false; } 181 182 // Return the number of Read()-able bytes remaining before end-of-stream. 183 // Returns false if not known. 184 virtual bool GetAvailable(size_t* size) const { return false; } 185 186 // Return the number of Write()-able bytes remaining before end-of-stream. 187 // Returns false if not known. 188 virtual bool GetWriteRemaining(size_t* size) const { return false; } 189 190 // Communicates the amount of data which will be written to the stream. The 191 // stream may choose to preallocate memory to accomodate this data. The 192 // stream may return false to indicate that there is not enough room (ie, 193 // Write will return SR_EOS/SR_ERROR at some point). Note that calling this 194 // function should not affect the existing state of data in the stream. 195 virtual bool ReserveSize(size_t size) { return true; } 196 197 // 198 // CONVENIENCE METHODS 199 // 200 // These methods are implemented in terms of other methods, for convenience. 201 // 202 203 // Seek to the start of the stream. 204 inline bool Rewind() { return SetPosition(0); } 205 206 // WriteAll is a helper function which repeatedly calls Write until all the 207 // data is written, or something other than SR_SUCCESS is returned. Note that 208 // unlike Write, the argument 'written' is always set, and may be non-zero 209 // on results other than SR_SUCCESS. The remaining arguments have the 210 // same semantics as Write. 211 StreamResult WriteAll(const void* data, size_t data_len, 212 size_t* written, int* error); 213 214 // Similar to ReadAll. Calls Read until buffer_len bytes have been read, or 215 // until a non-SR_SUCCESS result is returned. 'read' is always set. 216 StreamResult ReadAll(void* buffer, size_t buffer_len, 217 size_t* read, int* error); 218 219 // ReadLine is a helper function which repeatedly calls Read until it hits 220 // the end-of-line character, or something other than SR_SUCCESS. 
221 // TODO: this is too inefficient to keep here. Break this out into a buffered 222 // readline object or adapter 223 StreamResult ReadLine(std::string *line); 224 225 protected: 226 StreamInterface() { } 227 228 // MessageHandler Interface 229 virtual void OnMessage(Message* msg); 230 231 private: 232 DISALLOW_EVIL_CONSTRUCTORS(StreamInterface); 233}; 234 235/////////////////////////////////////////////////////////////////////////////// 236// StreamAdapterInterface is a convenient base-class for adapting a stream. 237// By default, all operations are pass-through. Override the methods that you 238// require adaptation. Streams should really be upgraded to reference-counted. 239// In the meantime, use the owned flag to indicate whether the adapter should 240// own the adapted stream. 241/////////////////////////////////////////////////////////////////////////////// 242 243class StreamAdapterInterface : public StreamInterface, 244 public sigslot::has_slots<> { 245 public: 246 explicit StreamAdapterInterface(StreamInterface* stream, bool owned = true) 247 : stream_(stream), owned_(owned) { 248 if (NULL != stream_) 249 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 250 } 251 252 // Core Stream Interface 253 virtual StreamState GetState() const { 254 return stream_->GetState(); 255 } 256 virtual StreamResult Read(void* buffer, size_t buffer_len, 257 size_t* read, int* error) { 258 return stream_->Read(buffer, buffer_len, read, error); 259 } 260 virtual StreamResult Write(const void* data, size_t data_len, 261 size_t* written, int* error) { 262 return stream_->Write(data, data_len, written, error); 263 } 264 virtual void Close() { 265 stream_->Close(); 266 } 267 268 // Optional Stream Interface 269 /* Note: Many stream adapters were implemented prior to this Read/Write 270 interface. Therefore, a simple pass through of data in those cases may 271 be broken. 
At a later time, we should do a once-over pass of all 272 adapters, and make them compliant with these interfaces, after which this 273 code can be uncommented. 274 virtual const void* GetReadData(size_t* data_len) { 275 return stream_->GetReadData(data_len); 276 } 277 virtual void ConsumeReadData(size_t used) { 278 stream_->ConsumeReadData(used); 279 } 280 281 virtual void* GetWriteBuffer(size_t* buf_len) { 282 return stream_->GetWriteBuffer(buf_len); 283 } 284 virtual void ConsumeWriteBuffer(size_t used) { 285 stream_->ConsumeWriteBuffer(used); 286 } 287 */ 288 289 /* Note: This interface is currently undergoing evaluation. 290 virtual bool ForceWrite(const void* data, size_t data_len, int* error) { 291 return stream_->ForceWrite(data, data_len, error); 292 } 293 */ 294 295 virtual bool SetPosition(size_t position) { 296 return stream_->SetPosition(position); 297 } 298 virtual bool GetPosition(size_t* position) const { 299 return stream_->GetPosition(position); 300 } 301 virtual bool GetSize(size_t* size) const { 302 return stream_->GetSize(size); 303 } 304 virtual bool GetAvailable(size_t* size) const { 305 return stream_->GetAvailable(size); 306 } 307 virtual bool GetWriteRemaining(size_t* size) const { 308 return stream_->GetWriteRemaining(size); 309 } 310 virtual bool ReserveSize(size_t size) { 311 return stream_->ReserveSize(size); 312 } 313 314 void Attach(StreamInterface* stream, bool owned = true) { 315 if (NULL != stream_) 316 stream_->SignalEvent.disconnect(this); 317 if (owned_) 318 delete stream_; 319 stream_ = stream; 320 owned_ = owned; 321 if (NULL != stream_) 322 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 323 } 324 StreamInterface* Detach() { 325 if (NULL != stream_) 326 stream_->SignalEvent.disconnect(this); 327 StreamInterface* stream = stream_; 328 stream_ = NULL; 329 return stream; 330 } 331 332 protected: 333 virtual ~StreamAdapterInterface() { 334 if (owned_) 335 delete stream_; 336 } 337 // Note that the adapter 
presents itself as the origin of the stream events, 338 // since users of the adapter may not recognize the adapted object. 339 virtual void OnEvent(StreamInterface* stream, int events, int err) { 340 SignalEvent(this, events, err); 341 } 342 StreamInterface* stream() { return stream_; } 343 344 private: 345 StreamInterface* stream_; 346 bool owned_; 347 DISALLOW_EVIL_CONSTRUCTORS(StreamAdapterInterface); 348}; 349 350/////////////////////////////////////////////////////////////////////////////// 351// StreamTap is a non-modifying, pass-through adapter, which copies all data 352// in either direction to the tap. Note that errors or blocking on writing to 353// the tap will prevent further tap writes from occurring. 354/////////////////////////////////////////////////////////////////////////////// 355 356class StreamTap : public StreamAdapterInterface { 357 public: 358 explicit StreamTap(StreamInterface* stream, StreamInterface* tap); 359 360 void AttachTap(StreamInterface* tap); 361 StreamInterface* DetachTap(); 362 StreamResult GetTapResult(int* error); 363 364 // StreamAdapterInterface Interface 365 virtual StreamResult Read(void* buffer, size_t buffer_len, 366 size_t* read, int* error); 367 virtual StreamResult Write(const void* data, size_t data_len, 368 size_t* written, int* error); 369 370 private: 371 scoped_ptr<StreamInterface> tap_; 372 StreamResult tap_result_; 373 int tap_error_; 374 DISALLOW_EVIL_CONSTRUCTORS(StreamTap); 375}; 376 377/////////////////////////////////////////////////////////////////////////////// 378// StreamSegment adapts a read stream, to expose a subset of the adapted 379// stream's data. This is useful for cases where a stream contains multiple 380// documents concatenated together. StreamSegment can expose a subset of 381// the data as an independent stream, including support for rewinding and 382// seeking. 
///////////////////////////////////////////////////////////////////////////////

class StreamSegment : public StreamAdapterInterface {
 public:
  // The current position of the adapted stream becomes the beginning of the
  // segment. If a length is specified, it bounds the length of the segment.
  explicit StreamSegment(StreamInterface* stream);
  explicit StreamSegment(StreamInterface* stream, size_t length);

  // StreamAdapterInterface Interface
  // Only the read-side and positioning operations are overridden here; the
  // remaining operations keep the base adapter's behaviour.
  virtual StreamResult Read(void* buffer, size_t buffer_len,
                            size_t* read, int* error);
  virtual bool SetPosition(size_t position);
  virtual bool GetPosition(size_t* position) const;
  virtual bool GetSize(size_t* size) const;
  virtual bool GetAvailable(size_t* size) const;

 private:
  // NOTE(review): presumably start_ is the segment's offset in the adapted
  // stream, pos_ the current offset within the segment, and length_ the
  // segment bound -- confirm against the implementation file.
  size_t start_, pos_, length_;
  DISALLOW_EVIL_CONSTRUCTORS(StreamSegment);
};

///////////////////////////////////////////////////////////////////////////////
// NullStream gives errors on read, and silently discards all written data.
///////////////////////////////////////////////////////////////////////////////

class NullStream : public StreamInterface {
 public:
  NullStream();
  virtual ~NullStream();

  // StreamInterface Interface
  virtual StreamState GetState() const;
  virtual StreamResult Read(void* buffer, size_t buffer_len,
                            size_t* read, int* error);
  virtual StreamResult Write(const void* data, size_t data_len,
                             size_t* written, int* error);
  virtual void Close();
};

///////////////////////////////////////////////////////////////////////////////
// FileStream is a simple implementation of a StreamInterface, which does not
// support asynchronous notification.
426/////////////////////////////////////////////////////////////////////////////// 427 428class FileStream : public StreamInterface { 429 public: 430 FileStream(); 431 virtual ~FileStream(); 432 433 // The semantics of filename and mode are the same as stdio's fopen 434 virtual bool Open(const std::string& filename, const char* mode); 435 virtual bool OpenShare(const std::string& filename, const char* mode, 436 int shflag); 437 438 // By default, reads and writes are buffered for efficiency. Disabling 439 // buffering causes writes to block until the bytes on disk are updated. 440 virtual bool DisableBuffering(); 441 442 virtual StreamState GetState() const; 443 virtual StreamResult Read(void* buffer, size_t buffer_len, 444 size_t* read, int* error); 445 virtual StreamResult Write(const void* data, size_t data_len, 446 size_t* written, int* error); 447 virtual void Close(); 448 virtual bool SetPosition(size_t position); 449 virtual bool GetPosition(size_t* position) const; 450 virtual bool GetSize(size_t* size) const; 451 virtual bool GetAvailable(size_t* size) const; 452 virtual bool ReserveSize(size_t size); 453 454 bool Flush(); 455 456#if defined(POSIX) 457 // Tries to aquire an exclusive lock on the file. 458 // Use OpenShare(...) on win32 to get similar functionality. 459 bool TryLock(); 460 bool Unlock(); 461#endif 462 463 // Note: Deprecated in favor of Filesystem::GetFileSize(). 464 static bool GetSize(const std::string& filename, size_t* size); 465 466 protected: 467 virtual void DoClose(); 468 469 FILE* file_; 470 471 private: 472 DISALLOW_EVIL_CONSTRUCTORS(FileStream); 473}; 474 475#ifdef POSIX 476// A FileStream that is actually not a file, but the output or input of a 477// sub-command. See "man 3 popen" for documentation of the underlying OS popen() 478// function. 
class POpenStream : public FileStream {
 public:
  // wait_status_ starts at -1, the "no Open()+Close() yet" value reported by
  // GetWaitStatus().
  POpenStream() : wait_status_(-1) {}
  virtual ~POpenStream();

  virtual bool Open(const std::string& subcommand, const char* mode);
  // Same as Open(). shflag is ignored.
  virtual bool OpenShare(const std::string& subcommand, const char* mode,
                         int shflag);

  // Returns the wait status from the last Close() of an Open()'ed stream, or
  // -1 if no Open()+Close() has been done on this object. Meaning of the number
  // is documented in "man 2 wait".
  int GetWaitStatus() const { return wait_status_; }

 protected:
  // Overrides FileStream's close hook.
  virtual void DoClose();

 private:
  int wait_status_;
};
#endif  // POSIX

///////////////////////////////////////////////////////////////////////////////
// MemoryStream is a simple implementation of a StreamInterface over in-memory
// data. Data is read and written at the current seek position. Reads return
// end-of-stream when they reach the end of data. Writes actually extend the
// end of data mark.
507/////////////////////////////////////////////////////////////////////////////// 508 509class MemoryStreamBase : public StreamInterface { 510 public: 511 virtual StreamState GetState() const; 512 virtual StreamResult Read(void* buffer, size_t bytes, size_t* bytes_read, 513 int* error); 514 virtual StreamResult Write(const void* buffer, size_t bytes, 515 size_t* bytes_written, int* error); 516 virtual void Close(); 517 virtual bool SetPosition(size_t position); 518 virtual bool GetPosition(size_t* position) const; 519 virtual bool GetSize(size_t* size) const; 520 virtual bool GetAvailable(size_t* size) const; 521 virtual bool ReserveSize(size_t size); 522 523 char* GetBuffer() { return buffer_; } 524 const char* GetBuffer() const { return buffer_; } 525 526 protected: 527 MemoryStreamBase(); 528 529 virtual StreamResult DoReserve(size_t size, int* error); 530 531 // Invariant: 0 <= seek_position <= data_length_ <= buffer_length_ 532 char* buffer_; 533 size_t buffer_length_; 534 size_t data_length_; 535 size_t seek_position_; 536 537 private: 538 DISALLOW_EVIL_CONSTRUCTORS(MemoryStreamBase); 539}; 540 541// MemoryStream dynamically resizes to accomodate written data. 542 543class MemoryStream : public MemoryStreamBase { 544 public: 545 MemoryStream(); 546 MemoryStream(const char* data); // Calls SetData(data, strlen(data)) 547 MemoryStream(const void* data, size_t length); // Calls SetData(data, length) 548 virtual ~MemoryStream(); 549 550 void SetData(const void* data, size_t length); 551 552 protected: 553 virtual StreamResult DoReserve(size_t size, int* error); 554}; 555 556// ExternalMemoryStream adapts an external memory buffer, so writes which would 557// extend past the end of the buffer will return end-of-stream. 

class ExternalMemoryStream : public MemoryStreamBase {
 public:
  ExternalMemoryStream();
  ExternalMemoryStream(void* data, size_t length);

  void SetData(void* data, size_t length);
};

// FifoBuffer allows for efficient, thread-safe buffering of data between
// writer and reader. As the data can wrap around the end of the buffer,
// MemoryStreamBase can't help us here.

class FifoBuffer : public StreamInterface {
 public:
  // Creates a FIFO buffer with the specified capacity.
  explicit FifoBuffer(size_t length);
  virtual ~FifoBuffer();
  // Gets the amount of data currently readable from the buffer.
  bool GetBuffered(size_t* data_len) const;
  // Resizes the buffer to the specified capacity. Fails if
  // data_length_ > length.
  bool SetCapacity(size_t length);

  // StreamInterface methods
  virtual StreamState GetState() const;
  virtual StreamResult Read(void* buffer, size_t bytes,
                            size_t* bytes_read, int* error);
  virtual StreamResult Write(const void* buffer, size_t bytes,
                             size_t* bytes_written, int* error);
  virtual void Close();
  virtual const void* GetReadData(size_t* data_len);
  virtual void ConsumeReadData(size_t used);
  virtual void* GetWriteBuffer(size_t *buf_len);
  virtual void ConsumeWriteBuffer(size_t used);

 private:
  StreamState state_;  // keeps the opened/closed state of the stream
  scoped_array<char> buffer_;  // the allocated buffer
  size_t buffer_length_;  // size of the allocated buffer
  size_t data_length_;  // amount of readable data in the buffer
  size_t read_position_;  // offset to the readable data
  Thread* owner_;  // stream callbacks are dispatched on this thread
  mutable CriticalSection crit_;  // object lock
  DISALLOW_EVIL_CONSTRUCTORS(FifoBuffer);
};

///////////////////////////////////////////////////////////////////////////////

// NOTE(review): no description in the original header. Judging by its name
// and members, this adapter logs traffic through the adapted stream at
// |level| under |label| -- confirm against the implementation.
class LoggingAdapter : public StreamAdapterInterface {
 public:
  LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
                 const std::string& label, bool hex_mode = false);

  void set_label(const std::string& label);

  // StreamAdapterInterface Interface
  virtual StreamResult Read(void* buffer, size_t buffer_len,
                            size_t* read, int* error);
  virtual StreamResult Write(const void* data, size_t data_len,
                             size_t* written, int* error);
  virtual void Close();

 protected:
  virtual void OnEvent(StreamInterface* stream, int events, int err);

 private:
  LoggingSeverity level_;
  std::string label_;
  bool hex_mode_;
  LogMultilineState lms_;

  DISALLOW_EVIL_CONSTRUCTORS(LoggingAdapter);
};

///////////////////////////////////////////////////////////////////////////////
// StringStream - Reads/Writes to an external std::string
///////////////////////////////////////////////////////////////////////////////

class StringStream : public StreamInterface {
 public:
  // The string is held by reference (str_), not copied, so it must outlive
  // this stream.
  // NOTE(review): the const overload presumably sets read_only_ -- confirm.
  StringStream(std::string& str);
  StringStream(const std::string& str);

  // StreamInterface Interface
  virtual StreamState GetState() const;
  virtual StreamResult Read(void* buffer, size_t buffer_len,
                            size_t* read, int* error);
  virtual StreamResult Write(const void* data, size_t data_len,
                             size_t* written, int* error);
  virtual void Close();
  virtual bool SetPosition(size_t position);
  virtual bool GetPosition(size_t* position) const;
  virtual bool GetSize(size_t* size) const;
  virtual bool GetAvailable(size_t* size) const;
  virtual bool ReserveSize(size_t size);

 private:
  std::string& str_;  // external string; not owned
  size_t read_pos_;
  bool read_only_;
};

///////////////////////////////////////////////////////////////////////////////
// StreamReference - A reference counting stream adapter
///////////////////////////////////////////////////////////////////////////////

// Keep in mind that the streams and adapters defined in this file are
// not thread-safe, so this has limited uses.
664 665// A StreamRefCount holds the reference count and a pointer to the 666// wrapped stream. It deletes the wrapped stream when there are no 667// more references. We can then have multiple StreamReference 668// instances pointing to one StreamRefCount, all wrapping the same 669// stream. 670 671class StreamReference : public StreamAdapterInterface { 672 class StreamRefCount; 673 public: 674 // Constructor for the first reference to a stream 675 // Note: get more references through NewReference(). Use this 676 // constructor only once on a given stream. 677 explicit StreamReference(StreamInterface* stream) 678 : StreamAdapterInterface(stream, false) { 679 // owner set to false so the destructor does not free the stream. 680 stream_ref_count_ = new StreamRefCount(stream); 681 } 682 StreamInterface* GetStream() { return stream(); } 683 StreamInterface* NewReference() { 684 stream_ref_count_->AddReference(); 685 return new StreamReference(stream_ref_count_, stream()); 686 } 687 virtual ~StreamReference() { 688 stream_ref_count_->Release(); 689 } 690 691 private: 692 class StreamRefCount { 693 public: 694 explicit StreamRefCount(StreamInterface* stream) 695 : stream_(stream), ref_count_(1) { 696 } 697 void AddReference() { 698 CritScope lock(&cs_); 699 ++ref_count_; 700 } 701 void Release() { 702 int ref_count; 703 { // Atomic ops would have been a better fit here. 
704 CritScope lock(&cs_); 705 ref_count = --ref_count_; 706 } 707 if (ref_count == 0) { 708 delete stream_; 709 delete this; 710 } 711 } 712 private: 713 StreamInterface* stream_; 714 int ref_count_; 715 CriticalSection cs_; 716 DISALLOW_EVIL_CONSTRUCTORS(StreamRefCount); 717 }; 718 719 // Constructor for adding references 720 explicit StreamReference(StreamRefCount* stream_ref_count, 721 StreamInterface* stream) 722 : StreamAdapterInterface(stream, false), 723 stream_ref_count_(stream_ref_count) { 724 } 725 StreamRefCount* stream_ref_count_; 726 DISALLOW_EVIL_CONSTRUCTORS(StreamReference); 727}; 728 729/////////////////////////////////////////////////////////////////////////////// 730 731// Flow attempts to move bytes from source to sink via buffer of size 732// buffer_len. The function returns SR_SUCCESS when source reaches 733// end-of-stream (returns SR_EOS), and all the data has been written successful 734// to sink. Alternately, if source returns SR_BLOCK or SR_ERROR, or if sink 735// returns SR_BLOCK, SR_ERROR, or SR_EOS, then the function immediately returns 736// with the unexpected StreamResult value. 737// data_len is the length of the valid data in buffer. in case of error 738// this is the data that read from source but can't move to destination. 739// as a pass in parameter, it indicates data in buffer that should move to sink 740StreamResult Flow(StreamInterface* source, 741 char* buffer, size_t buffer_len, 742 StreamInterface* sink, size_t* data_len = NULL); 743 744/////////////////////////////////////////////////////////////////////////////// 745 746} // namespace talk_base 747 748#endif // TALK_BASE_STREAM_H__ 749