BaseMessageSender.java revision bf3a3b055a632c534ac856aeb97615167e90e72c
1package org.testng.remote.strprotocol; 2 3import com.sun.xml.internal.ws.wsdl.parser.InaccessibleWSDLException; 4import org.testng.TestNGException; 5import org.testng.remote.RemoteTestNG; 6 7import java.io.BufferedReader; 8import java.io.BufferedWriter; 9import java.io.IOException; 10import java.io.InputStream; 11import java.io.InputStreamReader; 12import java.io.OutputStream; 13import java.io.OutputStreamWriter; 14import java.io.PrintWriter; 15import java.io.UnsupportedEncodingException; 16import java.net.ConnectException; 17import java.net.ServerSocket; 18import java.net.Socket; 19import java.net.SocketTimeoutException; 20 21abstract public class BaseMessageSender implements IMessageSender { 22 private boolean m_debug = false; 23 protected Socket m_clientSocket; 24 private String m_host; 25 private int m_port; 26 protected Object m_ackLock = new Object(); 27 28 /** Outgoing message stream. */ 29 protected OutputStream m_outStream; 30 /** Used to send ACK and STOP */ 31 private PrintWriter m_outWriter; 32 33 /** Incoming message stream. */ 34 protected volatile InputStream m_inStream; 35 /** Used to receive ACK and STOP */ 36 protected volatile BufferedReader m_inReader; 37 38 private ReaderThread m_readerThread; 39 private boolean m_ack; 40// protected InputStream m_receiverInputStream; 41 42 public BaseMessageSender(String host, int port, boolean ack) { 43 m_host = host; 44 m_port = port; 45 m_ack = ack; 46 } 47 48 /** 49 * Starts the connection. 50 * 51 * @return <TT>true</TT> if the connection was successful, <TT>false</TT> otherwise 52 * @throws TestNGException if an exception occurred while establishing the connection 53 */ 54 @Override 55 public void connect() throws IOException { 56 p("Waiting for Eclipse client on " + m_host + ":" + m_port); 57 while (true) { 58 try { 59 m_clientSocket = new Socket(m_host, m_port); 60 p("Received a connection from Eclipse on " + m_host + ":" + m_port); 61 62 // Output streams 63 m_outStream = m_clientSocket.getOutputStream(); 64 m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream))); 65 66 // Input streams 67 m_inStream = m_clientSocket.getInputStream(); 68 try { 69 m_inReader = new BufferedReader(new InputStreamReader(m_inStream, 70 "UTF-8")); //$NON-NLS-1$ 71 } 72 catch(UnsupportedEncodingException ueex) { 73 // Should never happen 74 m_inReader = new BufferedReader(new InputStreamReader(m_inStream)); 75 } 76 77 p("Connection established, starting reader thread"); 78 m_readerThread = new ReaderThread(); 79 m_readerThread.start(); 80 return; 81 } 82 catch(ConnectException ex) { 83 // ignore and retry 84 try { 85 Thread.sleep(4000); 86 } 87 catch(InterruptedException e) { 88 ; 89 } 90 } 91 } 92 } 93 94 private void sendAdminMessage(String message) { 95 m_outWriter.println(message); 96 m_outWriter.flush(); 97 } 98 99 private int m_serial = 0; 100 101 @Override 102 public void sendAck() { 103 p("Sending ACK " + m_serial); 104 sendAdminMessage(MessageHelper.ACK_MSG + m_serial++); 105 } 106 107 @Override 108 public void sendStop() { 109 sendAdminMessage(MessageHelper.STOP_MSG); 110 } 111 112 @Override 113 public void initReceiver() throws SocketTimeoutException { 114 if (m_inStream != null) { 115 p("Receiver already initialized"); 116 } 117 ServerSocket serverSocket; 118 try { 119 p("initReceiver on port " + m_port); 120 serverSocket = new ServerSocket(m_port); 121 serverSocket.setSoTimeout(5000); 122 123 while (true) { 124 try { 125 Socket socket = serverSocket.accept(); 126 m_inStream = socket.getInputStream(); 127 m_inReader = new BufferedReader(new InputStreamReader(m_inStream)); 128 m_outStream = socket.getOutputStream(); 129 m_outWriter = new PrintWriter(new OutputStreamWriter(m_outStream)); 130 131 break; 132 } 133 catch (IOException ioe) { 134 try { 135 Thread.sleep(100L); 136 } 137 catch (InterruptedException ie) { 138 // Do nothing. 139 } 140 } 141 } 142 } 143 catch(SocketTimeoutException ste) { 144 throw ste; 145 } 146 catch (IOException ioe) { 147 // TODO Auto-generated catch block 148 ioe.printStackTrace(); 149 } 150 } 151 152 @Override 153 public void shutDown() { 154 if(null != m_outStream) { 155 try { 156 m_outStream.close(); 157 } 158 catch(IOException ex) { 159 // ignore 160 } 161 m_outStream = null; 162 } 163 164 try { 165 if(null != m_readerThread) { 166 m_readerThread.interrupt(); 167 } 168 169 if(null != m_inReader) { 170 m_inReader.close(); 171 m_inReader = null; 172 } 173 } 174 catch(IOException e) { 175 e.printStackTrace(); 176 } 177 178 try { 179 if(null != m_clientSocket) { 180 m_clientSocket.close(); 181 m_clientSocket = null; 182 } 183 } 184 catch(IOException e) { 185 if(m_debug) { 186 e.printStackTrace(); 187 } 188 } 189 } 190 191 private String m_latestAck; 192 193 protected void waitForAck() { 194 if (m_ack) { 195 try { 196 p("Message sent, waiting for ACK..."); 197 synchronized(m_ackLock) { 198 m_ackLock.wait(); 199 } 200 p("... ACK received:" + m_latestAck); 201 } 202 catch(InterruptedException e) { 203 } 204 } 205 } 206 207 private static void p(String msg) { 208 if (RemoteTestNG.isVerbose()) { 209 System.out.println("[BaseMessageSender] " + msg); //$NON-NLS-1$ 210 } 211 } 212 213 /** 214 * Reader thread that processes messages from the client. 215 */ 216 private class ReaderThread extends Thread { 217 218 public ReaderThread() { 219 super("ReaderThread"); //$NON-NLS-1$ 220 } 221 222 @Override 223 public void run() { 224 try { 225 p("ReaderThread waiting for an admin message"); 226 String message = m_inReader.readLine(); 227 p("ReaderThread received admin message:" + message); 228 while (message != null) { 229 if (m_debug) { 230 p("Admin message:" + message); //$NON-NLS-1$ 231 } 232 boolean acknowledge = message.startsWith(MessageHelper.ACK_MSG); 233 boolean stop = MessageHelper.STOP_MSG.equals(message); 234 if(acknowledge || stop) { 235 if (acknowledge) { 236 p("Received ACK:" + message); 237 m_latestAck = message; 238 } 239 synchronized(m_ackLock) { 240 m_ackLock.notifyAll(); 241 } 242 if (stop) { 243 break; 244 } 245 } else { 246 p("Received unknown message: '" + message + "'"); 247 } 248 message = m_inReader != null ? m_inReader.readLine() : null; 249 } 250// while((m_reader != null) && (message = m_reader.readLine()) != null) { 251// if (m_debug) { 252// p("Admin message:" + message); //$NON-NLS-1$ 253// } 254// boolean acknowledge = MessageHelper.ACK_MSG.equals(message); 255// boolean stop = MessageHelper.STOP_MSG.equals(message); 256// if(acknowledge || stop) { 257// synchronized(m_lock) { 258// m_lock.notifyAll(); 259// } 260// if (stop) { 261// break; 262// } 263// } 264// } 265 } 266 catch(IOException ioe) { 267 if (RemoteTestNG.isVerbose()) { 268 ioe.printStackTrace(); 269 } 270 } 271 } 272 } 273} 274