BaseMessageSender.java revision 3a1d3383627fb6be94345424990cdc2c870d9a19
1package org.testng.remote.strprotocol;
2
3import org.testng.TestNGException;
4import org.testng.remote.RemoteTestNG;
5
6import java.io.BufferedReader;
7import java.io.BufferedWriter;
8import java.io.IOException;
9import java.io.InputStream;
10import java.io.InputStreamReader;
11import java.io.OutputStream;
12import java.io.OutputStreamWriter;
13import java.io.PrintWriter;
14import java.io.UnsupportedEncodingException;
15import java.net.ConnectException;
16import java.net.ServerSocket;
17import java.net.Socket;
18import java.net.SocketTimeoutException;
19
20abstract public class BaseMessageSender implements IMessageSender {
21  private boolean m_debug = false;
22  protected Socket m_clientSocket;
23  private String m_host;
24  private int m_port;
25  protected Object m_ackLock = new Object();
26
27  /** Outgoing message stream. */
28  protected OutputStream m_outStream;
29  /** Used to send ACK and STOP */
30  private PrintWriter m_outWriter;
31
32  /** Incoming message stream. */
33  protected volatile InputStream m_inStream;
34  /** Used to receive ACK and STOP */
35  protected volatile BufferedReader m_inReader;
36
37  private ReaderThread m_readerThread;
38  private boolean m_ack;
39//  protected InputStream m_receiverInputStream;
40
41  public BaseMessageSender(String host, int port, boolean ack) {
42    m_host = host;
43    m_port = port;
44    m_ack = ack;
45  }
46
47  /**
48   * Starts the connection.
49   *
50   * @return <TT>true</TT> if the connection was successful, <TT>false</TT> otherwise
51   * @throws TestNGException if an exception occurred while establishing the connection
52   */
53  @Override
54  public void connect() throws IOException {
55    p("Waiting for Eclipse client on " + m_host + ":" + m_port);
56    while (true) {
57      try {
58        m_clientSocket = new Socket(m_host, m_port);
59        p("Received a connection from Eclipse on " + m_host + ":" + m_port);
60
61        // Output streams
62        m_outStream = m_clientSocket.getOutputStream();
63        m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream)));
64
65        // Input streams
66        m_inStream = m_clientSocket.getInputStream();
67        try {
68          m_inReader = new BufferedReader(new InputStreamReader(m_inStream,
69              "UTF-8")); //$NON-NLS-1$
70        }
71        catch(UnsupportedEncodingException ueex) {
72          // Should never happen
73          m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
74        }
75
76        p("Connection established, starting reader thread");
77        m_readerThread = new ReaderThread();
78        m_readerThread.start();
79        return;
80      }
81      catch(ConnectException ex) {
82        // ignore and retry
83        try {
84          Thread.sleep(4000);
85        }
86        catch(InterruptedException e) {
87          ;
88        }
89      }
90    }
91  }
92
93  private void sendAdminMessage(String message) {
94    m_outWriter.println(message);
95    m_outWriter.flush();
96  }
97
98  private int m_serial = 0;
99
100  @Override
101  public void sendAck() {
102    p("Sending ACK " + m_serial);
103    sendAdminMessage(MessageHelper.ACK_MSG + m_serial++);
104  }
105
106  @Override
107  public void sendStop() {
108    sendAdminMessage(MessageHelper.STOP_MSG);
109  }
110
111  @Override
112  public void initReceiver() throws SocketTimeoutException {
113    if (m_inStream != null) {
114      p("Receiver already initialized");
115    }
116    ServerSocket serverSocket;
117    try {
118      p("initReceiver on port " + m_port);
119      serverSocket = new ServerSocket(m_port);
120      serverSocket.setSoTimeout(5000);
121
122      while (true) {
123        try {
124          Socket socket = serverSocket.accept();
125          m_inStream = socket.getInputStream();
126          m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
127          m_outStream = socket.getOutputStream();
128          m_outWriter = new PrintWriter(new OutputStreamWriter(m_outStream));
129
130          break;
131        }
132        catch (IOException ioe) {
133          try {
134            Thread.sleep(100L);
135          }
136          catch (InterruptedException ie) {
137            // Do nothing.
138          }
139        }
140      }
141    }
142    catch(SocketTimeoutException ste) {
143      throw ste;
144    }
145    catch (IOException ioe) {
146      // TODO Auto-generated catch block
147      ioe.printStackTrace();
148    }
149  }
150
151  @Override
152  public void shutDown() {
153    if(null != m_outStream) {
154      try {
155        m_outStream.close();
156      }
157      catch(IOException ex) {
158        // ignore
159      }
160      m_outStream = null;
161    }
162
163    try {
164      if(null != m_readerThread) {
165        m_readerThread.interrupt();
166      }
167
168      if(null != m_inReader) {
169        m_inReader.close();
170        m_inReader = null;
171      }
172    }
173    catch(IOException e) {
174      e.printStackTrace();
175    }
176
177    try {
178      if(null != m_clientSocket) {
179        m_clientSocket.close();
180        m_clientSocket = null;
181      }
182    }
183    catch(IOException e) {
184      if(m_debug) {
185        e.printStackTrace();
186      }
187    }
188  }
189
190  private String m_latestAck;
191
192  protected void waitForAck() {
193    if (m_ack) {
194      try {
195        p("Message sent, waiting for ACK...");
196        synchronized(m_ackLock) {
197          m_ackLock.wait();
198        }
199        p("... ACK received:" + m_latestAck);
200      }
201      catch(InterruptedException e) {
202      }
203    }
204  }
205
206  private static void p(String msg) {
207    if (RemoteTestNG.isVerbose()) {
208      System.out.println("[BaseMessageSender] " + msg); //$NON-NLS-1$
209    }
210  }
211
212  /**
213   * Reader thread that processes messages from the client.
214   */
215  private class ReaderThread extends Thread {
216
217    public ReaderThread() {
218      super("ReaderThread"); //$NON-NLS-1$
219    }
220
221    @Override
222    public void run() {
223      try {
224        p("ReaderThread waiting for an admin message");
225        String message = m_inReader.readLine();
226        p("ReaderThread received admin message:" + message);
227        while (message != null) {
228          if (m_debug) {
229            p("Admin message:" + message); //$NON-NLS-1$
230          }
231          boolean acknowledge = message.startsWith(MessageHelper.ACK_MSG);
232          boolean stop = MessageHelper.STOP_MSG.equals(message);
233          if(acknowledge || stop) {
234            if (acknowledge) {
235              p("Received ACK:" + message);
236              m_latestAck = message;
237            }
238            synchronized(m_ackLock) {
239              m_ackLock.notifyAll();
240            }
241            if (stop) {
242              break;
243            }
244          } else {
245            p("Received unknown message: '" + message + "'");
246          }
247          message = m_inReader != null ? m_inReader.readLine() : null;
248        }
249//        while((m_reader != null) && (message = m_reader.readLine()) != null) {
250//          if (m_debug) {
251//            p("Admin message:" + message); //$NON-NLS-1$
252//          }
253//          boolean acknowledge = MessageHelper.ACK_MSG.equals(message);
254//          boolean stop = MessageHelper.STOP_MSG.equals(message);
255//          if(acknowledge || stop) {
256//            synchronized(m_lock) {
257//              m_lock.notifyAll();
258//            }
259//            if (stop) {
260//              break;
261//            }
262//          }
263//        }
264      }
265      catch(IOException ioe) {
266        if (RemoteTestNG.isVerbose()) {
267          ioe.printStackTrace();
268        }
269      }
270    }
271  }
272}
273