1// StreamBinder.cpp 2 3#include "StdAfx.h" 4 5#include "StreamBinder.h" 6#include "../../Common/Defs.h" 7#include "../../Common/MyCom.h" 8 9using namespace NWindows; 10using namespace NSynchronization; 11 12class CSequentialInStreamForBinder: 13 public ISequentialInStream, 14 public CMyUnknownImp 15{ 16public: 17 MY_UNKNOWN_IMP 18 19 STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize); 20private: 21 CStreamBinder *m_StreamBinder; 22public: 23 ~CSequentialInStreamForBinder() { m_StreamBinder->CloseRead(); } 24 void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; } 25}; 26 27STDMETHODIMP CSequentialInStreamForBinder::Read(void *data, UInt32 size, UInt32 *processedSize) 28 { return m_StreamBinder->Read(data, size, processedSize); } 29 30class CSequentialOutStreamForBinder: 31 public ISequentialOutStream, 32 public CMyUnknownImp 33{ 34public: 35 MY_UNKNOWN_IMP 36 37 STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize); 38 39private: 40 CStreamBinder *m_StreamBinder; 41public: 42 ~CSequentialOutStreamForBinder() { m_StreamBinder->CloseWrite(); } 43 void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; } 44}; 45 46STDMETHODIMP CSequentialOutStreamForBinder::Write(const void *data, UInt32 size, UInt32 *processedSize) 47 { return m_StreamBinder->Write(data, size, processedSize); } 48 49 50////////////////////////// 51// CStreamBinder 52// (_thereAreBytesToReadEvent && _bufferSize == 0) means that stream is finished. 53 54HRes CStreamBinder::CreateEvents() 55{ 56 RINOK(_allBytesAreWritenEvent.Create(true)); 57 RINOK(_thereAreBytesToReadEvent.Create()); 58 return _readStreamIsClosedEvent.Create(); 59} 60 61void CStreamBinder::ReInit() 62{ 63 _thereAreBytesToReadEvent.Reset(); 64 _readStreamIsClosedEvent.Reset(); 65 ProcessedSize = 0; 66} 67 68 69 70void CStreamBinder::CreateStreams(ISequentialInStream **inStream, 71 ISequentialOutStream **outStream) 72{ 73 CSequentialInStreamForBinder *inStreamSpec = new 74 CSequentialInStreamForBinder; 75 CMyComPtr<ISequentialInStream> inStreamLoc(inStreamSpec); 76 inStreamSpec->SetBinder(this); 77 *inStream = inStreamLoc.Detach(); 78 79 CSequentialOutStreamForBinder *outStreamSpec = new 80 CSequentialOutStreamForBinder; 81 CMyComPtr<ISequentialOutStream> outStreamLoc(outStreamSpec); 82 outStreamSpec->SetBinder(this); 83 *outStream = outStreamLoc.Detach(); 84 85 _buffer = NULL; 86 _bufferSize= 0; 87 ProcessedSize = 0; 88} 89 90HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize) 91{ 92 UInt32 sizeToRead = size; 93 if (size > 0) 94 { 95 RINOK(_thereAreBytesToReadEvent.Lock()); 96 sizeToRead = MyMin(_bufferSize, size); 97 if (_bufferSize > 0) 98 { 99 memcpy(data, _buffer, sizeToRead); 100 _buffer = ((const Byte *)_buffer) + sizeToRead; 101 _bufferSize -= sizeToRead; 102 if (_bufferSize == 0) 103 { 104 _thereAreBytesToReadEvent.Reset(); 105 _allBytesAreWritenEvent.Set(); 106 } 107 } 108 } 109 if (processedSize != NULL) 110 *processedSize = sizeToRead; 111 ProcessedSize += sizeToRead; 112 return S_OK; 113} 114 115void CStreamBinder::CloseRead() 116{ 117 _readStreamIsClosedEvent.Set(); 118} 119 120HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize) 121{ 122 if (size > 0) 123 { 124 _buffer = data; 125 _bufferSize = size; 126 _allBytesAreWritenEvent.Reset(); 127 _thereAreBytesToReadEvent.Set(); 128 129 HANDLE events[2]; 130 events[0] = _allBytesAreWritenEvent; 131 events[1] = _readStreamIsClosedEvent; 132 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE); 133 if (waitResult != WAIT_OBJECT_0 + 0) 134 { 135 // ReadingWasClosed = true; 136 return S_FALSE; 137 } 138 // if(!_allBytesAreWritenEvent.Lock()) 139 // return E_FAIL; 140 } 141 if (processedSize != NULL) 142 *processedSize = size; 143 return S_OK; 144} 145 146void CStreamBinder::CloseWrite() 147{ 148 // _bufferSize must be = 0 149 _thereAreBytesToReadEvent.Set(); 150} 151