/*
 *  Copyright 2012 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#include "webrtc/examples/peerconnection/client/peer_connection_client.h"
12
13#include "webrtc/examples/peerconnection/client/defaults.h"
14#include "webrtc/base/common.h"
15#include "webrtc/base/logging.h"
16#include "webrtc/base/nethelpers.h"
17#include "webrtc/base/stringutils.h"
18
19#ifdef WIN32
20#include "webrtc/base/win32socketserver.h"
21#endif
22
23using rtc::sprintfn;
24
25namespace {
26
27// This is our magical hangup signal.
28const char kByeMessage[] = "BYE";
29// Delay between server connection retries, in milliseconds
30const int kReconnectDelay = 2000;
31
32rtc::AsyncSocket* CreateClientSocket(int family) {
33#ifdef WIN32
34  rtc::Win32Socket* sock = new rtc::Win32Socket();
35  sock->CreateT(family, SOCK_STREAM);
36  return sock;
37#elif defined(WEBRTC_POSIX)
38  rtc::Thread* thread = rtc::Thread::Current();
39  ASSERT(thread != NULL);
40  return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
41#else
42#error Platform not supported.
43#endif
44}
45
46}  // namespace
47
48PeerConnectionClient::PeerConnectionClient()
49  : callback_(NULL),
50    resolver_(NULL),
51    state_(NOT_CONNECTED),
52    my_id_(-1) {
53}
54
55PeerConnectionClient::~PeerConnectionClient() {
56}
57
58void PeerConnectionClient::InitSocketSignals() {
59  ASSERT(control_socket_.get() != NULL);
60  ASSERT(hanging_get_.get() != NULL);
61  control_socket_->SignalCloseEvent.connect(this,
62      &PeerConnectionClient::OnClose);
63  hanging_get_->SignalCloseEvent.connect(this,
64      &PeerConnectionClient::OnClose);
65  control_socket_->SignalConnectEvent.connect(this,
66      &PeerConnectionClient::OnConnect);
67  hanging_get_->SignalConnectEvent.connect(this,
68      &PeerConnectionClient::OnHangingGetConnect);
69  control_socket_->SignalReadEvent.connect(this,
70      &PeerConnectionClient::OnRead);
71  hanging_get_->SignalReadEvent.connect(this,
72      &PeerConnectionClient::OnHangingGetRead);
73}
74
75int PeerConnectionClient::id() const {
76  return my_id_;
77}
78
79bool PeerConnectionClient::is_connected() const {
80  return my_id_ != -1;
81}
82
83const Peers& PeerConnectionClient::peers() const {
84  return peers_;
85}
86
87void PeerConnectionClient::RegisterObserver(
88    PeerConnectionClientObserver* callback) {
89  ASSERT(!callback_);
90  callback_ = callback;
91}
92
93void PeerConnectionClient::Connect(const std::string& server, int port,
94                                   const std::string& client_name) {
95  ASSERT(!server.empty());
96  ASSERT(!client_name.empty());
97
98  if (state_ != NOT_CONNECTED) {
99    LOG(WARNING)
100        << "The client must not be connected before you can call Connect()";
101    callback_->OnServerConnectionFailure();
102    return;
103  }
104
105  if (server.empty() || client_name.empty()) {
106    callback_->OnServerConnectionFailure();
107    return;
108  }
109
110  if (port <= 0)
111    port = kDefaultServerPort;
112
113  server_address_.SetIP(server);
114  server_address_.SetPort(port);
115  client_name_ = client_name;
116
117  if (server_address_.IsUnresolvedIP()) {
118    state_ = RESOLVING;
119    resolver_ = new rtc::AsyncResolver();
120    resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
121    resolver_->Start(server_address_);
122  } else {
123    DoConnect();
124  }
125}
126
127void PeerConnectionClient::OnResolveResult(
128    rtc::AsyncResolverInterface* resolver) {
129  if (resolver_->GetError() != 0) {
130    callback_->OnServerConnectionFailure();
131    resolver_->Destroy(false);
132    resolver_ = NULL;
133    state_ = NOT_CONNECTED;
134  } else {
135    server_address_ = resolver_->address();
136    DoConnect();
137  }
138}
139
140void PeerConnectionClient::DoConnect() {
141  control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
142  hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
143  InitSocketSignals();
144  char buffer[1024];
145  sprintfn(buffer, sizeof(buffer),
146           "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
147  onconnect_data_ = buffer;
148
149  bool ret = ConnectControlSocket();
150  if (ret)
151    state_ = SIGNING_IN;
152  if (!ret) {
153    callback_->OnServerConnectionFailure();
154  }
155}
156
157bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
158  if (state_ != CONNECTED)
159    return false;
160
161  ASSERT(is_connected());
162  ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
163  if (!is_connected() || peer_id == -1)
164    return false;
165
166  char headers[1024];
167  sprintfn(headers, sizeof(headers),
168      "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
169      "Content-Length: %i\r\n"
170      "Content-Type: text/plain\r\n"
171      "\r\n",
172      my_id_, peer_id, message.length());
173  onconnect_data_ = headers;
174  onconnect_data_ += message;
175  return ConnectControlSocket();
176}
177
178bool PeerConnectionClient::SendHangUp(int peer_id) {
179  return SendToPeer(peer_id, kByeMessage);
180}
181
182bool PeerConnectionClient::IsSendingMessage() {
183  return state_ == CONNECTED &&
184         control_socket_->GetState() != rtc::Socket::CS_CLOSED;
185}
186
187bool PeerConnectionClient::SignOut() {
188  if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
189    return true;
190
191  if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
192    hanging_get_->Close();
193
194  if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
195    state_ = SIGNING_OUT;
196
197    if (my_id_ != -1) {
198      char buffer[1024];
199      sprintfn(buffer, sizeof(buffer),
200          "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
201      onconnect_data_ = buffer;
202      return ConnectControlSocket();
203    } else {
204      // Can occur if the app is closed before we finish connecting.
205      return true;
206    }
207  } else {
208    state_ = SIGNING_OUT_WAITING;
209  }
210
211  return true;
212}
213
214void PeerConnectionClient::Close() {
215  control_socket_->Close();
216  hanging_get_->Close();
217  onconnect_data_.clear();
218  peers_.clear();
219  if (resolver_ != NULL) {
220    resolver_->Destroy(false);
221    resolver_ = NULL;
222  }
223  my_id_ = -1;
224  state_ = NOT_CONNECTED;
225}
226
227bool PeerConnectionClient::ConnectControlSocket() {
228  ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
229  int err = control_socket_->Connect(server_address_);
230  if (err == SOCKET_ERROR) {
231    Close();
232    return false;
233  }
234  return true;
235}
236
237void PeerConnectionClient::OnConnect(rtc::AsyncSocket* socket) {
238  ASSERT(!onconnect_data_.empty());
239  size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
240  ASSERT(sent == onconnect_data_.length());
241  RTC_UNUSED(sent);
242  onconnect_data_.clear();
243}
244
245void PeerConnectionClient::OnHangingGetConnect(rtc::AsyncSocket* socket) {
246  char buffer[1024];
247  sprintfn(buffer, sizeof(buffer),
248           "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
249  int len = static_cast<int>(strlen(buffer));
250  int sent = socket->Send(buffer, len);
251  ASSERT(sent == len);
252  RTC_UNUSED2(sent, len);
253}
254
255void PeerConnectionClient::OnMessageFromPeer(int peer_id,
256                                             const std::string& message) {
257  if (message.length() == (sizeof(kByeMessage) - 1) &&
258      message.compare(kByeMessage) == 0) {
259    callback_->OnPeerDisconnected(peer_id);
260  } else {
261    callback_->OnMessageFromPeer(peer_id, message);
262  }
263}
264
265bool PeerConnectionClient::GetHeaderValue(const std::string& data,
266                                          size_t eoh,
267                                          const char* header_pattern,
268                                          size_t* value) {
269  ASSERT(value != NULL);
270  size_t found = data.find(header_pattern);
271  if (found != std::string::npos && found < eoh) {
272    *value = atoi(&data[found + strlen(header_pattern)]);
273    return true;
274  }
275  return false;
276}
277
278bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
279                                          const char* header_pattern,
280                                          std::string* value) {
281  ASSERT(value != NULL);
282  size_t found = data.find(header_pattern);
283  if (found != std::string::npos && found < eoh) {
284    size_t begin = found + strlen(header_pattern);
285    size_t end = data.find("\r\n", begin);
286    if (end == std::string::npos)
287      end = eoh;
288    value->assign(data.substr(begin, end - begin));
289    return true;
290  }
291  return false;
292}
293
294bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket,
295                                          std::string* data,
296                                          size_t* content_length) {
297  char buffer[0xffff];
298  do {
299    int bytes = socket->Recv(buffer, sizeof(buffer));
300    if (bytes <= 0)
301      break;
302    data->append(buffer, bytes);
303  } while (true);
304
305  bool ret = false;
306  size_t i = data->find("\r\n\r\n");
307  if (i != std::string::npos) {
308    LOG(INFO) << "Headers received";
309    if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
310      size_t total_response_size = (i + 4) + *content_length;
311      if (data->length() >= total_response_size) {
312        ret = true;
313        std::string should_close;
314        const char kConnection[] = "\r\nConnection: ";
315        if (GetHeaderValue(*data, i, kConnection, &should_close) &&
316            should_close.compare("close") == 0) {
317          socket->Close();
318          // Since we closed the socket, there was no notification delivered
319          // to us.  Compensate by letting ourselves know.
320          OnClose(socket, 0);
321        }
322      } else {
323        // We haven't received everything.  Just continue to accept data.
324      }
325    } else {
326      LOG(LS_ERROR) << "No content length field specified by the server.";
327    }
328  }
329  return ret;
330}
331
332void PeerConnectionClient::OnRead(rtc::AsyncSocket* socket) {
333  size_t content_length = 0;
334  if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
335    size_t peer_id = 0, eoh = 0;
336    bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
337                                  &eoh);
338    if (ok) {
339      if (my_id_ == -1) {
340        // First response.  Let's store our server assigned ID.
341        ASSERT(state_ == SIGNING_IN);
342        my_id_ = static_cast<int>(peer_id);
343        ASSERT(my_id_ != -1);
344
345        // The body of the response will be a list of already connected peers.
346        if (content_length) {
347          size_t pos = eoh + 4;
348          while (pos < control_data_.size()) {
349            size_t eol = control_data_.find('\n', pos);
350            if (eol == std::string::npos)
351              break;
352            int id = 0;
353            std::string name;
354            bool connected;
355            if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
356                           &connected) && id != my_id_) {
357              peers_[id] = name;
358              callback_->OnPeerConnected(id, name);
359            }
360            pos = eol + 1;
361          }
362        }
363        ASSERT(is_connected());
364        callback_->OnSignedIn();
365      } else if (state_ == SIGNING_OUT) {
366        Close();
367        callback_->OnDisconnected();
368      } else if (state_ == SIGNING_OUT_WAITING) {
369        SignOut();
370      }
371    }
372
373    control_data_.clear();
374
375    if (state_ == SIGNING_IN) {
376      ASSERT(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
377      state_ = CONNECTED;
378      hanging_get_->Connect(server_address_);
379    }
380  }
381}
382
383void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) {
384  LOG(INFO) << __FUNCTION__;
385  size_t content_length = 0;
386  if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
387    size_t peer_id = 0, eoh = 0;
388    bool ok = ParseServerResponse(notification_data_, content_length,
389                                  &peer_id, &eoh);
390
391    if (ok) {
392      // Store the position where the body begins.
393      size_t pos = eoh + 4;
394
395      if (my_id_ == static_cast<int>(peer_id)) {
396        // A notification about a new member or a member that just
397        // disconnected.
398        int id = 0;
399        std::string name;
400        bool connected = false;
401        if (ParseEntry(notification_data_.substr(pos), &name, &id,
402                       &connected)) {
403          if (connected) {
404            peers_[id] = name;
405            callback_->OnPeerConnected(id, name);
406          } else {
407            peers_.erase(id);
408            callback_->OnPeerDisconnected(id);
409          }
410        }
411      } else {
412        OnMessageFromPeer(static_cast<int>(peer_id),
413                          notification_data_.substr(pos));
414      }
415    }
416
417    notification_data_.clear();
418  }
419
420  if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
421      state_ == CONNECTED) {
422    hanging_get_->Connect(server_address_);
423  }
424}
425
426bool PeerConnectionClient::ParseEntry(const std::string& entry,
427                                      std::string* name,
428                                      int* id,
429                                      bool* connected) {
430  ASSERT(name != NULL);
431  ASSERT(id != NULL);
432  ASSERT(connected != NULL);
433  ASSERT(!entry.empty());
434
435  *connected = false;
436  size_t separator = entry.find(',');
437  if (separator != std::string::npos) {
438    *id = atoi(&entry[separator + 1]);
439    name->assign(entry.substr(0, separator));
440    separator = entry.find(',', separator + 1);
441    if (separator != std::string::npos) {
442      *connected = atoi(&entry[separator + 1]) ? true : false;
443    }
444  }
445  return !name->empty();
446}
447
448int PeerConnectionClient::GetResponseStatus(const std::string& response) {
449  int status = -1;
450  size_t pos = response.find(' ');
451  if (pos != std::string::npos)
452    status = atoi(&response[pos + 1]);
453  return status;
454}
455
456bool PeerConnectionClient::ParseServerResponse(const std::string& response,
457                                               size_t content_length,
458                                               size_t* peer_id,
459                                               size_t* eoh) {
460  int status = GetResponseStatus(response.c_str());
461  if (status != 200) {
462    LOG(LS_ERROR) << "Received error from server";
463    Close();
464    callback_->OnDisconnected();
465    return false;
466  }
467
468  *eoh = response.find("\r\n\r\n");
469  ASSERT(*eoh != std::string::npos);
470  if (*eoh == std::string::npos)
471    return false;
472
473  *peer_id = -1;
474
475  // See comment in peer_channel.cc for why we use the Pragma header and
476  // not e.g. "X-Peer-Id".
477  GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
478
479  return true;
480}
481
482void PeerConnectionClient::OnClose(rtc::AsyncSocket* socket, int err) {
483  LOG(INFO) << __FUNCTION__;
484
485  socket->Close();
486
487#ifdef WIN32
488  if (err != WSAECONNREFUSED) {
489#else
490  if (err != ECONNREFUSED) {
491#endif
492    if (socket == hanging_get_.get()) {
493      if (state_ == CONNECTED) {
494        hanging_get_->Close();
495        hanging_get_->Connect(server_address_);
496      }
497    } else {
498      callback_->OnMessageSent(err);
499    }
500  } else {
501    if (socket == control_socket_.get()) {
502      LOG(WARNING) << "Connection refused; retrying in 2 seconds";
503      rtc::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
504    } else {
505      Close();
506      callback_->OnDisconnected();
507    }
508  }
509}
510
511void PeerConnectionClient::OnMessage(rtc::Message* msg) {
512  // ignore msg; there is currently only one supported message ("retry")
513  DoConnect();
514}
515