1/*------------------------------------------------------------------------- 2 * drawElements Quality Program Execution Server 3 * --------------------------------------------- 4 * 5 * Copyright 2014 The Android Open Source Project 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 *//*! 20 * \file 21 * \brief Test Execution Server. 22 *//*--------------------------------------------------------------------*/ 23 24#include "xsExecutionServer.hpp" 25#include "deClock.h" 26 27#include <cstdio> 28 29using std::vector; 30using std::string; 31 32#if 1 33# define DBG_PRINT(X) printf X 34#else 35# define DBG_PRINT(X) 36#endif 37 38namespace xs 39{ 40 41inline bool MessageBuilder::isComplete (void) const 42{ 43 if (m_buffer.size() < MESSAGE_HEADER_SIZE) 44 return false; 45 else 46 return m_buffer.size() == getMessageSize(); 47} 48 49const deUint8* MessageBuilder::getMessageData (void) const 50{ 51 return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL; 52} 53 54size_t MessageBuilder::getMessageDataSize (void) const 55{ 56 DE_ASSERT(isComplete()); 57 return m_buffer.size() - MESSAGE_HEADER_SIZE; 58} 59 60void MessageBuilder::read (ByteBuffer& src) 61{ 62 // Try to get header. 63 if (m_buffer.size() < MESSAGE_HEADER_SIZE) 64 { 65 while (m_buffer.size() < MESSAGE_HEADER_SIZE && 66 src.getNumElements() > 0) 67 m_buffer.push_back(src.popBack()); 68 69 DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE); 70 71 if (m_buffer.size() == MESSAGE_HEADER_SIZE) 72 { 73 // Got whole header, parse it. 74 Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize); 75 } 76 } 77 78 if (m_buffer.size() >= MESSAGE_HEADER_SIZE) 79 { 80 // We have header. 81 size_t msgSize = getMessageSize(); 82 size_t numBytesLeft = msgSize - m_buffer.size(); 83 size_t numToRead = (size_t)de::min(src.getNumElements(), (int)numBytesLeft); 84 85 if (numToRead > 0) 86 { 87 int curBufPos = (int)m_buffer.size(); 88 m_buffer.resize(curBufPos+numToRead); 89 src.popBack(&m_buffer[curBufPos], (int)numToRead); 90 } 91 } 92} 93 94void MessageBuilder::clear (void) 95{ 96 m_buffer.clear(); 97 m_messageType = MESSAGETYPE_NONE; 98 m_messageSize = 0; 99} 100 101ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode) 102 : TcpServer (family, port) 103 , m_testDriver (testProcess) 104 , m_runMode (runMode) 105{ 106} 107 108ExecutionServer::~ExecutionServer (void) 109{ 110} 111 112TestDriver* ExecutionServer::acquireTestDriver (void) 113{ 114 if (!m_testDriverLock.tryLock()) 115 throw Error("Failed to acquire test driver"); 116 117 return &m_testDriver; 118} 119 120void ExecutionServer::releaseTestDriver (TestDriver* driver) 121{ 122 DE_ASSERT(&m_testDriver == driver); 123 DE_UNREF(driver); 124 m_testDriverLock.unlock(); 125} 126 127ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress) 128{ 129 printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort()); 130 return new ExecutionRequestHandler(this, socket); 131} 132 133void ExecutionServer::connectionDone (ConnectionHandler* handler) 134{ 135 if (m_runMode == RUNMODE_SINGLE_EXEC) 136 m_socket.close(); 137 138 TcpServer::connectionDone(handler); 139} 140 141ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket) 142 : ConnectionHandler (server, socket) 143 , m_execServer (server) 144 , m_testDriver (DE_NULL) 145 , m_bufferIn (RECV_BUFFER_SIZE) 146 , m_bufferOut (SEND_BUFFER_SIZE) 147 , m_run (false) 148 , m_sendRecvTmpBuf (SEND_RECV_TMP_BUFFER_SIZE) 149{ 150 // Set flags. 151 m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC); 152 153 // Init protocol keepalives. 154 initKeepAlives(); 155} 156 157ExecutionRequestHandler::~ExecutionRequestHandler (void) 158{ 159 if (m_testDriver) 160 m_execServer->releaseTestDriver(m_testDriver); 161} 162 163void ExecutionRequestHandler::handle (void) 164{ 165 DBG_PRINT(("ExecutionRequestHandler::handle()\n")); 166 167 try 168 { 169 // Process execution session. 170 processSession(); 171 } 172 catch (const std::exception& e) 173 { 174 printf("ExecutionRequestHandler::run(): %s\n", e.what()); 175 } 176 177 DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n")); 178 179 // Release test driver. 180 if (m_testDriver) 181 { 182 try 183 { 184 m_testDriver->reset(); 185 } 186 catch (...) 187 { 188 } 189 m_execServer->releaseTestDriver(m_testDriver); 190 m_testDriver = DE_NULL; 191 } 192 193 // Close connection. 194 if (m_socket->isConnected()) 195 m_socket->shutdown(); 196} 197 198void ExecutionRequestHandler::acquireTestDriver (void) 199{ 200 DE_ASSERT(!m_testDriver); 201 202 // Try to acquire test driver - may fail. 203 m_testDriver = m_execServer->acquireTestDriver(); 204 DE_ASSERT(m_testDriver); 205 m_testDriver->reset(); 206 207} 208 209void ExecutionRequestHandler::processSession (void) 210{ 211 m_run = true; 212 213 deUint64 lastIoTime = deGetMicroseconds(); 214 215 while (m_run) 216 { 217 bool anyIO = false; 218 219 // Read from socket to buffer. 220 anyIO = receive() || anyIO; 221 222 // Send bytes in buffer. 223 anyIO = send() || anyIO; 224 225 // Process incoming data. 226 if (m_bufferIn.getNumElements() > 0) 227 { 228 DE_ASSERT(!m_msgBuilder.isComplete()); 229 m_msgBuilder.read(m_bufferIn); 230 } 231 232 if (m_msgBuilder.isComplete()) 233 { 234 // Process message. 235 processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize()); 236 237 m_msgBuilder.clear(); 238 } 239 240 // Keepalives, anyone? 241 pollKeepAlives(); 242 243 // Poll test driver for IO. 244 if (m_testDriver) 245 anyIO = getTestDriver()->poll(m_bufferOut) || anyIO; 246 247 // If no IO happens in a reasonable amount of time, go to sleep. 248 { 249 deUint64 curTime = deGetMicroseconds(); 250 if (anyIO) 251 lastIoTime = curTime; 252 else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000) 253 deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while. 254 else 255 deYield(); // Just give other threads chance to run. 256 } 257 } 258} 259 260void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize) 261{ 262 switch (type) 263 { 264 case MESSAGETYPE_HELLO: 265 { 266 HelloMessage msg(data, dataSize); 267 DBG_PRINT(("HelloMessage: version = %d\n", msg.version)); 268 if (msg.version != PROTOCOL_VERSION) 269 throw ProtocolError("Unsupported protocol version"); 270 break; 271 } 272 273 case MESSAGETYPE_TEST: 274 { 275 TestMessage msg(data, dataSize); 276 DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str())); 277 break; 278 } 279 280 case MESSAGETYPE_KEEPALIVE: 281 { 282 KeepAliveMessage msg(data, dataSize); 283 DBG_PRINT(("KeepAliveMessage\n")); 284 keepAliveReceived(); 285 break; 286 } 287 288 case MESSAGETYPE_EXECUTE_BINARY: 289 { 290 ExecuteBinaryMessage msg(data, dataSize); 291 DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str())); 292 getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str()); 293 keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed. 294 break; 295 } 296 297 case MESSAGETYPE_STOP_EXECUTION: 298 { 299 StopExecutionMessage msg(data, dataSize); 300 DBG_PRINT(("StopExecutionMessage\n")); 301 getTestDriver()->stopProcess(); 302 break; 303 } 304 305 default: 306 throw ProtocolError("Unsupported message"); 307 } 308} 309 310void ExecutionRequestHandler::initKeepAlives (void) 311{ 312 deUint64 curTime = deGetMicroseconds(); 313 m_lastKeepAliveSent = curTime; 314 m_lastKeepAliveReceived = curTime; 315} 316 317void ExecutionRequestHandler::keepAliveReceived (void) 318{ 319 m_lastKeepAliveReceived = deGetMicroseconds(); 320} 321 322void ExecutionRequestHandler::pollKeepAlives (void) 323{ 324 deUint64 curTime = deGetMicroseconds(); 325 326 // Check that we've got keepalives in timely fashion. 327 if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000) 328 throw ProtocolError("Keepalive timeout occurred"); 329 330 // Send some? 331 if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 && 332 m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE) 333 { 334 vector<deUint8> buf; 335 KeepAliveMessage().write(buf); 336 m_bufferOut.pushFront(&buf[0], (int)buf.size()); 337 338 m_lastKeepAliveSent = deGetMicroseconds(); 339 } 340} 341 342bool ExecutionRequestHandler::receive (void) 343{ 344 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree()); 345 346 if (maxLen > 0) 347 { 348 size_t numRecv; 349 deSocketResult result = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv); 350 351 if (result == DE_SOCKETRESULT_SUCCESS) 352 { 353 DE_ASSERT(numRecv > 0); 354 m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv); 355 return true; 356 } 357 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) 358 { 359 m_run = false; 360 return true; 361 } 362 else if (result == DE_SOCKETRESULT_WOULD_BLOCK) 363 return false; 364 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) 365 throw ConnectionError("Connection terminated"); 366 else 367 throw ConnectionError("receive() failed"); 368 } 369 else 370 return false; 371} 372 373bool ExecutionRequestHandler::send (void) 374{ 375 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements()); 376 377 if (maxLen > 0) 378 { 379 m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen); 380 381 size_t numSent; 382 deSocketResult result = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent); 383 384 if (result == DE_SOCKETRESULT_SUCCESS) 385 { 386 DE_ASSERT(numSent > 0); 387 m_bufferOut.popBack((int)numSent); 388 return true; 389 } 390 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) 391 { 392 m_run = false; 393 return true; 394 } 395 else if (result == DE_SOCKETRESULT_WOULD_BLOCK) 396 return false; 397 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) 398 throw ConnectionError("Connection terminated"); 399 else 400 throw ConnectionError("send() failed"); 401 } 402 else 403 return false; 404} 405 406} // xs 407