1/*-------------------------------------------------------------------------
2 * drawElements Stream Library
3 * ---------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *      http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Buffered and threaded input and output streams
22 *//*--------------------------------------------------------------------*/
23
24#include "deThreadStream.h"
25#include "deStreamCpyThread.h"
26#include "deRingbuffer.h"
27#include "stdlib.h"
28
29typedef struct deThreadInStream_s
30{
31	deRingbuffer*		ringbuffer;
32	deInStream*			input;
33	deInStream			consumerStream;
34	deOutStream			producerStream;
35	deThread			thread;
36	int					bufferSize;
37} deThreadInStream;
38
39typedef struct deThreadOutStream_s
40{
41	deRingbuffer*		ringbuffer;
42	deInStream			consumerStream;
43	deOutStream			producerStream;
44	deStreamCpyThread*	thread;
45} deThreadOutStream;
46
47static void inStreamCopy (void* arg)
48{
49	deThreadInStream* threadStream = (deThreadInStream*)arg;
50
51	deUint8* buffer = malloc(sizeof(deUint8) * (size_t)threadStream->bufferSize);
52
53	for(;;)
54	{
55		deInt32 read	= 0;
56		deInt32 written	= 0;
57		deStreamResult readResult = DE_STREAMRESULT_ERROR;
58
59		readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read);
60		DE_ASSERT(readResult != DE_STREAMRESULT_ERROR);
61		while (written < read)
62		{
63			deInt32 wrote = 0;
64
65			/* \todo [mika] Handle errors */
66			deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote);
67
68			written += wrote;
69		}
70
71		if (readResult == DE_STREAMRESULT_END_OF_STREAM)
72		{
73			break;
74		}
75	}
76
77	deOutStream_flush(&(threadStream->producerStream));
78	deRingbuffer_stop(threadStream->ringbuffer);
79	free(buffer);
80
81}
82
83static deStreamResult threadInStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* numRead)
84{
85	deThreadInStream* threadStream = (deThreadInStream*)stream;
86	return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead);
87}
88
89static const char* threadInStream_getError (deStreamData* stream)
90{
91	deThreadInStream* threadStream = (deThreadInStream*)stream;
92
93	/* \todo [mika] Add handling for errors on thread stream */
94	return deInStream_getError(&(threadStream->consumerStream));
95}
96
97static deStreamStatus threadInStream_getStatus (deStreamData* stream)
98{
99	deThreadInStream* threadStream = (deThreadInStream*)stream;
100
101	/* \todo [mika] Add handling for status on thread stream */
102	return deInStream_getStatus(&(threadStream->consumerStream));
103}
104
105/* \note [mika] Used by both in and out stream */
106static deStreamResult threadStream_deinit (deStreamData* stream)
107{
108	deThreadInStream* threadStream = (deThreadInStream*)stream;
109
110	deRingbuffer_stop(threadStream->ringbuffer);
111
112	deThread_join(threadStream->thread);
113	deThread_destroy(threadStream->thread);
114
115	deOutStream_deinit(&(threadStream->producerStream));
116	deInStream_deinit(&(threadStream->consumerStream));
117
118	deRingbuffer_destroy(threadStream->ringbuffer);
119
120	return DE_STREAMRESULT_SUCCESS;
121}
122
123static const deIOStreamVFTable threadInStreamVFTable = {
124	threadInStream_read,
125	DE_NULL,
126	threadInStream_getError,
127	DE_NULL,
128	threadStream_deinit,
129	threadInStream_getStatus
130};
131
132void deThreadInStream_init (deInStream* stream, deInStream* input, int ringbufferBlockSize, int ringbufferBlockCount)
133{
134	deThreadInStream* threadStream = DE_NULL;
135
136	threadStream = malloc(sizeof(deThreadInStream));
137	DE_ASSERT(threadStream);
138
139	threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
140	DE_ASSERT(threadStream->ringbuffer);
141
142	threadStream->bufferSize = ringbufferBlockSize;
143	threadStream->input = input;
144	deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
145	deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);
146
147	threadStream->thread		= deThread_create(inStreamCopy, threadStream, DE_NULL);
148	stream->ioStream.vfTable	= &threadInStreamVFTable;
149	stream->ioStream.streamData = threadStream;
150}
151
152static deStreamResult threadOutStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* numWritten)
153{
154	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
155	return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten);
156}
157
158static const char* threadOutStream_getError (deStreamData* stream)
159{
160	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
161
162	/* \todo [mika] Add handling for errors on thread stream */
163	return deOutStream_getError(&(threadStream->producerStream));
164}
165
166static deStreamStatus threadOutStream_getStatus (deStreamData* stream)
167{
168	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
169
170	/* \todo [mika] Add handling for errors on thread stream */
171	return deOutStream_getStatus(&(threadStream->producerStream));
172}
173
174static deStreamResult threadOutStream_flush (deStreamData* stream)
175{
176	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
177
178	return deOutStream_flush(&(threadStream->producerStream));
179}
180
181static const deIOStreamVFTable threadOutStreamVFTable = {
182	DE_NULL,
183	threadOutStream_write,
184	threadOutStream_getError,
185	threadOutStream_flush,
186	threadStream_deinit,
187	threadOutStream_getStatus
188};
189
190void deThreadOutStream_init (deOutStream* stream, deOutStream* output, int ringbufferBlockSize, int ringbufferBlockCount)
191{
192	deThreadOutStream* threadStream = DE_NULL;
193
194	threadStream = malloc(sizeof(deThreadOutStream));
195	DE_ASSERT(threadStream);
196
197	threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
198	DE_ASSERT(threadStream->ringbuffer);
199
200	deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
201	deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);
202
203	threadStream->thread		= deStreamCpyThread_create(&(threadStream->consumerStream), output, ringbufferBlockSize);
204	stream->ioStream.vfTable	= &threadOutStreamVFTable;
205	stream->ioStream.streamData = threadStream;
206}
207
208