1/*-------------------------------------------------------------------------
2 * drawElements Quality Program Execution Server
3 * ---------------------------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *      http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Test Execution Server.
22 *//*--------------------------------------------------------------------*/
23
24#include "xsExecutionServer.hpp"
25#include "deClock.h"
26
27#include <cstdio>
28
29using std::vector;
30using std::string;
31
// Debug tracing. DBG_PRINT((fmt, ...)) takes a parenthesized printf argument
// list. Tracing is on by default; build with -DXS_EXEC_SERVER_DBG_PRINT=0 to
// compile the trace calls out without editing this file.
#if !defined(XS_EXEC_SERVER_DBG_PRINT)
#	define XS_EXEC_SERVER_DBG_PRINT 1
#endif

#if XS_EXEC_SERVER_DBG_PRINT
#	define DBG_PRINT(X) printf X
#else
#	define DBG_PRINT(X)
#endif
37
38namespace xs
39{
40
41inline bool MessageBuilder::isComplete (void) const
42{
43	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
44		return false;
45	else
46		return m_buffer.size() == getMessageSize();
47}
48
49const deUint8* MessageBuilder::getMessageData (void) const
50{
51	return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
52}
53
54size_t MessageBuilder::getMessageDataSize (void) const
55{
56	DE_ASSERT(isComplete());
57	return m_buffer.size() - MESSAGE_HEADER_SIZE;
58}
59
60void MessageBuilder::read (ByteBuffer& src)
61{
62	// Try to get header.
63	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
64	{
65		while (m_buffer.size() < MESSAGE_HEADER_SIZE &&
66			   src.getNumElements() > 0)
67			m_buffer.push_back(src.popBack());
68
69		DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);
70
71		if (m_buffer.size() == MESSAGE_HEADER_SIZE)
72		{
73			// Got whole header, parse it.
74			Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
75		}
76	}
77
78	if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
79	{
80		// We have header.
81		size_t msgSize			= getMessageSize();
82		size_t numBytesLeft		= msgSize - m_buffer.size();
83		size_t numToRead		= (size_t)de::min(src.getNumElements(), (int)numBytesLeft);
84
85		if (numToRead > 0)
86		{
87			int curBufPos = (int)m_buffer.size();
88			m_buffer.resize(curBufPos+numToRead);
89			src.popBack(&m_buffer[curBufPos], (int)numToRead);
90		}
91	}
92}
93
94void MessageBuilder::clear (void)
95{
96	m_buffer.clear();
97	m_messageType	= MESSAGETYPE_NONE;
98	m_messageSize	= 0;
99}
100
101ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
102	: TcpServer		(family, port)
103	, m_testDriver	(testProcess)
104	, m_runMode		(runMode)
105{
106}
107
108ExecutionServer::~ExecutionServer (void)
109{
110}
111
112TestDriver* ExecutionServer::acquireTestDriver (void)
113{
114	if (!m_testDriverLock.tryLock())
115		throw Error("Failed to acquire test driver");
116
117	return &m_testDriver;
118}
119
120void ExecutionServer::releaseTestDriver (TestDriver* driver)
121{
122	DE_ASSERT(&m_testDriver == driver);
123	DE_UNREF(driver);
124	m_testDriverLock.unlock();
125}
126
127ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
128{
129	printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
130	return new ExecutionRequestHandler(this, socket);
131}
132
133void ExecutionServer::connectionDone (ConnectionHandler* handler)
134{
135	if (m_runMode == RUNMODE_SINGLE_EXEC)
136		m_socket.close();
137
138	TcpServer::connectionDone(handler);
139}
140
141ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
142	: ConnectionHandler	(server, socket)
143	, m_execServer		(server)
144	, m_testDriver		(DE_NULL)
145	, m_bufferIn		(RECV_BUFFER_SIZE)
146	, m_bufferOut		(SEND_BUFFER_SIZE)
147	, m_run				(false)
148	, m_sendRecvTmpBuf	(SEND_RECV_TMP_BUFFER_SIZE)
149{
150	// Set flags.
151	m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);
152
153	// Init protocol keepalives.
154	initKeepAlives();
155}
156
157ExecutionRequestHandler::~ExecutionRequestHandler (void)
158{
159	if (m_testDriver)
160		m_execServer->releaseTestDriver(m_testDriver);
161}
162
163void ExecutionRequestHandler::handle (void)
164{
165	DBG_PRINT(("ExecutionRequestHandler::handle()\n"));
166
167	try
168	{
169		// Process execution session.
170		processSession();
171	}
172	catch (const std::exception& e)
173	{
174		printf("ExecutionRequestHandler::run(): %s\n", e.what());
175	}
176
177	DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));
178
179	// Release test driver.
180	if (m_testDriver)
181	{
182		try
183		{
184			m_testDriver->reset();
185		}
186		catch (...)
187		{
188		}
189		m_execServer->releaseTestDriver(m_testDriver);
190		m_testDriver = DE_NULL;
191	}
192
193	// Close connection.
194	if (m_socket->isConnected())
195		m_socket->shutdown();
196}
197
198void ExecutionRequestHandler::acquireTestDriver (void)
199{
200	DE_ASSERT(!m_testDriver);
201
202	// Try to acquire test driver - may fail.
203	m_testDriver = m_execServer->acquireTestDriver();
204	DE_ASSERT(m_testDriver);
205	m_testDriver->reset();
206
207}
208
209void ExecutionRequestHandler::processSession (void)
210{
211	m_run = true;
212
213	deUint64 lastIoTime = deGetMicroseconds();
214
215	while (m_run)
216	{
217		bool anyIO = false;
218
219		// Read from socket to buffer.
220		anyIO = receive() || anyIO;
221
222		// Send bytes in buffer.
223		anyIO = send() || anyIO;
224
225		// Process incoming data.
226		if (m_bufferIn.getNumElements() > 0)
227		{
228			DE_ASSERT(!m_msgBuilder.isComplete());
229			m_msgBuilder.read(m_bufferIn);
230		}
231
232		if (m_msgBuilder.isComplete())
233		{
234			// Process message.
235			processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());
236
237			m_msgBuilder.clear();
238		}
239
240		// Keepalives, anyone?
241		pollKeepAlives();
242
243		// Poll test driver for IO.
244		if (m_testDriver)
245			anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
246
247		// If no IO happens in a reasonable amount of time, go to sleep.
248		{
249			deUint64 curTime = deGetMicroseconds();
250			if (anyIO)
251				lastIoTime = curTime;
252			else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
253				deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
254			else
255				deYield(); // Just give other threads chance to run.
256		}
257	}
258}
259
260void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize)
261{
262	switch (type)
263	{
264		case MESSAGETYPE_HELLO:
265		{
266			HelloMessage msg(data, dataSize);
267			DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
268			if (msg.version != PROTOCOL_VERSION)
269				throw ProtocolError("Unsupported protocol version");
270			break;
271		}
272
273		case MESSAGETYPE_TEST:
274		{
275			TestMessage msg(data, dataSize);
276			DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
277			break;
278		}
279
280		case MESSAGETYPE_KEEPALIVE:
281		{
282			KeepAliveMessage msg(data, dataSize);
283			DBG_PRINT(("KeepAliveMessage\n"));
284			keepAliveReceived();
285			break;
286		}
287
288		case MESSAGETYPE_EXECUTE_BINARY:
289		{
290			ExecuteBinaryMessage msg(data, dataSize);
291			DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
292			getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
293			keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
294			break;
295		}
296
297		case MESSAGETYPE_STOP_EXECUTION:
298		{
299			StopExecutionMessage msg(data, dataSize);
300			DBG_PRINT(("StopExecutionMessage\n"));
301			getTestDriver()->stopProcess();
302			break;
303		}
304
305		default:
306			throw ProtocolError("Unsupported message");
307	}
308}
309
310void ExecutionRequestHandler::initKeepAlives (void)
311{
312	deUint64 curTime = deGetMicroseconds();
313	m_lastKeepAliveSent		= curTime;
314	m_lastKeepAliveReceived	= curTime;
315}
316
317void ExecutionRequestHandler::keepAliveReceived (void)
318{
319	m_lastKeepAliveReceived = deGetMicroseconds();
320}
321
322void ExecutionRequestHandler::pollKeepAlives (void)
323{
324	deUint64 curTime = deGetMicroseconds();
325
326	// Check that we've got keepalives in timely fashion.
327	if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
328		throw ProtocolError("Keepalive timeout occurred");
329
330	// Send some?
331	if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
332		m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
333	{
334		vector<deUint8> buf;
335		KeepAliveMessage().write(buf);
336		m_bufferOut.pushFront(&buf[0], (int)buf.size());
337
338		m_lastKeepAliveSent = deGetMicroseconds();
339	}
340}
341
342bool ExecutionRequestHandler::receive (void)
343{
344	size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());
345
346	if (maxLen > 0)
347	{
348		size_t			numRecv;
349		deSocketResult	result	= m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
350
351		if (result == DE_SOCKETRESULT_SUCCESS)
352		{
353			DE_ASSERT(numRecv > 0);
354			m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
355			return true;
356		}
357		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
358		{
359			m_run = false;
360			return true;
361		}
362		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
363			return false;
364		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
365			throw ConnectionError("Connection terminated");
366		else
367			throw ConnectionError("receive() failed");
368	}
369	else
370		return false;
371}
372
373bool ExecutionRequestHandler::send (void)
374{
375	size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());
376
377	if (maxLen > 0)
378	{
379		m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);
380
381		size_t			numSent;
382		deSocketResult	result	= m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
383
384		if (result == DE_SOCKETRESULT_SUCCESS)
385		{
386			DE_ASSERT(numSent > 0);
387			m_bufferOut.popBack((int)numSent);
388			return true;
389		}
390		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
391		{
392			m_run = false;
393			return true;
394		}
395		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
396			return false;
397		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
398			throw ConnectionError("Connection terminated");
399		else
400			throw ConnectionError("send() failed");
401	}
402	else
403		return false;
404}
405
406} // xs
407