1/*-------------------------------------------------------------------------
2 * drawElements Stream Library
3 * ---------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *      http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Thread safe ringbuffer
22 *//*--------------------------------------------------------------------*/
23#include "deRingbuffer.h"
24
25#include "deInt32.h"
26#include "deMemory.h"
27#include "deSemaphore.h"
28
29#include <stdlib.h>
30#include <stdio.h>
31
32struct deRingbuffer_s
33{
34	deInt32			blockSize;
35	deInt32			blockCount;
36	deInt32*		blockUsage;
37	deUint8*		buffer;
38
39	deSemaphore		emptyCount;
40	deSemaphore		fullCount;
41
42	deInt32			outBlock;
43	deInt32			outPos;
44
45	deInt32			inBlock;
46	deInt32			inPos;
47
48	deBool			stopNotified;
49	deBool			consumerStopping;
50};
51
52deRingbuffer* deRingbuffer_create (deInt32 blockSize, deInt32 blockCount)
53{
54	deRingbuffer* ringbuffer = (deRingbuffer*)deCalloc(sizeof(deRingbuffer));
55
56	DE_ASSERT(ringbuffer);
57	DE_ASSERT(blockCount > 0);
58	DE_ASSERT(blockSize > 0);
59
60	ringbuffer->blockSize	= blockSize;
61	ringbuffer->blockCount	= blockCount;
62	ringbuffer->buffer		= (deUint8*)deMalloc(sizeof(deUint8) * blockSize * blockCount);
63	ringbuffer->blockUsage	= (deInt32*)deMalloc(sizeof(deUint32) * blockCount);
64	ringbuffer->emptyCount	= deSemaphore_create(ringbuffer->blockCount, DE_NULL);
65	ringbuffer->fullCount	= deSemaphore_create(0, DE_NULL);
66
67	if (!ringbuffer->buffer		||
68		!ringbuffer->blockUsage	||
69		!ringbuffer->emptyCount	||
70		!ringbuffer->fullCount)
71	{
72		if (ringbuffer->emptyCount)
73			deSemaphore_destroy(ringbuffer->emptyCount);
74		if (ringbuffer->fullCount)
75			deSemaphore_destroy(ringbuffer->fullCount);
76		deFree(ringbuffer->buffer);
77		deFree(ringbuffer->blockUsage);
78		deFree(ringbuffer);
79		return DE_NULL;
80	}
81
82	memset(ringbuffer->blockUsage, 0, sizeof(deInt32) * blockCount);
83
84	ringbuffer->outBlock	= 0;
85	ringbuffer->outPos		= 0;
86
87	ringbuffer->inBlock		= 0;
88	ringbuffer->inPos		= 0;
89
90	ringbuffer->stopNotified		= DE_FALSE;
91	ringbuffer->consumerStopping	= DE_FALSE;
92
93	return ringbuffer;
94}
95
96void deRingbuffer_stop (deRingbuffer* ringbuffer)
97{
98	/* Set notify to true and increment fullCount to let consumer continue */
99	ringbuffer->stopNotified = DE_TRUE;
100	deSemaphore_increment(ringbuffer->fullCount);
101}
102
103void deRingbuffer_destroy (deRingbuffer* ringbuffer)
104{
105	deSemaphore_destroy(ringbuffer->emptyCount);
106	deSemaphore_destroy(ringbuffer->fullCount);
107
108	free(ringbuffer->buffer);
109	free(ringbuffer->blockUsage);
110	free(ringbuffer);
111}
112
113static deStreamResult producerStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* written)
114{
115	deRingbuffer* ringbuffer = (deRingbuffer*)stream;
116
117	DE_ASSERT(stream);
118	/* If ringbuffer is stopping return error on write */
119	if (ringbuffer->stopNotified)
120	{
121		DE_ASSERT(DE_FALSE);
122		return DE_STREAMRESULT_ERROR;
123	}
124
125	*written = 0;
126
127	/* Write while more data available */
128	while (*written < bufSize)
129	{
130		deInt32		writeSize	= 0;
131		deUint8*	src			= DE_NULL;
132		deUint8*	dst			= DE_NULL;
133
134		/* If between blocks accuire new block */
135		if (ringbuffer->inPos == 0)
136		{
137			deSemaphore_decrement(ringbuffer->emptyCount);
138		}
139
140		writeSize	= deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written);
141		dst			= ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos;
142		src			= (deUint8*)buf + *written;
143
144		deMemcpy(dst, src, writeSize);
145
146		ringbuffer->inPos += writeSize;
147		*written += writeSize;
148		ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize;
149
150		/* Block is full move to next one (or "between" this and next block) */
151		if (ringbuffer->inPos == ringbuffer->blockSize)
152		{
153			ringbuffer->inPos = 0;
154			ringbuffer->inBlock++;
155
156			if (ringbuffer->inBlock == ringbuffer->blockCount)
157				ringbuffer->inBlock = 0;
158			deSemaphore_increment(ringbuffer->fullCount);
159		}
160	}
161
162	return DE_STREAMRESULT_SUCCESS;
163}
164
165static deStreamResult producerStream_flush (deStreamData* stream)
166{
167	deRingbuffer* ringbuffer = (deRingbuffer*)stream;
168
169	DE_ASSERT(stream);
170
171	/* No blocks reserved by producer */
172	if (ringbuffer->inPos == 0)
173		return DE_STREAMRESULT_SUCCESS;
174
175	ringbuffer->inPos		= 0;
176	ringbuffer->inBlock++;
177
178	if (ringbuffer->inBlock == ringbuffer->blockCount)
179		ringbuffer->inBlock = 0;
180
181	deSemaphore_increment(ringbuffer->fullCount);
182	return DE_STREAMRESULT_SUCCESS;
183}
184
185static deStreamResult producerStream_deinit (deStreamData* stream)
186{
187	DE_ASSERT(stream);
188
189	producerStream_flush(stream);
190
191	/* \note mika Stream doesn't own ringbuffer, so it's not deallocated */
192	return DE_STREAMRESULT_SUCCESS;
193}
194
195static deStreamResult consumerStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* read)
196{
197	deRingbuffer* ringbuffer = (deRingbuffer*)stream;
198
199	DE_ASSERT(stream);
200
201	*read = 0;
202	DE_ASSERT(ringbuffer);
203
204	while (*read < bufSize)
205	{
206		deInt32		writeSize	= 0;
207		deUint8*	src			= DE_NULL;
208		deUint8*	dst			= DE_NULL;
209
210		/* If between blocks accuire new block */
211		if (ringbuffer->outPos == 0)
212		{
213			/* If consumer is set to stop after everything is consumed,
214			 * do not block if there is no more input left
215			 */
216			if (ringbuffer->consumerStopping)
217			{
218				/* Try to accuire new block, if can't there is no more input */
219				if (!deSemaphore_tryDecrement(ringbuffer->fullCount))
220				{
221					return DE_STREAMRESULT_END_OF_STREAM;
222				}
223			}
224			else
225			{
226				/* If not stopping block until there is more input */
227				deSemaphore_decrement(ringbuffer->fullCount);
228				/* Ringbuffer was set to stop */
229				if (ringbuffer->stopNotified)
230				{
231					ringbuffer->consumerStopping = DE_TRUE;
232				}
233			}
234
235		}
236
237		writeSize	= deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read);
238		src			= ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos;
239		dst			= (deUint8*)buf + *read;
240
241		deMemcpy(dst, src, writeSize);
242
243		ringbuffer->outPos += writeSize;
244		*read += writeSize;
245
246		/* Block is consumed move to next one (or "between" this and next block) */
247		if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock])
248		{
249			ringbuffer->blockUsage[ringbuffer->outBlock] = 0;
250			ringbuffer->outPos = 0;
251			ringbuffer->outBlock++;
252
253			if (ringbuffer->outBlock == ringbuffer->blockCount)
254				ringbuffer->outBlock = 0;
255
256			deSemaphore_increment(ringbuffer->emptyCount);
257		}
258	}
259
260	return DE_STREAMRESULT_SUCCESS;
261}
262
263
264static deStreamResult consumerStream_deinit (deStreamData* stream)
265{
266	DE_ASSERT(stream);
267	DE_UNREF(stream);
268
269	return DE_STREAMRESULT_SUCCESS;
270}
271
272/* There are no sensible errors so status is always good */
273deStreamStatus dummy_getStatus (deStreamData* stream)
274{
275	DE_UNREF(stream);
276
277	return DE_STREAMSTATUS_GOOD;
278}
279
280/* There are no sensible errors in ringbuffer */
281static const char* dummy_getError (deStreamData* stream)
282{
283	DE_ASSERT(stream);
284	DE_UNREF(stream);
285	return DE_NULL;
286}
287
288static const deIOStreamVFTable producerStreamVFTable = {
289	DE_NULL,
290	producerStream_write,
291	dummy_getError,
292	producerStream_flush,
293	producerStream_deinit,
294	dummy_getStatus
295};
296
297static const deIOStreamVFTable consumerStreamVFTable = {
298	consumerStream_read,
299	DE_NULL,
300	dummy_getError,
301	DE_NULL,
302	consumerStream_deinit,
303	dummy_getStatus
304};
305
306void deProducerStream_init (deOutStream* stream, deRingbuffer* buffer)
307{
308	stream->ioStream.streamData = (deStreamData*)buffer;
309	stream->ioStream.vfTable = &producerStreamVFTable;
310}
311
312void deConsumerStream_init (deInStream* stream, deRingbuffer* buffer)
313{
314	stream->ioStream.streamData = (deStreamData*)buffer;
315	stream->ioStream.vfTable = &consumerStreamVFTable;
316}
317