BaseMessageSender.java revision bf3a3b055a632c534ac856aeb97615167e90e72c
1package org.testng.remote.strprotocol;
2
3import com.sun.xml.internal.ws.wsdl.parser.InaccessibleWSDLException;
4import org.testng.TestNGException;
5import org.testng.remote.RemoteTestNG;
6
7import java.io.BufferedReader;
8import java.io.BufferedWriter;
9import java.io.IOException;
10import java.io.InputStream;
11import java.io.InputStreamReader;
12import java.io.OutputStream;
13import java.io.OutputStreamWriter;
14import java.io.PrintWriter;
15import java.io.UnsupportedEncodingException;
16import java.net.ConnectException;
17import java.net.ServerSocket;
18import java.net.Socket;
19import java.net.SocketTimeoutException;
20
21abstract public class BaseMessageSender implements IMessageSender {
22  private boolean m_debug = false;
23  protected Socket m_clientSocket;
24  private String m_host;
25  private int m_port;
26  protected Object m_ackLock = new Object();
27
28  /** Outgoing message stream. */
29  protected OutputStream m_outStream;
30  /** Used to send ACK and STOP */
31  private PrintWriter m_outWriter;
32
33  /** Incoming message stream. */
34  protected volatile InputStream m_inStream;
35  /** Used to receive ACK and STOP */
36  protected volatile BufferedReader m_inReader;
37
38  private ReaderThread m_readerThread;
39  private boolean m_ack;
40//  protected InputStream m_receiverInputStream;
41
42  public BaseMessageSender(String host, int port, boolean ack) {
43    m_host = host;
44    m_port = port;
45    m_ack = ack;
46  }
47
48  /**
49   * Starts the connection.
50   *
51   * @return <TT>true</TT> if the connection was successful, <TT>false</TT> otherwise
52   * @throws TestNGException if an exception occurred while establishing the connection
53   */
54  @Override
55  public void connect() throws IOException {
56    p("Waiting for Eclipse client on " + m_host + ":" + m_port);
57    while (true) {
58      try {
59        m_clientSocket = new Socket(m_host, m_port);
60        p("Received a connection from Eclipse on " + m_host + ":" + m_port);
61
62        // Output streams
63        m_outStream = m_clientSocket.getOutputStream();
64        m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream)));
65
66        // Input streams
67        m_inStream = m_clientSocket.getInputStream();
68        try {
69          m_inReader = new BufferedReader(new InputStreamReader(m_inStream,
70              "UTF-8")); //$NON-NLS-1$
71        }
72        catch(UnsupportedEncodingException ueex) {
73          // Should never happen
74          m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
75        }
76
77        p("Connection established, starting reader thread");
78        m_readerThread = new ReaderThread();
79        m_readerThread.start();
80        return;
81      }
82      catch(ConnectException ex) {
83        // ignore and retry
84        try {
85          Thread.sleep(4000);
86        }
87        catch(InterruptedException e) {
88          ;
89        }
90      }
91    }
92  }
93
94  private void sendAdminMessage(String message) {
95    m_outWriter.println(message);
96    m_outWriter.flush();
97  }
98
99  private int m_serial = 0;
100
101  @Override
102  public void sendAck() {
103    p("Sending ACK " + m_serial);
104    sendAdminMessage(MessageHelper.ACK_MSG + m_serial++);
105  }
106
107  @Override
108  public void sendStop() {
109    sendAdminMessage(MessageHelper.STOP_MSG);
110  }
111
112  @Override
113  public void initReceiver() throws SocketTimeoutException {
114    if (m_inStream != null) {
115      p("Receiver already initialized");
116    }
117    ServerSocket serverSocket;
118    try {
119      p("initReceiver on port " + m_port);
120      serverSocket = new ServerSocket(m_port);
121      serverSocket.setSoTimeout(5000);
122
123      while (true) {
124        try {
125          Socket socket = serverSocket.accept();
126          m_inStream = socket.getInputStream();
127          m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
128          m_outStream = socket.getOutputStream();
129          m_outWriter = new PrintWriter(new OutputStreamWriter(m_outStream));
130
131          break;
132        }
133        catch (IOException ioe) {
134          try {
135            Thread.sleep(100L);
136          }
137          catch (InterruptedException ie) {
138            // Do nothing.
139          }
140        }
141      }
142    }
143    catch(SocketTimeoutException ste) {
144      throw ste;
145    }
146    catch (IOException ioe) {
147      // TODO Auto-generated catch block
148      ioe.printStackTrace();
149    }
150  }
151
152  @Override
153  public void shutDown() {
154    if(null != m_outStream) {
155      try {
156        m_outStream.close();
157      }
158      catch(IOException ex) {
159        // ignore
160      }
161      m_outStream = null;
162    }
163
164    try {
165      if(null != m_readerThread) {
166        m_readerThread.interrupt();
167      }
168
169      if(null != m_inReader) {
170        m_inReader.close();
171        m_inReader = null;
172      }
173    }
174    catch(IOException e) {
175      e.printStackTrace();
176    }
177
178    try {
179      if(null != m_clientSocket) {
180        m_clientSocket.close();
181        m_clientSocket = null;
182      }
183    }
184    catch(IOException e) {
185      if(m_debug) {
186        e.printStackTrace();
187      }
188    }
189  }
190
191  private String m_latestAck;
192
193  protected void waitForAck() {
194    if (m_ack) {
195      try {
196        p("Message sent, waiting for ACK...");
197        synchronized(m_ackLock) {
198          m_ackLock.wait();
199        }
200        p("... ACK received:" + m_latestAck);
201      }
202      catch(InterruptedException e) {
203      }
204    }
205  }
206
207  private static void p(String msg) {
208    if (RemoteTestNG.isVerbose()) {
209      System.out.println("[BaseMessageSender] " + msg); //$NON-NLS-1$
210    }
211  }
212
213  /**
214   * Reader thread that processes messages from the client.
215   */
216  private class ReaderThread extends Thread {
217
218    public ReaderThread() {
219      super("ReaderThread"); //$NON-NLS-1$
220    }
221
222    @Override
223    public void run() {
224      try {
225        p("ReaderThread waiting for an admin message");
226        String message = m_inReader.readLine();
227        p("ReaderThread received admin message:" + message);
228        while (message != null) {
229          if (m_debug) {
230            p("Admin message:" + message); //$NON-NLS-1$
231          }
232          boolean acknowledge = message.startsWith(MessageHelper.ACK_MSG);
233          boolean stop = MessageHelper.STOP_MSG.equals(message);
234          if(acknowledge || stop) {
235            if (acknowledge) {
236              p("Received ACK:" + message);
237              m_latestAck = message;
238            }
239            synchronized(m_ackLock) {
240              m_ackLock.notifyAll();
241            }
242            if (stop) {
243              break;
244            }
245          } else {
246            p("Received unknown message: '" + message + "'");
247          }
248          message = m_inReader != null ? m_inReader.readLine() : null;
249        }
250//        while((m_reader != null) && (message = m_reader.readLine()) != null) {
251//          if (m_debug) {
252//            p("Admin message:" + message); //$NON-NLS-1$
253//          }
254//          boolean acknowledge = MessageHelper.ACK_MSG.equals(message);
255//          boolean stop = MessageHelper.STOP_MSG.equals(message);
256//          if(acknowledge || stop) {
257//            synchronized(m_lock) {
258//              m_lock.notifyAll();
259//            }
260//            if (stop) {
261//              break;
262//            }
263//          }
264//        }
265      }
266      catch(IOException ioe) {
267        if (RemoteTestNG.isVerbose()) {
268          ioe.printStackTrace();
269        }
270      }
271    }
272  }
273}
274