1/*-------------------------------------------------------------------------
2 * drawElements C++ Base Library
3 * -----------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *      http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Block-based thread-safe queue.
22 *//*--------------------------------------------------------------------*/
23
24#include "deBlockBuffer.hpp"
25#include "deRandom.hpp"
26#include "deThread.hpp"
27#include "deInt32.h"
28
29#include <vector>
30
31namespace de
32{
33
34using std::vector;
35
36namespace BlockBufferBasicTest
37{
38
39struct Message
40{
41	deUint32 data;
42
43	Message (deUint16 threadId, deUint16 payload)
44		: data((threadId << 16) | payload)
45	{
46	}
47
48	Message (void)
49		: data(0)
50	{
51	}
52
53	deUint16 getThreadId	(void) const { return data >> 16;		}
54	deUint16 getPayload		(void) const { return data & 0xffff;	}
55};
56
57typedef BlockBuffer<Message> MessageBuffer;
58
59class Consumer : public Thread
60{
61public:
62	Consumer (MessageBuffer& buffer, int numProducers)
63		: m_buffer		(buffer)
64	{
65		m_lastPayload.resize(numProducers, 0);
66		m_payloadSum.resize(numProducers, 0);
67	}
68
69	void run (void)
70	{
71		Random	rnd		((deUint32)m_lastPayload.size());
72		Message	tmpBuf	[64];
73		bool	consume	= true;
74
75		while (consume)
76		{
77			int numToRead	= rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf));
78			int numRead		= m_buffer.tryRead(numToRead, &tmpBuf[0]);
79
80			for (int ndx = 0; ndx < numRead; ndx++)
81			{
82				const Message& msg = tmpBuf[ndx];
83
84				deUint16 threadId = msg.getThreadId();
85
86				if (threadId == 0xffff)
87				{
88					/* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */
89					if (ndx+1 < numRead)
90					{
91						m_buffer.write(numRead-ndx-1, &tmpBuf[ndx+1]);
92						m_buffer.flush();
93					}
94
95					consume = false;
96					break;
97				}
98				else
99				{
100					/* Verify message. */
101					DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
102					DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
103
104					m_lastPayload[threadId]	 = msg.getPayload();
105					m_payloadSum[threadId]	+= (deUint32)msg.getPayload();
106				}
107			}
108		}
109	}
110
111	deUint32 getPayloadSum (deUint16 threadId) const
112	{
113		return m_payloadSum[threadId];
114	}
115
116private:
117	MessageBuffer&			m_buffer;
118	vector<deUint16>		m_lastPayload;
119	vector<deUint32>		m_payloadSum;
120};
121
122class Producer : public Thread
123{
124public:
125	Producer (MessageBuffer& buffer, deUint16 threadId, int numMessages)
126		: m_buffer		(buffer)
127		, m_threadId	(threadId)
128		, m_numMessages	(numMessages)
129	{
130	}
131
132	void run (void)
133	{
134		// Yield to give main thread chance to start other producers.
135		deSleep(1);
136
137		Random	rnd		(m_threadId);
138		int		msgNdx	= 0;
139		Message	tmpBuf[64];
140
141		while (msgNdx < m_numMessages)
142		{
143			int writeSize = rnd.getInt(1, de::min(m_numMessages-msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf)));
144			for (int ndx = 0; ndx < writeSize; ndx++)
145				tmpBuf[ndx] = Message(m_threadId, (deUint16)msgNdx++);
146
147			m_buffer.write(writeSize, &tmpBuf[0]);
148			if (rnd.getBool())
149				m_buffer.flush();
150		}
151	}
152
153private:
154	MessageBuffer&	m_buffer;
155	deUint16		m_threadId;
156	int				m_numMessages;
157};
158
159void runTest (void)
160{
161	const int numIterations = 8;
162	for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
163	{
164		Random							rnd				(iterNdx);
165		int								numBlocks		= rnd.getInt(2, 128);
166		int								blockSize		= rnd.getInt(1, 16);
167		int								numProducers	= rnd.getInt(1, 16);
168		int								numConsumers	= rnd.getInt(1, 16);
169		int								dataSize		= rnd.getInt(50, 200);
170		MessageBuffer					buffer			(blockSize, numBlocks);
171		vector<Producer*>				producers;
172		vector<Consumer*>				consumers;
173
174		for (int i = 0; i < numProducers; i++)
175			producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
176
177		for (int i = 0; i < numConsumers; i++)
178			consumers.push_back(new Consumer(buffer, numProducers));
179
180		// Start consumers.
181		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
182			(*i)->start();
183
184		// Start producers.
185		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
186			(*i)->start();
187
188		// Wait for producers.
189		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
190			(*i)->join();
191
192		// Write end messages for consumers.
193		const Message endMsg(0xffff, 0);
194		for (int i = 0; i < numConsumers; i++)
195			buffer.write(1, &endMsg);
196		buffer.flush();
197
198		// Wait for consumers.
199		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
200			(*i)->join();
201
202		// Verify payload sums.
203		deUint32 refSum = 0;
204		for (int i = 0; i < dataSize; i++)
205			refSum += (deUint32)(deUint16)i;
206
207		for (int i = 0; i < numProducers; i++)
208		{
209			deUint32 cmpSum = 0;
210			for (int j = 0; j < numConsumers; j++)
211				cmpSum += consumers[j]->getPayloadSum(i);
212			DE_TEST_ASSERT(refSum == cmpSum);
213		}
214
215		// Free resources.
216		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
217			delete *i;
218		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
219			delete *i;
220	}
221}
222
223} // BlockBufferBasicTest
224
225namespace BlockBufferCancelTest
226{
227
228class Producer : public Thread
229{
230public:
231	Producer (BlockBuffer<deUint8>* buffer, deUint32 seed)
232		: m_buffer	(buffer)
233		, m_seed	(seed)
234	{
235	}
236
237	void run (void)
238	{
239		deUint8	tmp[1024];
240		Random	rnd(m_seed);
241
242		for (;;)
243		{
244			int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
245
246			try
247			{
248				m_buffer->write(blockSize, &tmp[0]);
249
250				if (rnd.getBool())
251					m_buffer->flush();
252			}
253			catch (const BlockBuffer<deUint8>::CanceledException&)
254			{
255				break;
256			}
257		}
258	}
259
260private:
261	BlockBuffer<deUint8>*	m_buffer;
262	deUint32				m_seed;
263};
264
265class Consumer : public Thread
266{
267public:
268	Consumer (BlockBuffer<deUint8>* buffer, deUint32 seed)
269		: m_buffer	(buffer)
270		, m_seed	(seed)
271	{
272	}
273
274	void run (void)
275	{
276		deUint8	tmp[1024];
277		Random	rnd(m_seed);
278
279		for (;;)
280		{
281			int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
282
283			try
284			{
285				m_buffer->read(blockSize, &tmp[0]);
286			}
287			catch (const BlockBuffer<deUint8>::CanceledException&)
288			{
289				break;
290			}
291		}
292	}
293
294private:
295	BlockBuffer<deUint8>*	m_buffer;
296	deUint32				m_seed;
297};
298
299void runTest (void)
300{
301	BlockBuffer<deUint8>	buffer			(64, 16);
302	const int				numIterations	= 8;
303
304	for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
305	{
306		Random				rnd				(deInt32Hash(iterNdx));
307		int					numThreads		= rnd.getInt(1, 16);
308		int					sleepMs			= rnd.getInt(1, 200);
309		vector<Thread*>		threads;
310
311		for (int i = 0; i < numThreads; i++)
312		{
313			if (rnd.getBool())
314				threads.push_back(new Consumer(&buffer, rnd.getUint32()));
315			else
316				threads.push_back(new Producer(&buffer, rnd.getUint32()));
317		}
318
319		// Start threads.
320		for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
321			(*i)->start();
322
323		// Sleep for a while.
324		deSleep(sleepMs);
325
326		// Cancel buffer.
327		buffer.cancel();
328
329		// Wait for threads to finish.
330		for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
331			(*i)->join();
332
333		// Reset buffer.
334		buffer.clear();
335
336		// Delete threads
337		for (vector<Thread*>::iterator thread = threads.begin(); thread != threads.end(); ++thread)
338			delete *thread;
339	}
340}
341
342} // BlockBufferCancelTest
343
344void BlockBuffer_selfTest (void)
345{
346	BlockBufferBasicTest::runTest();
347	BlockBufferCancelTest::runTest();
348}
349
350} // de
351