1/*
2 * libjingle
3 * Copyright 2012, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/examples/peerconnection/client/peer_connection_client.h"
29
30#include "talk/examples/peerconnection/client/defaults.h"
31#include "webrtc/base/common.h"
32#include "webrtc/base/logging.h"
33#include "webrtc/base/nethelpers.h"
34#include "webrtc/base/stringutils.h"
35
36#ifdef WIN32
37#include "webrtc/base/win32socketserver.h"
38#endif
39
40using rtc::sprintfn;
41
42namespace {
43
44// This is our magical hangup signal.
45const char kByeMessage[] = "BYE";
46// Delay between server connection retries, in milliseconds
47const int kReconnectDelay = 2000;
48
49rtc::AsyncSocket* CreateClientSocket(int family) {
50#ifdef WIN32
51  rtc::Win32Socket* sock = new rtc::Win32Socket();
52  sock->CreateT(family, SOCK_STREAM);
53  return sock;
54#elif defined(POSIX)
55  rtc::Thread* thread = rtc::Thread::Current();
56  ASSERT(thread != NULL);
57  return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
58#else
59#error Platform not supported.
60#endif
61}
62
63}
64
65PeerConnectionClient::PeerConnectionClient()
66  : callback_(NULL),
67    resolver_(NULL),
68    state_(NOT_CONNECTED),
69    my_id_(-1) {
70}
71
72PeerConnectionClient::~PeerConnectionClient() {
73}
74
75void PeerConnectionClient::InitSocketSignals() {
76  ASSERT(control_socket_.get() != NULL);
77  ASSERT(hanging_get_.get() != NULL);
78  control_socket_->SignalCloseEvent.connect(this,
79      &PeerConnectionClient::OnClose);
80  hanging_get_->SignalCloseEvent.connect(this,
81      &PeerConnectionClient::OnClose);
82  control_socket_->SignalConnectEvent.connect(this,
83      &PeerConnectionClient::OnConnect);
84  hanging_get_->SignalConnectEvent.connect(this,
85      &PeerConnectionClient::OnHangingGetConnect);
86  control_socket_->SignalReadEvent.connect(this,
87      &PeerConnectionClient::OnRead);
88  hanging_get_->SignalReadEvent.connect(this,
89      &PeerConnectionClient::OnHangingGetRead);
90}
91
92int PeerConnectionClient::id() const {
93  return my_id_;
94}
95
96bool PeerConnectionClient::is_connected() const {
97  return my_id_ != -1;
98}
99
100const Peers& PeerConnectionClient::peers() const {
101  return peers_;
102}
103
104void PeerConnectionClient::RegisterObserver(
105    PeerConnectionClientObserver* callback) {
106  ASSERT(!callback_);
107  callback_ = callback;
108}
109
110void PeerConnectionClient::Connect(const std::string& server, int port,
111                                   const std::string& client_name) {
112  ASSERT(!server.empty());
113  ASSERT(!client_name.empty());
114
115  if (state_ != NOT_CONNECTED) {
116    LOG(WARNING)
117        << "The client must not be connected before you can call Connect()";
118    callback_->OnServerConnectionFailure();
119    return;
120  }
121
122  if (server.empty() || client_name.empty()) {
123    callback_->OnServerConnectionFailure();
124    return;
125  }
126
127  if (port <= 0)
128    port = kDefaultServerPort;
129
130  server_address_.SetIP(server);
131  server_address_.SetPort(port);
132  client_name_ = client_name;
133
134  if (server_address_.IsUnresolved()) {
135    state_ = RESOLVING;
136    resolver_ = new rtc::AsyncResolver();
137    resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
138    resolver_->Start(server_address_);
139  } else {
140    DoConnect();
141  }
142}
143
144void PeerConnectionClient::OnResolveResult(
145    rtc::AsyncResolverInterface* resolver) {
146  if (resolver_->GetError() != 0) {
147    callback_->OnServerConnectionFailure();
148    resolver_->Destroy(false);
149    resolver_ = NULL;
150    state_ = NOT_CONNECTED;
151  } else {
152    server_address_ = resolver_->address();
153    DoConnect();
154  }
155}
156
157void PeerConnectionClient::DoConnect() {
158  control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
159  hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
160  InitSocketSignals();
161  char buffer[1024];
162  sprintfn(buffer, sizeof(buffer),
163           "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
164  onconnect_data_ = buffer;
165
166  bool ret = ConnectControlSocket();
167  if (ret)
168    state_ = SIGNING_IN;
169  if (!ret) {
170    callback_->OnServerConnectionFailure();
171  }
172}
173
174bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
175  if (state_ != CONNECTED)
176    return false;
177
178  ASSERT(is_connected());
179  ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
180  if (!is_connected() || peer_id == -1)
181    return false;
182
183  char headers[1024];
184  sprintfn(headers, sizeof(headers),
185      "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
186      "Content-Length: %i\r\n"
187      "Content-Type: text/plain\r\n"
188      "\r\n",
189      my_id_, peer_id, message.length());
190  onconnect_data_ = headers;
191  onconnect_data_ += message;
192  return ConnectControlSocket();
193}
194
195bool PeerConnectionClient::SendHangUp(int peer_id) {
196  return SendToPeer(peer_id, kByeMessage);
197}
198
199bool PeerConnectionClient::IsSendingMessage() {
200  return state_ == CONNECTED &&
201         control_socket_->GetState() != rtc::Socket::CS_CLOSED;
202}
203
204bool PeerConnectionClient::SignOut() {
205  if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
206    return true;
207
208  if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
209    hanging_get_->Close();
210
211  if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
212    state_ = SIGNING_OUT;
213
214    if (my_id_ != -1) {
215      char buffer[1024];
216      sprintfn(buffer, sizeof(buffer),
217          "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
218      onconnect_data_ = buffer;
219      return ConnectControlSocket();
220    } else {
221      // Can occur if the app is closed before we finish connecting.
222      return true;
223    }
224  } else {
225    state_ = SIGNING_OUT_WAITING;
226  }
227
228  return true;
229}
230
231void PeerConnectionClient::Close() {
232  control_socket_->Close();
233  hanging_get_->Close();
234  onconnect_data_.clear();
235  peers_.clear();
236  if (resolver_ != NULL) {
237    resolver_->Destroy(false);
238    resolver_ = NULL;
239  }
240  my_id_ = -1;
241  state_ = NOT_CONNECTED;
242}
243
244bool PeerConnectionClient::ConnectControlSocket() {
245  ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
246  int err = control_socket_->Connect(server_address_);
247  if (err == SOCKET_ERROR) {
248    Close();
249    return false;
250  }
251  return true;
252}
253
254void PeerConnectionClient::OnConnect(rtc::AsyncSocket* socket) {
255  ASSERT(!onconnect_data_.empty());
256  size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
257  ASSERT(sent == onconnect_data_.length());
258  RTC_UNUSED(sent);
259  onconnect_data_.clear();
260}
261
262void PeerConnectionClient::OnHangingGetConnect(rtc::AsyncSocket* socket) {
263  char buffer[1024];
264  sprintfn(buffer, sizeof(buffer),
265           "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
266  int len = static_cast<int>(strlen(buffer));
267  int sent = socket->Send(buffer, len);
268  ASSERT(sent == len);
269  RTC_UNUSED2(sent, len);
270}
271
272void PeerConnectionClient::OnMessageFromPeer(int peer_id,
273                                             const std::string& message) {
274  if (message.length() == (sizeof(kByeMessage) - 1) &&
275      message.compare(kByeMessage) == 0) {
276    callback_->OnPeerDisconnected(peer_id);
277  } else {
278    callback_->OnMessageFromPeer(peer_id, message);
279  }
280}
281
282bool PeerConnectionClient::GetHeaderValue(const std::string& data,
283                                          size_t eoh,
284                                          const char* header_pattern,
285                                          size_t* value) {
286  ASSERT(value != NULL);
287  size_t found = data.find(header_pattern);
288  if (found != std::string::npos && found < eoh) {
289    *value = atoi(&data[found + strlen(header_pattern)]);
290    return true;
291  }
292  return false;
293}
294
295bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
296                                          const char* header_pattern,
297                                          std::string* value) {
298  ASSERT(value != NULL);
299  size_t found = data.find(header_pattern);
300  if (found != std::string::npos && found < eoh) {
301    size_t begin = found + strlen(header_pattern);
302    size_t end = data.find("\r\n", begin);
303    if (end == std::string::npos)
304      end = eoh;
305    value->assign(data.substr(begin, end - begin));
306    return true;
307  }
308  return false;
309}
310
311bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket,
312                                          std::string* data,
313                                          size_t* content_length) {
314  char buffer[0xffff];
315  do {
316    int bytes = socket->Recv(buffer, sizeof(buffer));
317    if (bytes <= 0)
318      break;
319    data->append(buffer, bytes);
320  } while (true);
321
322  bool ret = false;
323  size_t i = data->find("\r\n\r\n");
324  if (i != std::string::npos) {
325    LOG(INFO) << "Headers received";
326    if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
327      size_t total_response_size = (i + 4) + *content_length;
328      if (data->length() >= total_response_size) {
329        ret = true;
330        std::string should_close;
331        const char kConnection[] = "\r\nConnection: ";
332        if (GetHeaderValue(*data, i, kConnection, &should_close) &&
333            should_close.compare("close") == 0) {
334          socket->Close();
335          // Since we closed the socket, there was no notification delivered
336          // to us.  Compensate by letting ourselves know.
337          OnClose(socket, 0);
338        }
339      } else {
340        // We haven't received everything.  Just continue to accept data.
341      }
342    } else {
343      LOG(LS_ERROR) << "No content length field specified by the server.";
344    }
345  }
346  return ret;
347}
348
349void PeerConnectionClient::OnRead(rtc::AsyncSocket* socket) {
350  size_t content_length = 0;
351  if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
352    size_t peer_id = 0, eoh = 0;
353    bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
354                                  &eoh);
355    if (ok) {
356      if (my_id_ == -1) {
357        // First response.  Let's store our server assigned ID.
358        ASSERT(state_ == SIGNING_IN);
359        my_id_ = static_cast<int>(peer_id);
360        ASSERT(my_id_ != -1);
361
362        // The body of the response will be a list of already connected peers.
363        if (content_length) {
364          size_t pos = eoh + 4;
365          while (pos < control_data_.size()) {
366            size_t eol = control_data_.find('\n', pos);
367            if (eol == std::string::npos)
368              break;
369            int id = 0;
370            std::string name;
371            bool connected;
372            if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
373                           &connected) && id != my_id_) {
374              peers_[id] = name;
375              callback_->OnPeerConnected(id, name);
376            }
377            pos = eol + 1;
378          }
379        }
380        ASSERT(is_connected());
381        callback_->OnSignedIn();
382      } else if (state_ == SIGNING_OUT) {
383        Close();
384        callback_->OnDisconnected();
385      } else if (state_ == SIGNING_OUT_WAITING) {
386        SignOut();
387      }
388    }
389
390    control_data_.clear();
391
392    if (state_ == SIGNING_IN) {
393      ASSERT(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
394      state_ = CONNECTED;
395      hanging_get_->Connect(server_address_);
396    }
397  }
398}
399
400void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) {
401  LOG(INFO) << __FUNCTION__;
402  size_t content_length = 0;
403  if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
404    size_t peer_id = 0, eoh = 0;
405    bool ok = ParseServerResponse(notification_data_, content_length,
406                                  &peer_id, &eoh);
407
408    if (ok) {
409      // Store the position where the body begins.
410      size_t pos = eoh + 4;
411
412      if (my_id_ == static_cast<int>(peer_id)) {
413        // A notification about a new member or a member that just
414        // disconnected.
415        int id = 0;
416        std::string name;
417        bool connected = false;
418        if (ParseEntry(notification_data_.substr(pos), &name, &id,
419                       &connected)) {
420          if (connected) {
421            peers_[id] = name;
422            callback_->OnPeerConnected(id, name);
423          } else {
424            peers_.erase(id);
425            callback_->OnPeerDisconnected(id);
426          }
427        }
428      } else {
429        OnMessageFromPeer(static_cast<int>(peer_id),
430                          notification_data_.substr(pos));
431      }
432    }
433
434    notification_data_.clear();
435  }
436
437  if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
438      state_ == CONNECTED) {
439    hanging_get_->Connect(server_address_);
440  }
441}
442
443bool PeerConnectionClient::ParseEntry(const std::string& entry,
444                                      std::string* name,
445                                      int* id,
446                                      bool* connected) {
447  ASSERT(name != NULL);
448  ASSERT(id != NULL);
449  ASSERT(connected != NULL);
450  ASSERT(!entry.empty());
451
452  *connected = false;
453  size_t separator = entry.find(',');
454  if (separator != std::string::npos) {
455    *id = atoi(&entry[separator + 1]);
456    name->assign(entry.substr(0, separator));
457    separator = entry.find(',', separator + 1);
458    if (separator != std::string::npos) {
459      *connected = atoi(&entry[separator + 1]) ? true : false;
460    }
461  }
462  return !name->empty();
463}
464
465int PeerConnectionClient::GetResponseStatus(const std::string& response) {
466  int status = -1;
467  size_t pos = response.find(' ');
468  if (pos != std::string::npos)
469    status = atoi(&response[pos + 1]);
470  return status;
471}
472
473bool PeerConnectionClient::ParseServerResponse(const std::string& response,
474                                               size_t content_length,
475                                               size_t* peer_id,
476                                               size_t* eoh) {
477  int status = GetResponseStatus(response.c_str());
478  if (status != 200) {
479    LOG(LS_ERROR) << "Received error from server";
480    Close();
481    callback_->OnDisconnected();
482    return false;
483  }
484
485  *eoh = response.find("\r\n\r\n");
486  ASSERT(*eoh != std::string::npos);
487  if (*eoh == std::string::npos)
488    return false;
489
490  *peer_id = -1;
491
492  // See comment in peer_channel.cc for why we use the Pragma header and
493  // not e.g. "X-Peer-Id".
494  GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
495
496  return true;
497}
498
499void PeerConnectionClient::OnClose(rtc::AsyncSocket* socket, int err) {
500  LOG(INFO) << __FUNCTION__;
501
502  socket->Close();
503
504#ifdef WIN32
505  if (err != WSAECONNREFUSED) {
506#else
507  if (err != ECONNREFUSED) {
508#endif
509    if (socket == hanging_get_.get()) {
510      if (state_ == CONNECTED) {
511        hanging_get_->Close();
512        hanging_get_->Connect(server_address_);
513      }
514    } else {
515      callback_->OnMessageSent(err);
516    }
517  } else {
518    if (socket == control_socket_.get()) {
519      LOG(WARNING) << "Connection refused; retrying in 2 seconds";
520      rtc::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
521    } else {
522      Close();
523      callback_->OnDisconnected();
524    }
525  }
526}
527
528void PeerConnectionClient::OnMessage(rtc::Message* msg) {
529  // ignore msg; there is currently only one supported message ("retry")
530  DoConnect();
531}
532