1#ifndef _DEBLOCKBUFFER_HPP
2#define _DEBLOCKBUFFER_HPP
/*-------------------------------------------------------------------------
 * drawElements C++ Base Library
 * -----------------------------
 *
 * Copyright 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *//*!
 * \file
 * \brief Block-based thread-safe queue.
 *//*--------------------------------------------------------------------*/
25
#include "deBlockBuffer.hpp"
#include "deMutex.hpp"
#include "deSemaphore.h"

#include <cstddef>
#include <exception>
#include <new>
31
32namespace de
33{
34
//! Self-test entry point for BlockBuffer. Only declared in this header; presumably implemented with the unit tests - TODO confirm.
void BlockBuffer_selfTest (void);
36
37class BufferCanceledException : public std::exception
38{
39public:
40	inline BufferCanceledException	(void) {}
41	inline ~BufferCanceledException	(void) throw() {}
42
43	const char* what (void) const throw() { return "BufferCanceledException"; }
44};
45
46template <typename T>
47class BlockBuffer
48{
49public:
50	typedef BufferCanceledException CanceledException;
51
52					BlockBuffer			(int blockSize, int numBlocks);
53					~BlockBuffer		(void);
54
55	void			clear				(void); //!< Resets buffer. Will block until pending writes and reads have completed.
56
57	void			write				(int numElements, const T* elements);
58	int				tryWrite			(int numElements, const T* elements);
59	void			flush				(void);
60	bool			tryFlush			(void);
61
62	void			read				(int numElements, T* elements);
63	int				tryRead				(int numElements, T* elements);
64
65	void			cancel				(void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException.
66	bool			isCanceled			(void) const { return !!m_canceled; }
67
68private:
69					BlockBuffer			(const BlockBuffer& other);
70	BlockBuffer&	operator=			(const BlockBuffer& other);
71
72	int				writeToCurrentBlock	(int numElements, const T* elements, bool blocking);
73	int				readFromCurrentBlock(int numElements, T* elements, bool blocking);
74
75	void			flushWriteBlock		(void);
76
77	deSemaphore		m_fill;				//!< Block fill count.
78	deSemaphore		m_empty;			//!< Block empty count.
79
80	int				m_writeBlock;		//!< Current write block ndx.
81	int				m_writePos;			//!< Position in block. 0 if block is not yet acquired.
82
83	int				m_readBlock;		//!< Current read block ndx.
84	int				m_readPos;			//!< Position in block. 0 if block is not yet acquired.
85
86	int				m_blockSize;
87	int				m_numBlocks;
88
89	T*				m_elements;
90	int*			m_numUsedInBlock;
91
92	Mutex			m_writeLock;
93	Mutex			m_readLock;
94
95	volatile deUint32	m_canceled;
96};
97
98template <typename T>
99BlockBuffer<T>::BlockBuffer (int blockSize, int numBlocks)
100	: m_fill			(0)
101	, m_empty			(0)
102	, m_writeBlock		(0)
103	, m_writePos		(0)
104	, m_readBlock		(0)
105	, m_readPos			(0)
106	, m_blockSize		(blockSize)
107	, m_numBlocks		(numBlocks)
108	, m_elements		(DE_NULL)
109	, m_numUsedInBlock	(DE_NULL)
110	, m_writeLock		()
111	, m_readLock		()
112	, m_canceled		(DE_FALSE)
113{
114	DE_ASSERT(blockSize > 0);
115	DE_ASSERT(numBlocks > 0);
116
117	try
118	{
119		m_elements			= new T[m_numBlocks*m_blockSize];
120		m_numUsedInBlock	= new int[m_numBlocks];
121	}
122	catch (...)
123	{
124		delete[] m_elements;
125		delete[] m_numUsedInBlock;
126		throw;
127	}
128
129	m_fill	= deSemaphore_create(0, DE_NULL);
130	m_empty	= deSemaphore_create(numBlocks, DE_NULL);
131	DE_ASSERT(m_fill && m_empty);
132}
133
134template <typename T>
135BlockBuffer<T>::~BlockBuffer (void)
136{
137	delete[] m_elements;
138	delete[] m_numUsedInBlock;
139
140	deSemaphore_destroy(m_fill);
141	deSemaphore_destroy(m_empty);
142}
143
144template <typename T>
145void BlockBuffer<T>::clear (void)
146{
147	ScopedLock readLock		(m_readLock);
148	ScopedLock writeLock	(m_writeLock);
149
150	deSemaphore_destroy(m_fill);
151	deSemaphore_destroy(m_empty);
152
153	m_fill			= deSemaphore_create(0, DE_NULL);
154	m_empty			= deSemaphore_create(m_numBlocks, DE_NULL);
155	m_writeBlock	= 0;
156	m_writePos		= 0;
157	m_readBlock		= 0;
158	m_readPos		= 0;
159	m_canceled		= DE_FALSE;
160
161	DE_ASSERT(m_fill && m_empty);
162}
163
164template <typename T>
165void BlockBuffer<T>::cancel (void)
166{
167	DE_ASSERT(!m_canceled);
168	m_canceled = DE_TRUE;
169
170	deSemaphore_increment(m_empty);
171	deSemaphore_increment(m_fill);
172}
173
174template <typename T>
175int BlockBuffer<T>::writeToCurrentBlock (int numElements, const T* elements, bool blocking)
176{
177	DE_ASSERT(numElements > 0 && elements != DE_NULL);
178
179	if (m_writePos == 0)
180	{
181		/* Write thread doesn't own current block - need to acquire. */
182		if (blocking)
183			deSemaphore_decrement(m_empty);
184		else
185		{
186			if (!deSemaphore_tryDecrement(m_empty))
187				return 0;
188		}
189
190		/* Check for canceled bit. */
191		if (m_canceled)
192		{
193			// \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here.
194			deSemaphore_increment(m_empty);
195			m_writeLock.unlock();
196			throw CanceledException();
197		}
198	}
199
200	/* Write thread owns current block. */
201	T*		block			= m_elements + m_writeBlock*m_blockSize;
202	int		numToWrite		= de::min(numElements, m_blockSize-m_writePos);
203
204	DE_ASSERT(numToWrite > 0);
205
206	for (int ndx = 0; ndx < numToWrite; ndx++)
207		block[m_writePos+ndx] = elements[ndx];
208
209	m_writePos += numToWrite;
210
211	if (m_writePos == m_blockSize)
212		flushWriteBlock(); /* Flush current write block. */
213
214	return numToWrite;
215}
216
217template <typename T>
218int BlockBuffer<T>::readFromCurrentBlock (int numElements, T* elements, bool blocking)
219{
220	DE_ASSERT(numElements > 0 && elements != DE_NULL);
221
222	if (m_readPos == 0)
223	{
224		/* Read thread doesn't own current block - need to acquire. */
225		if (blocking)
226			deSemaphore_decrement(m_fill);
227		else
228		{
229			if (!deSemaphore_tryDecrement(m_fill))
230				return 0;
231		}
232
233		/* Check for canceled bit. */
234		if (m_canceled)
235		{
236			// \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here.
237			deSemaphore_increment(m_fill);
238			m_readLock.unlock();
239			throw CanceledException();
240		}
241	}
242
243	/* Read thread now owns current block. */
244	const T*	block			= m_elements + m_readBlock*m_blockSize;
245	int			numUsedInBlock	= m_numUsedInBlock[m_readBlock];
246	int			numToRead		= de::min(numElements, numUsedInBlock-m_readPos);
247
248	DE_ASSERT(numToRead > 0);
249
250	for (int ndx = 0; ndx < numToRead; ndx++)
251		elements[ndx] = block[m_readPos+ndx];
252
253	m_readPos += numToRead;
254
255	if (m_readPos == numUsedInBlock)
256	{
257		/* Free current read block and advance. */
258		m_readBlock		= (m_readBlock+1) % m_numBlocks;
259		m_readPos		= 0;
260		deSemaphore_increment(m_empty);
261	}
262
263	return numToRead;
264}
265
266template <typename T>
267int BlockBuffer<T>::tryWrite (int numElements, const T* elements)
268{
269	int numWritten = 0;
270
271	DE_ASSERT(numElements > 0 && elements != DE_NULL);
272
273	if (m_canceled)
274		throw CanceledException();
275
276	if (!m_writeLock.tryLock())
277		return numWritten;
278
279	while (numWritten < numElements)
280	{
281		int ret = writeToCurrentBlock(numElements-numWritten, elements+numWritten, false /* non-blocking */);
282
283		if (ret == 0)
284			break; /* Write failed. */
285
286		numWritten += ret;
287	}
288
289	m_writeLock.unlock();
290
291	return numWritten;
292}
293
294template <typename T>
295void BlockBuffer<T>::write (int numElements, const T* elements)
296{
297	DE_ASSERT(numElements > 0 && elements != DE_NULL);
298
299	if (m_canceled)
300		throw CanceledException();
301
302	m_writeLock.lock();
303
304	int numWritten = 0;
305	while (numWritten < numElements)
306		numWritten += writeToCurrentBlock(numElements-numWritten, elements+numWritten, true /* blocking */);
307
308	m_writeLock.unlock();
309}
310
311template <typename T>
312void BlockBuffer<T>::flush (void)
313{
314	m_writeLock.lock();
315
316	if (m_writePos > 0)
317		flushWriteBlock();
318
319	m_writeLock.unlock();
320}
321
322template <typename T>
323bool BlockBuffer<T>::tryFlush (void)
324{
325	if (!m_writeLock.tryLock())
326		return false;
327
328	if (m_writePos > 0)
329		flushWriteBlock();
330
331	m_writeLock.unlock();
332
333	return true;
334}
335
336template <typename T>
337void BlockBuffer<T>::flushWriteBlock (void)
338{
339	DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize));
340
341	m_numUsedInBlock[m_writeBlock]	= m_writePos;
342	m_writeBlock					= (m_writeBlock+1) % m_numBlocks;
343	m_writePos						= 0;
344	deSemaphore_increment(m_fill);
345}
346
347template <typename T>
348int BlockBuffer<T>::tryRead (int numElements, T* elements)
349{
350	int numRead = 0;
351
352	if (m_canceled)
353		throw CanceledException();
354
355	if (!m_readLock.tryLock())
356		return numRead;
357
358	while (numRead < numElements)
359	{
360		int ret = readFromCurrentBlock(numElements-numRead, &elements[numRead], false /* non-blocking */);
361
362		if (ret == 0)
363			break; /* Failed. */
364
365		numRead += ret;
366	}
367
368	m_readLock.unlock();
369
370	return numRead;
371}
372
373template <typename T>
374void BlockBuffer<T>::read (int numElements, T* elements)
375{
376	DE_ASSERT(numElements > 0 && elements != DE_NULL);
377
378	if (m_canceled)
379		throw CanceledException();
380
381	m_readLock.lock();
382
383	int numRead = 0;
384	while (numRead < numElements)
385		numRead += readFromCurrentBlock(numElements-numRead, &elements[numRead], true /* blocking */);
386
387	m_readLock.unlock();
388}
389
390} // de
391
392#endif // _DEBLOCKBUFFER_HPP
393