BaseMessageSender.java revision 3a1d3383627fb6be94345424990cdc2c870d9a19
package org.testng.remote.strprotocol;

import org.testng.TestNGException;
import org.testng.remote.RemoteTestNG;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/**
 * Base class for message senders that talk to a peer over a socket.
 * <p>
 * The socket carries two kinds of traffic: the messages themselves, written by
 * subclasses to {@link #m_outStream}, and line-oriented admin messages (ACK and
 * STOP) that this class writes with {@link #sendAck()} / {@link #sendStop()} and
 * reads on a background {@link ReaderThread} started by {@link #connect()}.
 */
abstract public class BaseMessageSender implements IMessageSender {
  /** Enables extra diagnostics; never set to <TT>true</TT> inside this class. */
  private boolean m_debug = false;
  protected Socket m_clientSocket;
  private String m_host;
  private int m_port;
  /** Monitor on which {@link #waitForAck()} waits and the reader thread notifies. */
  protected Object m_ackLock = new Object();

  /** Outgoing message stream. */
  protected OutputStream m_outStream;
  /** Used to send ACK and STOP */
  private PrintWriter m_outWriter;

  /** Incoming message stream. */
  protected volatile InputStream m_inStream;
  /** Used to receive ACK and STOP */
  protected volatile BufferedReader m_inReader;

  /** Reader started by {@link #connect()}; volatile because waitForAck() may run on another thread. */
  private volatile ReaderThread m_readerThread;
  /** Whether {@link #waitForAck()} actually waits. */
  private boolean m_ack;

  /** Serial number appended to the next ACK sent by {@link #sendAck()}. */
  private int m_serial = 0;

  /** Last ACK line seen by the reader thread; only used for verbose logging. */
  private String m_latestAck;

  /**
   * @param host the host {@link #connect()} connects to
   * @param port the port used by both {@link #connect()} and {@link #initReceiver()}
   * @param ack <TT>true</TT> if {@link #waitForAck()} should block until an ACK arrives
   */
  public BaseMessageSender(String host, int port, boolean ack) {
    m_host = host;
    m_port = port;
    m_ack = ack;
  }

  /**
   * Starts the connection.
   * <p>
   * Blocks until a socket to the configured host and port can be opened, retrying
   * every four seconds for as long as the connection is refused. Once connected,
   * the in/out streams are set up and the admin reader thread is started.
   * <p>
   * An interrupt received while sleeping between attempts does not abort the
   * retry loop; the thread's interrupt status is restored before returning.
   *
   * @throws IOException if opening the socket or its streams fails for a reason
   *     other than a refused connection
   */
  @Override
  public void connect() throws IOException {
    p("Waiting for Eclipse client on " + m_host + ":" + m_port);
    boolean interrupted = false;
    try {
      while (true) {
        try {
          m_clientSocket = new Socket(m_host, m_port);
          p("Received a connection from Eclipse on " + m_host + ":" + m_port);

          // Output streams
          m_outStream = m_clientSocket.getOutputStream();
          m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream)));

          // Input streams
          m_inStream = m_clientSocket.getInputStream();
          try {
            m_inReader = new BufferedReader(new InputStreamReader(m_inStream,
                "UTF-8")); //$NON-NLS-1$
          }
          catch(UnsupportedEncodingException ueex) {
            // Should never happen
            m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
          }

          p("Connection established, starting reader thread");
          ReaderThread reader = new ReaderThread();
          m_readerThread = reader;
          reader.start();
          return;
        }
        catch(ConnectException ex) {
          // Nobody is listening yet: wait a little and retry.
          try {
            Thread.sleep(4000);
          }
          catch(InterruptedException e) {
            // Keep retrying, but remember the interrupt so it is not lost.
            interrupted = true;
          }
        }
      }
    }
    finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Writes one admin line to the peer and flushes it immediately. */
  private void sendAdminMessage(String message) {
    m_outWriter.println(message);
    m_outWriter.flush();
  }

  /** Sends an ACK admin message carrying the current serial number, then increments it. */
  @Override
  public void sendAck() {
    p("Sending ACK " + m_serial);
    sendAdminMessage(MessageHelper.ACK_MSG + m_serial++);
  }

  /** Sends the STOP admin message. */
  @Override
  public void sendStop() {
    sendAdminMessage(MessageHelper.STOP_MSG);
  }

  /**
   * Opens a server socket on the configured port, waits for one peer to connect
   * and sets up the in/out streams on the accepted socket.
   * <p>
   * Any {@link IOException} raised while accepting (this includes the 5 second
   * accept timeout) is retried after a 100 ms pause, so this method keeps waiting
   * until a peer connects. A failure to open the server socket itself is printed
   * and the method returns without streams being set up. The listening socket is
   * closed before returning; the accepted connection is unaffected by that.
   *
   * @throws SocketTimeoutException declared by the interface; see the note above
   *     about accept timeouts being retried
   */
  @Override
  public void initReceiver() throws SocketTimeoutException {
    if (m_inStream != null) {
      // NOTE(review): only logged; initialization proceeds and replaces the streams.
      p("Receiver already initialized");
    }
    ServerSocket serverSocket = null;
    boolean interrupted = false;
    try {
      p("initReceiver on port " + m_port);
      serverSocket = new ServerSocket(m_port);
      serverSocket.setSoTimeout(5000);

      while (true) {
        Socket socket = null;
        try {
          socket = serverSocket.accept();
          m_inStream = socket.getInputStream();
          m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
          m_outStream = socket.getOutputStream();
          m_outWriter = new PrintWriter(new OutputStreamWriter(m_outStream));

          break;
        }
        catch (IOException ioe) {
          // Do not leak a connection that was accepted but could not be set up.
          if (socket != null) {
            try {
              socket.close();
            }
            catch (IOException ignored) {
              // Best effort: we are about to retry anyway.
            }
          }
          try {
            Thread.sleep(100L);
          }
          catch (InterruptedException ie) {
            // Keep retrying, but remember the interrupt so it is not lost.
            interrupted = true;
          }
        }
      }
    }
    catch(SocketTimeoutException ste) {
      throw ste;
    }
    catch (IOException ioe) {
      ioe.printStackTrace();
    }
    finally {
      // The listening socket is only needed to accept the single peer.
      if (serverSocket != null) {
        try {
          serverSocket.close();
        }
        catch (IOException ignored) {
          // Nothing useful can be done if closing the listener fails.
        }
      }
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Closes the output stream, interrupts the reader thread, then closes the
   * admin reader and the client socket. Each reference is cleared even when the
   * corresponding <TT>close()</TT> fails.
   * <p>
   * The output stream is closed before the reader on purpose: the order of the
   * original implementation is preserved.
   */
  @Override
  public void shutDown() {
    if(null != m_outStream) {
      try {
        m_outStream.close();
      }
      catch(IOException ex) {
        // ignore
      }
      m_outStream = null;
    }

    ReaderThread readerThread = m_readerThread;
    if(null != readerThread) {
      readerThread.interrupt();
    }

    BufferedReader reader = m_inReader;
    if(null != reader) {
      try {
        reader.close();
      }
      catch(IOException e) {
        e.printStackTrace();
      }
      finally {
        m_inReader = null;
      }
    }

    Socket socket = m_clientSocket;
    if(null != socket) {
      try {
        socket.close();
      }
      catch(IOException e) {
        if(m_debug) {
          e.printStackTrace();
        }
      }
      finally {
        m_clientSocket = null;
      }
    }
  }

  /**
   * Blocks until the reader thread signals that an ACK or STOP was received.
   * Does nothing when this sender was created with <TT>ack == false</TT>.
   * <p>
   * Returns immediately if the reader thread started by {@link #connect()} has
   * already terminated (STOP received, end of stream or I/O error), because no
   * ACK can arrive any more. If the waiting thread is interrupted, the method
   * returns and the interrupt status is restored.
   */
  protected void waitForAck() {
    if (!m_ack) {
      return;
    }
    try {
      p("Message sent, waiting for ACK...");
      synchronized(m_ackLock) {
        ReaderThread reader = m_readerThread;
        if (reader != null && reader.m_finished) {
          return;
        }
        // NOTE(review): a single wait() without a condition loop; a wakeup that
        // is not caused by an ACK is indistinguishable from one here.
        m_ackLock.wait();
      }
      p("... ACK received:" + m_latestAck);
    }
    catch(InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Prints <TT>msg</TT> to standard output when RemoteTestNG is verbose. */
  private static void p(String msg) {
    if (RemoteTestNG.isVerbose()) {
      System.out.println("[BaseMessageSender] " + msg); //$NON-NLS-1$
    }
  }

  /**
   * Reader thread that processes messages from the client.
   * <p>
   * Reads admin lines until the stream ends, a STOP is received or reading fails,
   * and wakes up every thread waiting on {@link #m_ackLock} for each ACK or STOP.
   */
  private class ReaderThread extends Thread {

    /** Set once {@link #run()} is about to exit; guarded by {@link #m_ackLock}. */
    private boolean m_finished;

    public ReaderThread() {
      super("ReaderThread"); //$NON-NLS-1$
    }

    @Override
    public void run() {
      try {
        p("ReaderThread waiting for an admin message");
        String message = readAdminLine();
        p("ReaderThread received admin message:" + message);
        while (message != null) {
          if (m_debug) {
            p("Admin message:" + message); //$NON-NLS-1$
          }
          boolean acknowledge = message.startsWith(MessageHelper.ACK_MSG);
          boolean stop = MessageHelper.STOP_MSG.equals(message);
          if(acknowledge || stop) {
            if (acknowledge) {
              p("Received ACK:" + message);
              m_latestAck = message;
            }
            synchronized(m_ackLock) {
              m_ackLock.notifyAll();
            }
            if (stop) {
              break;
            }
          } else {
            p("Received unknown message: '" + message + "'");
          }
          message = readAdminLine();
        }
      }
      catch(IOException ioe) {
        if (RemoteTestNG.isVerbose()) {
          ioe.printStackTrace();
        }
      }
      finally {
        // Whatever ended the loop, no further ACK will be delivered by this
        // thread: release current waiters and let future ones return at once.
        synchronized(m_ackLock) {
          m_finished = true;
          m_ackLock.notifyAll();
        }
      }
    }

    /**
     * Reads the next admin line, or returns <TT>null</TT> at end of stream or
     * when the reader has been cleared by {@link BaseMessageSender#shutDown()}.
     * The field is read once so a concurrent shutDown() cannot cause an NPE.
     */
    private String readAdminLine() throws IOException {
      BufferedReader reader = m_inReader;
      return reader != null ? reader.readLine() : null;
    }
  }
}