package org.testng.remote.strprotocol; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; import java.net.ConnectException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import org.testng.TestNGException; import static org.testng.remote.RemoteTestNG.isVerbose; abstract public class BaseMessageSender implements IMessageSender { private boolean m_debug = false; protected Socket m_clientSocket; private String m_host; private int m_port; protected final Object m_ackLock = new Object(); private boolean m_requestStopReceiver; /** Outgoing message stream. */ protected OutputStream m_outStream; /** Used to send ACK and STOP */ private PrintWriter m_outWriter; /** Incoming message stream. */ protected volatile InputStream m_inStream; /** Used to receive ACK and STOP */ protected volatile BufferedReader m_inReader; private ReaderThread m_readerThread; private boolean m_ack; // protected InputStream m_receiverInputStream; public BaseMessageSender(String host, int port, boolean ack) { m_host = host; m_port = port; m_ack = ack; } /** * Starts the connection. * * @throws TestNGException if an exception occurred while establishing the connection */ @Override public void connect() throws IOException { p("Waiting for Eclipse client on " + m_host + ":" + m_port); while (true) { try { m_clientSocket = new Socket(m_host, m_port); p("Received a connection from Eclipse on " + m_host + ":" + m_port); // Output streams m_outStream = m_clientSocket.getOutputStream(); m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream))); // Input streams m_inStream = m_clientSocket.getInputStream(); try { m_inReader = new BufferedReader(new InputStreamReader(m_inStream, "UTF-8")); //$NON-NLS-1$ } catch(UnsupportedEncodingException ueex) { // Should never happen m_inReader = new BufferedReader(new InputStreamReader(m_inStream)); } p("Connection established, starting reader thread"); m_readerThread = new ReaderThread(); m_readerThread.start(); return; } catch(ConnectException ex) { // ignore and retry try { Thread.sleep(4000); } catch(InterruptedException handled) { Thread.currentThread().interrupt(); } } } } private void sendAdminMessage(String message) { m_outWriter.println(message); m_outWriter.flush(); } private int m_serial = 0; @Override public void sendAck() { p("Sending ACK " + m_serial); // Note: adding the serial at the end of this message causes a lock up if interacting // with TestNG 5.14 and older (reported by JetBrains). The following git commit: // 5730bdfb33ec7a8bf4104852cd4a5f2875ba8267 // changed equals() to startsWith(). // It's ok to add this serial back for debugging, but don't commit it until JetBrains // confirms they no longer need backward compatibility with 5.14. sendAdminMessage(MessageHelper.ACK_MSG); // + m_serial++); } @Override public void sendStop() { sendAdminMessage(MessageHelper.STOP_MSG); } @Override public void initReceiver() throws SocketTimeoutException { if (m_inStream != null) { p("Receiver already initialized"); } ServerSocket serverSocket = null; try { p("initReceiver on port " + m_port); serverSocket = new ServerSocket(m_port); serverSocket.setSoTimeout(5000); Socket socket = null; while (!m_requestStopReceiver) { try { if (m_debug) { p("polling the client connection"); } socket = serverSocket.accept(); // break the loop once the first client connected break; } catch (IOException ioe) { try { Thread.sleep(100L); } catch (InterruptedException ie) { // Do nothing. } } } if (socket != null) { m_inStream = socket.getInputStream(); m_inReader = new BufferedReader(new InputStreamReader(m_inStream)); m_outStream = socket.getOutputStream(); m_outWriter = new PrintWriter(new OutputStreamWriter(m_outStream)); } } catch(SocketTimeoutException ste) { throw ste; } catch (IOException ioe) { closeQuietly(serverSocket); } } public void stopReceiver() { m_requestStopReceiver = true; } @Override public void shutDown() { closeQuietly(m_outStream); m_outStream = null; if (null != m_readerThread) { m_readerThread.interrupt(); } closeQuietly(m_inReader); m_inReader = null; closeQuietly(m_clientSocket); m_clientSocket = null; } private void closeQuietly(Closeable c) { if (c != null) { try { c.close(); } catch (IOException e) { if (m_debug) { e.printStackTrace(); } } } } private String m_latestAck; protected void waitForAck() { if (m_ack) { try { p("Message sent, waiting for ACK..."); synchronized(m_ackLock) { m_ackLock.wait(); } p("... ACK received:" + m_latestAck); } catch(InterruptedException handled) { Thread.currentThread().interrupt(); } } } private static void p(String msg) { if (isVerbose()) { System.out.println("[BaseMessageSender] " + msg); //$NON-NLS-1$ } } /** * Reader thread that processes messages from the client. */ private class ReaderThread extends Thread { public ReaderThread() { super("ReaderThread"); //$NON-NLS-1$ } @Override public void run() { try { p("ReaderThread waiting for an admin message"); String message = m_inReader.readLine(); p("ReaderThread received admin message:" + message); while (message != null) { if (m_debug) { p("Admin message:" + message); //$NON-NLS-1$ } boolean acknowledge = message.startsWith(MessageHelper.ACK_MSG); boolean stop = MessageHelper.STOP_MSG.equals(message); if(acknowledge || stop) { if (acknowledge) { p("Received ACK:" + message); m_latestAck = message; } synchronized(m_ackLock) { m_ackLock.notifyAll(); } if (stop) { break; } } else { p("Received unknown message: '" + message + "'"); } message = m_inReader != null ? m_inReader.readLine() : null; } // while((m_reader != null) && (message = m_reader.readLine()) != null) { // if (m_debug) { // p("Admin message:" + message); //$NON-NLS-1$ // } // boolean acknowledge = MessageHelper.ACK_MSG.equals(message); // boolean stop = MessageHelper.STOP_MSG.equals(message); // if(acknowledge || stop) { // synchronized(m_lock) { // m_lock.notifyAll(); // } // if (stop) { // break; // } // } // } } catch(IOException ioe) { if (isVerbose()) { ioe.printStackTrace(); } } } } }