1/*-------------------------------------------------------------------------
2 * drawElements Quality Program Test Executor
3 * ------------------------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *      http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Tcp/Ip communication link.
22 *//*--------------------------------------------------------------------*/
23
24#include "xeTcpIpLink.hpp"
25#include "xsProtocol.hpp"
26#include "deClock.h"
27#include "deInt32.h"
28
29namespace xe
30{
31
// Sizing constants for the outgoing data path. They are passed as the two
// constructor arguments of the send thread's block buffer, and the block size
// is also the size of the stack buffer used when draining it.
enum
{
	SEND_BUFFER_BLOCK_SIZE		= 1 << 10,	// 1024
	SEND_BUFFER_NUM_BLOCKS		= 1 << 6	// 64
};
37
38// Utilities for writing messages out.
39
40static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
41{
42	deUint8 hdr[xs::MESSAGE_HEADER_SIZE];
43	xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44	dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45}
46
47static void writeKeepalive (de::BlockBuffer<deUint8>& dst)
48{
49	writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50	dst.flush();
51}
52
53static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
54{
55	int		nameSize			= (int)strlen(name)		+ 1;
56	int		paramsSize			= (int)strlen(params)	+ 1;
57	int		workDirSize			= (int)strlen(workDir)	+ 1;
58	int		caseListSize		= (int)strlen(caseList)	+ 1;
59	int		totalSize			= xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
60
61	writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
62	dst.write(nameSize,		(const deUint8*)name);
63	dst.write(paramsSize,	(const deUint8*)params);
64	dst.write(workDirSize,	(const deUint8*)workDir);
65	dst.write(caseListSize,	(const deUint8*)caseList);
66	dst.flush();
67}
68
69static void writeStopExecution (de::BlockBuffer<deUint8>& dst)
70{
71	writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
72	dst.flush();
73}
74
75// TcpIpLinkState
76
77TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
78	: m_state					(initialState)
79	, m_error					(initialErr)
80	, m_lastKeepaliveReceived	(0)
81	, m_stateChangedCallback	(DE_NULL)
82	, m_testLogDataCallback		(DE_NULL)
83	, m_infoLogDataCallback		(DE_NULL)
84	, m_userPtr					(DE_NULL)
85{
86}
87
88TcpIpLinkState::~TcpIpLinkState (void)
89{
90}
91
92CommLinkState TcpIpLinkState::getState (void) const
93{
94	de::ScopedLock lock(m_lock);
95
96	return m_state;
97}
98
99CommLinkState TcpIpLinkState::getState (std::string& error) const
100{
101	de::ScopedLock lock(m_lock);
102
103	error = m_error;
104	return m_state;
105}
106
107void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
108{
109	de::ScopedLock lock(m_lock);
110
111	m_stateChangedCallback		= stateChangedCallback;
112	m_testLogDataCallback		= testLogDataCallback;
113	m_infoLogDataCallback		= infoLogDataCallback;
114	m_userPtr					= userPtr;
115}
116
117void TcpIpLinkState::setState (CommLinkState state, const char* error)
118{
119	CommLink::StateChangedFunc	callback	= DE_NULL;
120	void*						userPtr		= DE_NULL;
121
122	{
123		de::ScopedLock lock(m_lock);
124
125		m_state = state;
126		m_error	= error;
127
128		callback	= m_stateChangedCallback;
129		userPtr		= m_userPtr;
130	}
131
132	if (callback)
133		callback(userPtr, state, error);
134}
135
136void TcpIpLinkState::onTestLogData (const deUint8* bytes, int numBytes) const
137{
138	CommLink::LogDataFunc	callback	= DE_NULL;
139	void*					userPtr		= DE_NULL;
140
141	m_lock.lock();
142	callback	= m_testLogDataCallback;
143	userPtr		= m_userPtr;
144	m_lock.unlock();
145
146	if (callback)
147		callback(userPtr, bytes, numBytes);
148}
149
150void TcpIpLinkState::onInfoLogData (const deUint8* bytes, int numBytes) const
151{
152	CommLink::LogDataFunc	callback	= DE_NULL;
153	void*					userPtr		= DE_NULL;
154
155	m_lock.lock();
156	callback	= m_infoLogDataCallback;
157	userPtr		= m_userPtr;
158	m_lock.unlock();
159
160	if (callback)
161		callback(userPtr, bytes, numBytes);
162}
163
164void TcpIpLinkState::onKeepaliveReceived (void)
165{
166	de::ScopedLock lock(m_lock);
167	m_lastKeepaliveReceived = deGetMicroseconds();
168}
169
170deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
171{
172	de::ScopedLock lock(m_lock);
173	return m_lastKeepaliveReceived;
174}
175
176// TcpIpSendThread
177
178TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
179	: m_socket		(socket)
180	, m_state		(state)
181	, m_buffer		(SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
182	, m_isRunning	(false)
183{
184}
185
186TcpIpSendThread::~TcpIpSendThread (void)
187{
188}
189
190void TcpIpSendThread::start (void)
191{
192	DE_ASSERT(!m_isRunning);
193
194	// Reset state.
195	m_buffer.clear();
196	m_isRunning = true;
197
198	de::Thread::start();
199}
200
201void TcpIpSendThread::run (void)
202{
203	try
204	{
205		deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
206
207		while (!m_buffer.isCanceled())
208		{
209			int				numToSend	= 0;
210			int				numSent		= 0;
211			deSocketResult	result		= DE_SOCKETRESULT_LAST;
212
213			try
214			{
215				// Wait for single byte and then try to read more.
216				m_buffer.read(1, &buf[0]);
217				numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
218			}
219			catch (const de::BlockBuffer<deUint8>::CanceledException&)
220			{
221				// Handled in loop condition.
222			}
223
224			while (numSent < numToSend)
225			{
226				result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent);
227
228				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
229					XE_FAIL("Connection closed");
230				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
231					XE_FAIL("Connection terminated");
232				else if (result == DE_SOCKETRESULT_ERROR)
233					XE_FAIL("Socket error");
234				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
235				{
236					// \note Socket should not be in non-blocking mode.
237					DE_ASSERT(numSent <= 0);
238					deYield();
239				}
240				else
241					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
242			}
243		}
244	}
245	catch (const std::exception& e)
246	{
247		m_state.setState(COMMLINKSTATE_ERROR, e.what());
248	}
249}
250
251void TcpIpSendThread::stop (void)
252{
253	if (m_isRunning)
254	{
255		m_buffer.cancel();
256		join();
257		m_isRunning = false;
258	}
259}
260
261// TcpIpRecvThread
262
263TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
264	: m_socket		(socket)
265	, m_state		(state)
266	, m_curMsgPos	(0)
267	, m_isRunning	(false)
268{
269}
270
271TcpIpRecvThread::~TcpIpRecvThread (void)
272{
273}
274
275void TcpIpRecvThread::start (void)
276{
277	DE_ASSERT(!m_isRunning);
278
279	// Reset state.
280	m_curMsgPos = 0;
281	m_isRunning = true;
282
283	de::Thread::start();
284}
285
286void TcpIpRecvThread::run (void)
287{
288	try
289	{
290		for (;;)
291		{
292			bool				hasHeader		= m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
293			bool				hasPayload		= false;
294			int					messageSize		= 0;
295			xs::MessageType		messageType		= (xs::MessageType)0;
296
297			if (hasHeader)
298			{
299				xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
300				hasPayload = m_curMsgPos >= messageSize;
301			}
302
303			if (hasPayload)
304			{
305				// Process message.
306				handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
307				m_curMsgPos = 0;
308			}
309			else
310			{
311				// Try to receive missing bytes.
312				int					curSize			= hasHeader ? messageSize : xs::MESSAGE_HEADER_SIZE;
313				int					bytesToRecv		= curSize-m_curMsgPos;
314				int					numRecv			= 0;
315				deSocketResult		result			= DE_SOCKETRESULT_LAST;
316
317				if ((int)m_curMsgBuf.size() < curSize)
318					m_curMsgBuf.resize(curSize);
319
320				result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
321
322				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
323					XE_FAIL("Connection closed");
324				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
325					XE_FAIL("Connection terminated");
326				else if (result == DE_SOCKETRESULT_ERROR)
327					XE_FAIL("Socket error");
328				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
329				{
330					// \note Socket should not be in non-blocking mode.
331					DE_ASSERT(numRecv <= 0);
332					deYield();
333				}
334				else
335				{
336					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
337					DE_ASSERT(numRecv <= bytesToRecv);
338					m_curMsgPos += numRecv;
339					// Continue receiving bytes / handle message in next iter.
340				}
341			}
342		}
343	}
344	catch (const std::exception& e)
345	{
346		m_state.setState(COMMLINKSTATE_ERROR, e.what());
347	}
348}
349
350void TcpIpRecvThread::stop (void)
351{
352	if (m_isRunning)
353	{
354		// \note Socket must be closed before terminating receive thread.
355		XE_CHECK(!m_socket.isReceiveOpen());
356
357		join();
358		m_isRunning = false;
359	}
360}
361
362void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, int dataSize)
363{
364	switch (messageType)
365	{
366		case xs::MESSAGETYPE_KEEPALIVE:
367			m_state.onKeepaliveReceived();
368			break;
369
370		case xs::MESSAGETYPE_PROCESS_STARTED:
371			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
372			m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
373			break;
374
375		case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
376		{
377			xs::ProcessLaunchFailedMessage msg(data, dataSize);
378			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
379			m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
380			break;
381		}
382
383		case xs::MESSAGETYPE_PROCESS_FINISHED:
384		{
385			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
386			xs::ProcessFinishedMessage msg(data, dataSize);
387			m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
388			DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
389			break;
390		}
391
392		case xs::MESSAGETYPE_PROCESS_LOG_DATA:
393		case xs::MESSAGETYPE_INFO:
394			// Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
395			if (data[dataSize-1] == 0)
396				dataSize -= 1;
397
398			if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
399			{
400				XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
401				m_state.onTestLogData(&data[0], dataSize);
402			}
403			else
404				m_state.onInfoLogData(&data[0], dataSize);
405			break;
406
407		default:
408			XE_FAIL("Unknown message");
409	}
410}
411
412// TcpIpLink
413
414TcpIpLink::TcpIpLink (void)
415	: m_state			(COMMLINKSTATE_ERROR, "Not connected")
416	, m_sendThread		(m_socket, m_state)
417	, m_recvThread		(m_socket, m_state)
418	, m_keepaliveTimer	(DE_NULL)
419{
420	m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
421	XE_CHECK(m_keepaliveTimer);
422}
423
424TcpIpLink::~TcpIpLink (void)
425{
426	try
427	{
428		closeConnection();
429	}
430	catch (...)
431	{
432		// Can't do much except to ignore error.
433	}
434	deTimer_destroy(m_keepaliveTimer);
435}
436
437void TcpIpLink::closeConnection (void)
438{
439	{
440		deSocketState state = m_socket.getState();
441		if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
442			m_socket.shutdown();
443	}
444
445	if (deTimer_isActive(m_keepaliveTimer))
446		deTimer_disable(m_keepaliveTimer);
447
448	if (m_sendThread.isRunning())
449		m_sendThread.stop();
450
451	if (m_recvThread.isRunning())
452		m_recvThread.stop();
453
454	if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
455		m_socket.close();
456}
457
458void TcpIpLink::connect (const de::SocketAddress& address)
459{
460	XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
461	XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
462	XE_CHECK(!m_sendThread.isRunning());
463	XE_CHECK(!m_recvThread.isRunning());
464
465	m_socket.connect(address);
466
467	try
468	{
469		// Clear error and set state to ready.
470		m_state.setState(COMMLINKSTATE_READY, "");
471		m_state.onKeepaliveReceived();
472
473		// Launch threads.
474		m_sendThread.start();
475		m_recvThread.start();
476
477		XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
478	}
479	catch (const std::exception& e)
480	{
481		closeConnection();
482		m_state.setState(COMMLINKSTATE_ERROR, e.what());
483	}
484}
485
486void TcpIpLink::disconnect (void)
487{
488	try
489	{
490		closeConnection();
491		m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
492	}
493	catch (const std::exception& e)
494	{
495		m_state.setState(COMMLINKSTATE_ERROR, e.what());
496	}
497}
498
499void TcpIpLink::reset (void)
500{
501	// \note Just clears error state if we are connected.
502	if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
503	{
504		m_state.setState(COMMLINKSTATE_READY, "");
505
506		// \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
507	}
508	else
509		disconnect(); // Abnormal state/usage. Disconnect socket.
510}
511
512void TcpIpLink::keepaliveTimerCallback (void* ptr)
513{
514	TcpIpLink*	link			= static_cast<TcpIpLink*>(ptr);
515	deUint64	lastKeepalive	= link->m_state.getLastKeepaliveRecevied();
516	deUint64	curTime			= deGetMicroseconds();
517
518	// Check for timeout.
519	if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000)
520		link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
521
522	// Enqueue new keepalive.
523	try
524	{
525		writeKeepalive(link->m_sendThread.getBuffer());
526	}
527	catch (const de::BlockBuffer<deUint8>::CanceledException&)
528	{
529		// Ignore. Can happen in connection teardown.
530	}
531}
532
533CommLinkState TcpIpLink::getState (void) const
534{
535	return m_state.getState();
536}
537
538CommLinkState TcpIpLink::getState (std::string& message) const
539{
540	return m_state.getState(message);
541}
542
543void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
544{
545	m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
546}
547
548void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
549{
550	XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
551
552	m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
553	writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
554}
555
556void TcpIpLink::stopTestProcess (void)
557{
558	XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
559	writeStopExecution(m_sendThread.getBuffer());
560}
561
562} // xe
563