1/*
2 *  Copyright 2014 The WebRTC Project Authors. All rights reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11package org.appspot.apprtc;
12
13import org.appspot.apprtc.util.AsyncHttpURLConnection;
14import org.appspot.apprtc.util.AsyncHttpURLConnection.AsyncHttpEvents;
15import org.appspot.apprtc.util.LooperExecutor;
16
17import android.util.Log;
18
19import de.tavendo.autobahn.WebSocket.WebSocketConnectionObserver;
20import de.tavendo.autobahn.WebSocketConnection;
21import de.tavendo.autobahn.WebSocketException;
22
23import org.json.JSONException;
24import org.json.JSONObject;
25
26import java.net.URI;
27import java.net.URISyntaxException;
28import java.util.LinkedList;
29
30/**
31 * WebSocket client implementation.
32 *
33 * <p>All public methods should be called from a looper executor thread
34 * passed in a constructor, otherwise exception will be thrown.
35 * All events are dispatched on the same thread.
36 */
37
38public class WebSocketChannelClient {
39  private static final String TAG = "WSChannelRTCClient";
40  private static final int CLOSE_TIMEOUT = 1000;
41  private final WebSocketChannelEvents events;
42  private final LooperExecutor executor;
43  private WebSocketConnection ws;
44  private WebSocketObserver wsObserver;
45  private String wsServerUrl;
46  private String postServerUrl;
47  private String roomID;
48  private String clientID;
49  private WebSocketConnectionState state;
50  private final Object closeEventLock = new Object();
51  private boolean closeEvent;
52  // WebSocket send queue. Messages are added to the queue when WebSocket
53  // client is not registered and are consumed in register() call.
54  private final LinkedList<String> wsSendQueue;
55
56  /**
57   * Possible WebSocket connection states.
58   */
59  public enum WebSocketConnectionState {
60    NEW, CONNECTED, REGISTERED, CLOSED, ERROR
61  };
62
63  /**
64   * Callback interface for messages delivered on WebSocket.
65   * All events are dispatched from a looper executor thread.
66   */
67  public interface WebSocketChannelEvents {
68    public void onWebSocketMessage(final String message);
69    public void onWebSocketClose();
70    public void onWebSocketError(final String description);
71  }
72
73  public WebSocketChannelClient(LooperExecutor executor, WebSocketChannelEvents events) {
74    this.executor = executor;
75    this.events = events;
76    roomID = null;
77    clientID = null;
78    wsSendQueue = new LinkedList<String>();
79    state = WebSocketConnectionState.NEW;
80  }
81
82  public WebSocketConnectionState getState() {
83    return state;
84  }
85
86  public void connect(final String wsUrl, final String postUrl) {
87    checkIfCalledOnValidThread();
88    if (state != WebSocketConnectionState.NEW) {
89      Log.e(TAG, "WebSocket is already connected.");
90      return;
91    }
92    wsServerUrl = wsUrl;
93    postServerUrl = postUrl;
94    closeEvent = false;
95
96    Log.d(TAG, "Connecting WebSocket to: " + wsUrl + ". Post URL: " + postUrl);
97    ws = new WebSocketConnection();
98    wsObserver = new WebSocketObserver();
99    try {
100      ws.connect(new URI(wsServerUrl), wsObserver);
101    } catch (URISyntaxException e) {
102      reportError("URI error: " + e.getMessage());
103    } catch (WebSocketException e) {
104      reportError("WebSocket connection error: " + e.getMessage());
105    }
106  }
107
108  public void register(final String roomID, final String clientID) {
109    checkIfCalledOnValidThread();
110    this.roomID = roomID;
111    this.clientID = clientID;
112    if (state != WebSocketConnectionState.CONNECTED) {
113      Log.w(TAG, "WebSocket register() in state " + state);
114      return;
115    }
116    Log.d(TAG, "Registering WebSocket for room " + roomID + ". CLientID: " + clientID);
117    JSONObject json = new JSONObject();
118    try {
119      json.put("cmd", "register");
120      json.put("roomid", roomID);
121      json.put("clientid", clientID);
122      Log.d(TAG, "C->WSS: " + json.toString());
123      ws.sendTextMessage(json.toString());
124      state = WebSocketConnectionState.REGISTERED;
125      // Send any previously accumulated messages.
126      for (String sendMessage : wsSendQueue) {
127        send(sendMessage);
128      }
129      wsSendQueue.clear();
130    } catch (JSONException e) {
131      reportError("WebSocket register JSON error: " + e.getMessage());
132    }
133  }
134
135  public void send(String message) {
136    checkIfCalledOnValidThread();
137    switch (state) {
138      case NEW:
139      case CONNECTED:
140        // Store outgoing messages and send them after websocket client
141        // is registered.
142        Log.d(TAG, "WS ACC: " + message);
143        wsSendQueue.add(message);
144        return;
145      case ERROR:
146      case CLOSED:
147        Log.e(TAG, "WebSocket send() in error or closed state : " + message);
148        return;
149      case REGISTERED:
150        JSONObject json = new JSONObject();
151        try {
152          json.put("cmd", "send");
153          json.put("msg", message);
154          message = json.toString();
155          Log.d(TAG, "C->WSS: " + message);
156          ws.sendTextMessage(message);
157        } catch (JSONException e) {
158          reportError("WebSocket send JSON error: " + e.getMessage());
159        }
160        break;
161    }
162    return;
163  }
164
165  // This call can be used to send WebSocket messages before WebSocket
166  // connection is opened.
167  public void post(String message) {
168    checkIfCalledOnValidThread();
169    sendWSSMessage("POST", message);
170  }
171
172  public void disconnect(boolean waitForComplete) {
173    checkIfCalledOnValidThread();
174    Log.d(TAG, "Disonnect WebSocket. State: " + state);
175    if (state == WebSocketConnectionState.REGISTERED) {
176      // Send "bye" to WebSocket server.
177      send("{\"type\": \"bye\"}");
178      state = WebSocketConnectionState.CONNECTED;
179      // Send http DELETE to http WebSocket server.
180      sendWSSMessage("DELETE", "");
181    }
182    // Close WebSocket in CONNECTED or ERROR states only.
183    if (state == WebSocketConnectionState.CONNECTED
184        || state == WebSocketConnectionState.ERROR) {
185      ws.disconnect();
186      state = WebSocketConnectionState.CLOSED;
187
188      // Wait for websocket close event to prevent websocket library from
189      // sending any pending messages to deleted looper thread.
190      if (waitForComplete) {
191        synchronized (closeEventLock) {
192          while (!closeEvent) {
193            try {
194              closeEventLock.wait(CLOSE_TIMEOUT);
195              break;
196            } catch (InterruptedException e) {
197              Log.e(TAG, "Wait error: " + e.toString());
198            }
199          }
200        }
201      }
202    }
203    Log.d(TAG, "Disonnecting WebSocket done.");
204  }
205
206  private void reportError(final String errorMessage) {
207    Log.e(TAG, errorMessage);
208    executor.execute(new Runnable() {
209      @Override
210      public void run() {
211        if (state != WebSocketConnectionState.ERROR) {
212          state = WebSocketConnectionState.ERROR;
213          events.onWebSocketError(errorMessage);
214        }
215      }
216    });
217  }
218
219  // Asynchronously send POST/DELETE to WebSocket server.
220  private void sendWSSMessage(final String method, final String message) {
221    String postUrl = postServerUrl + "/" + roomID + "/" + clientID;
222    Log.d(TAG, "WS " + method + " : " + postUrl + " : " + message);
223    AsyncHttpURLConnection httpConnection = new AsyncHttpURLConnection(
224        method, postUrl, message, new AsyncHttpEvents() {
225          @Override
226          public void onHttpError(String errorMessage) {
227            reportError("WS " + method + " error: " + errorMessage);
228          }
229
230          @Override
231          public void onHttpComplete(String response) {
232          }
233        });
234    httpConnection.send();
235  }
236
237   // Helper method for debugging purposes. Ensures that WebSocket method is
238   // called on a looper thread.
239  private void checkIfCalledOnValidThread() {
240    if (!executor.checkOnLooperThread()) {
241      throw new IllegalStateException(
242          "WebSocket method is not called on valid thread");
243    }
244  }
245
246  private class WebSocketObserver implements WebSocketConnectionObserver {
247    @Override
248    public void onOpen() {
249      Log.d(TAG, "WebSocket connection opened to: " + wsServerUrl);
250      executor.execute(new Runnable() {
251        @Override
252        public void run() {
253          state = WebSocketConnectionState.CONNECTED;
254          // Check if we have pending register request.
255          if (roomID != null && clientID != null) {
256            register(roomID, clientID);
257          }
258        }
259      });
260    }
261
262    @Override
263    public void onClose(WebSocketCloseNotification code, String reason) {
264      Log.d(TAG, "WebSocket connection closed. Code: " + code
265          + ". Reason: " + reason + ". State: " + state);
266      synchronized (closeEventLock) {
267        closeEvent = true;
268        closeEventLock.notify();
269      }
270      executor.execute(new Runnable() {
271        @Override
272        public void run() {
273          if (state != WebSocketConnectionState.CLOSED) {
274            state = WebSocketConnectionState.CLOSED;
275            events.onWebSocketClose();
276          }
277        }
278      });
279    }
280
281    @Override
282    public void onTextMessage(String payload) {
283      Log.d(TAG, "WSS->C: " + payload);
284      final String message = payload;
285      executor.execute(new Runnable() {
286        @Override
287        public void run() {
288          if (state == WebSocketConnectionState.CONNECTED
289              || state == WebSocketConnectionState.REGISTERED) {
290            events.onWebSocketMessage(message);
291          }
292        }
293      });
294    }
295
296    @Override
297    public void onRawTextMessage(byte[] payload) {
298    }
299
300    @Override
301    public void onBinaryMessage(byte[] payload) {
302    }
303  }
304
305}
306