port.cc revision f74420b3285b9fe04a7e00aa3b8c0ab07ea344bc
1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#if defined(_MSC_VER) && _MSC_VER < 1300
29#pragma warning(disable:4786)
30#endif
31
32#include <algorithm>
33#include <vector>
34
35#include "talk/base/asyncudpsocket.h"
36#include "talk/base/asynctcpsocket.h"
37#include "talk/base/helpers.h"
38#include "talk/base/logging.h"
39#include "talk/base/scoped_ptr.h"
40#include "talk/base/socketadapters.h"
41#include "talk/base/stringutils.h"
42#include "talk/p2p/base/common.h"
43#include "talk/p2p/base/port.h"
44
45#if defined(_MSC_VER) && _MSC_VER < 1300
46namespace std {
47  using ::memcmp;
48}
49#endif
50
51namespace {
52
53// The length of time we wait before timing out readability on a connection.
54const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000;   // 30 seconds
55
56// The length of time we wait before timing out writability on a connection.
57const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000;  // 15 seconds
58
59// The length of time we wait before we become unwritable.
60const uint32 CONNECTION_WRITE_CONNECT_TIMEOUT = 5 * 1000;  // 5 seconds
61
62// The number of pings that must fail to respond before we become unwritable.
63const uint32 CONNECTION_WRITE_CONNECT_FAILURES = 5;
64
65// This is the length of time that we wait for a ping response to come back.
66const int CONNECTION_RESPONSE_TIMEOUT = 5 * 1000;   // 5 seconds
67
68// Determines whether we have seen at least the given maximum number of
69// pings fail to have a response.
70inline bool TooManyFailures(
71    const std::vector<uint32>& pings_since_last_response,
72    uint32 maximum_failures,
73    uint32 rtt_estimate,
74    uint32 now) {
75
76  // If we haven't sent that many pings, then we can't have failed that many.
77  if (pings_since_last_response.size() < maximum_failures)
78    return false;
79
80  // Check if the window in which we would expect a response to the ping has
81  // already elapsed.
82  return pings_since_last_response[maximum_failures - 1] + rtt_estimate < now;
83}
84
85// Determines whether we have gone too long without seeing any response.
86inline bool TooLongWithoutResponse(
87    const std::vector<uint32>& pings_since_last_response,
88    uint32 maximum_time,
89    uint32 now) {
90
91  if (pings_since_last_response.size() == 0)
92    return false;
93
94  return pings_since_last_response[0] + maximum_time < now;
95}
96
97// We will restrict RTT estimates (when used for determining state) to be
98// within a reasonable range.
99const uint32 MINIMUM_RTT = 100;   // 0.1 seconds
100const uint32 MAXIMUM_RTT = 3000;  // 3 seconds
101
102// When we don't have any RTT data, we have to pick something reasonable.  We
103// use a large value just in case the connection is really slow.
104const uint32 DEFAULT_RTT = MAXIMUM_RTT;
105
106// Computes our estimate of the RTT given the current estimate.
107inline uint32 ConservativeRTTEstimate(uint32 rtt) {
108  return talk_base::_max(MINIMUM_RTT, talk_base::_min(MAXIMUM_RTT, 2 * rtt));
109}
110
111// Weighting of the old rtt value to new data.
112const int RTT_RATIO = 3;  // 3 : 1
113
114// The delay before we begin checking if this port is useless.
115const int kPortTimeoutDelay = 30 * 1000;  // 30 seconds
116
117const uint32 MSG_CHECKTIMEOUT = 1;
118const uint32 MSG_DELETE = 1;
119}
120
121namespace cricket {
122
123static const char* const PROTO_NAMES[] = { "udp", "tcp", "ssltcp" };
124
125const char* ProtoToString(ProtocolType proto) {
126  return PROTO_NAMES[proto];
127}
128
129bool StringToProto(const char* value, ProtocolType* proto) {
130  for (size_t i = 0; i <= PROTO_LAST; ++i) {
131    if (strcmp(PROTO_NAMES[i], value) == 0) {
132      *proto = static_cast<ProtocolType>(i);
133      return true;
134    }
135  }
136  return false;
137}
138
139Port::Port(talk_base::Thread* thread, const std::string& type,
140           talk_base::SocketFactory* factory, talk_base::Network* network)
141  : thread_(thread), factory_(factory), type_(type), network_(network),
142    preference_(-1), lifetime_(LT_PRESTART), enable_port_packets_(false) {
143  if (factory_ == NULL)
144    factory_ = thread_->socketserver();
145
146  set_username_fragment(talk_base::CreateRandomString(16));
147  set_password(talk_base::CreateRandomString(16));
148  LOG_J(LS_INFO, this) << "Port created";
149}
150
151Port::~Port() {
152  // Delete all of the remaining connections.  We copy the list up front
153  // because each deletion will cause it to be modified.
154
155  std::vector<Connection*> list;
156
157  AddressMap::iterator iter = connections_.begin();
158  while (iter != connections_.end()) {
159    list.push_back(iter->second);
160    ++iter;
161  }
162
163  for (uint32 i = 0; i < list.size(); i++)
164    delete list[i];
165}
166
167Connection* Port::GetConnection(const talk_base::SocketAddress& remote_addr) {
168  AddressMap::const_iterator iter = connections_.find(remote_addr);
169  if (iter != connections_.end())
170    return iter->second;
171  else
172    return NULL;
173}
174
175void Port::AddAddress(const talk_base::SocketAddress& address,
176                      const std::string& protocol,
177                      bool final) {
178  Candidate c;
179  c.set_name(name_);
180  c.set_type(type_);
181  c.set_protocol(protocol);
182  c.set_address(address);
183  c.set_preference(preference_);
184  c.set_username(username_frag_);
185  c.set_password(password_);
186  c.set_network_name(network_->name());
187  c.set_generation(generation_);
188  candidates_.push_back(c);
189
190  if (final)
191    SignalAddressReady(this);
192}
193
194void Port::AddConnection(Connection* conn) {
195  connections_[conn->remote_candidate().address()] = conn;
196  conn->SignalDestroyed.connect(this, &Port::OnConnectionDestroyed);
197  SignalConnectionCreated(this, conn);
198}
199
200void Port::OnReadPacket(
201    const char* data, size_t size, const talk_base::SocketAddress& addr) {
202  // If the user has enabled port packets, just hand this over.
203  if (enable_port_packets_) {
204    SignalReadPacket(this, data, size, addr);
205    return;
206  }
207
208  // If this is an authenticated STUN request, then signal unknown address and
209  // send back a proper binding response.
210  StunMessage* msg;
211  std::string remote_username;
212  if (!GetStunMessage(data, size, addr, msg, remote_username)) {
213    LOG_J(LS_ERROR, this) << "Received non-STUN packet from unknown address ("
214                          << addr.ToString() << ")";
215  } else if (!msg) {
216    // STUN message handled already
217  } else if (msg->type() == STUN_BINDING_REQUEST) {
218    SignalUnknownAddress(this, addr, msg, remote_username);
219  } else {
220    // NOTE(tschmelcher): This is benign. It occurs if we pruned a
221    // connection for this port while it had STUN requests in flight, because
222    // we then get back responses for them, which this code correctly does not
223    // handle.
224    LOG_J(LS_ERROR, this) << "Received unexpected STUN message type ("
225                          << msg->type() << ") from unknown address ("
226                          << addr.ToString() << ")";
227    delete msg;
228  }
229}
230
231void Port::SendBindingRequest(Connection* conn) {
232  // Construct the request message.
233  StunMessage request;
234  request.SetType(STUN_BINDING_REQUEST);
235  request.SetTransactionID(talk_base::CreateRandomString(16));
236
237  StunByteStringAttribute* username_attr =
238      StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
239  std::string username = conn->remote_candidate().username();
240  username.append(username_frag_);
241  username_attr->CopyBytes(username.c_str(), username.size());
242  request.AddAttribute(username_attr);
243
244  // Send the request message.
245  // NOTE: If we wanted to, this is where we would add the HMAC.
246  talk_base::ByteBuffer buf;
247  request.Write(&buf);
248  SendTo(buf.Data(), buf.Length(), conn->remote_candidate().address(), false);
249}
250
251bool Port::GetStunMessage(const char* data, size_t size,
252                          const talk_base::SocketAddress& addr,
253                          StunMessage *& msg, std::string& remote_username) {
254  // NOTE: This could clearly be optimized to avoid allocating any memory.
255  //       However, at the data rates we'll be looking at on the client side,
256  //       this probably isn't worth worrying about.
257
258  msg = 0;
259
260  // Parse the request message.  If the packet is not a complete and correct
261  // STUN message, then ignore it.
262  talk_base::scoped_ptr<StunMessage> stun_msg(new StunMessage());
263  talk_base::ByteBuffer buf(data, size);
264  if (!stun_msg->Read(&buf) || (buf.Length() > 0)) {
265    return false;
266  }
267
268  // The packet must include a username that either begins or ends with our
269  // fragment.  It should begin with our fragment if it is a request and it
270  // should end with our fragment if it is a response.
271  const StunByteStringAttribute* username_attr =
272      stun_msg->GetByteString(STUN_ATTR_USERNAME);
273
274  int remote_frag_len = (username_attr ? username_attr->length() : 0);
275  remote_frag_len -= static_cast<int>(username_frag_.size());
276
277  if (stun_msg->type() == STUN_BINDING_REQUEST) {
278    if (remote_frag_len < 0) {
279      // Username not present or corrupted, don't reply.
280      LOG_J(LS_ERROR, this) << "Received STUN request without username";
281      return true;
282    } else if (std::memcmp(username_attr->bytes(), username_frag_.c_str(),
283                           username_frag_.size()) != 0) {
284      LOG_J(LS_ERROR, this) << "Received STUN request with bad username";
285      SendBindingErrorResponse(stun_msg.get(), addr, STUN_ERROR_BAD_REQUEST,
286                               STUN_ERROR_REASON_BAD_REQUEST);
287      return true;
288    }
289
290    remote_username.assign(username_attr->bytes() + username_frag_.size(),
291      username_attr->bytes() + username_attr->length());
292  } else if ((stun_msg->type() == STUN_BINDING_RESPONSE)
293      || (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE)) {
294    if (remote_frag_len < 0) {
295      // NOTE(tschmelcher): This is benign. It occurs when the response to a
296      // StunBindingRequest to the real STUN server (which involves no
297      // usernames) took too long to reach us and so the base StunRequest
298      // re-sent itself, resulting in us getting an extraneous second response
299      // that gets forwarded on to this code and correctly discarded.
300      LOG_J(LS_ERROR, this) << "Received STUN response without username";
301      // Do not send error response to a response
302      return true;
303    } else if (std::memcmp(username_attr->bytes() + remote_frag_len,
304                           username_frag_.c_str(),
305                           username_frag_.size()) != 0) {
306      LOG_J(LS_ERROR, this) << "Received STUN response with bad username";
307      // Do not send error response to a response
308      return true;
309    }
310
311    remote_username.assign(username_attr->bytes(),
312      username_attr->bytes() + remote_frag_len);
313
314    if (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE) {
315      if (const StunErrorCodeAttribute* error_code = stun_msg->GetErrorCode()) {
316        LOG_J(LS_ERROR, this) << "Received STUN binding error:"
317                              << " class=" << error_code->error_class()
318                              << " number=" << error_code->number()
319                              << " reason='" << error_code->reason() << "'";
320        // Return message to allow error-specific processing
321      } else {
322        LOG_J(LS_ERROR, this)
323          << "Received STUN error response with no error code";
324        // Drop corrupt message
325        return true;
326      }
327    }
328  } else {
329    LOG_J(LS_ERROR, this) << "Received STUN packet with invalid type ("
330                          << stun_msg->type() << ")";
331    return true;
332  }
333
334  // Return the STUN message found.
335  msg = stun_msg.release();
336  return true;
337}
338
339void Port::SendBindingResponse(
340    StunMessage* request, const talk_base::SocketAddress& addr) {
341
342  ASSERT(request->type() == STUN_BINDING_REQUEST);
343
344  // Retrieve the username from the request.
345  const StunByteStringAttribute* username_attr =
346      request->GetByteString(STUN_ATTR_USERNAME);
347  ASSERT(username_attr != NULL);
348  if (username_attr == NULL) {
349    // No valid username, skip the response.
350    return;
351  }
352
353  // Fill in the response message.
354  StunMessage response;
355  response.SetType(STUN_BINDING_RESPONSE);
356  response.SetTransactionID(request->transaction_id());
357
358  StunByteStringAttribute* username2_attr =
359      StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
360  username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
361  response.AddAttribute(username2_attr);
362
363  StunAddressAttribute* addr_attr =
364      StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS);
365  addr_attr->SetFamily(1);
366  addr_attr->SetPort(addr.port());
367  addr_attr->SetIP(addr.ip());
368  response.AddAttribute(addr_attr);
369
370  // Send the response message.
371  // NOTE: If we wanted to, this is where we would add the HMAC.
372  talk_base::ByteBuffer buf;
373  response.Write(&buf);
374  SendTo(buf.Data(), buf.Length(), addr, false);
375
376  // The fact that we received a successful request means that this connection
377  // (if one exists) should now be readable.
378  Connection* conn = GetConnection(addr);
379  ASSERT(conn != NULL);
380  if (conn)
381    conn->ReceivedPing();
382}
383
384void Port::SendBindingErrorResponse(
385    StunMessage* request, const talk_base::SocketAddress& addr, int error_code,
386    const std::string& reason) {
387
388  ASSERT(request->type() == STUN_BINDING_REQUEST);
389
390  // Retrieve the username from the request.  If it didn't have one, we
391  // shouldn't be responding at all.
392  const StunByteStringAttribute* username_attr =
393      request->GetByteString(STUN_ATTR_USERNAME);
394  ASSERT(username_attr != NULL);
395  if (username_attr == NULL) {
396    // No valid username, skip the response.
397    return;
398  }
399
400  // Fill in the response message.
401  StunMessage response;
402  response.SetType(STUN_BINDING_ERROR_RESPONSE);
403  response.SetTransactionID(request->transaction_id());
404
405  StunByteStringAttribute* username2_attr =
406      StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
407  username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
408  response.AddAttribute(username2_attr);
409
410  StunErrorCodeAttribute* error_attr = StunAttribute::CreateErrorCode();
411  error_attr->SetErrorCode(error_code);
412  error_attr->SetReason(reason);
413  response.AddAttribute(error_attr);
414
415  // Send the response message.
416  // NOTE: If we wanted to, this is where we would add the HMAC.
417  talk_base::ByteBuffer buf;
418  response.Write(&buf);
419  SendTo(buf.Data(), buf.Length(), addr, false);
420}
421
422talk_base::AsyncPacketSocket* Port::CreatePacketSocket(ProtocolType proto) {
423  if (proto == PROTO_UDP) {
424    // UDP sockets are simple.
425    return talk_base::AsyncUDPSocket::Create(factory_);
426  } else if (proto == PROTO_TCP || proto == PROTO_SSLTCP) {
427    // Create the base TCP socket. Bail out if this fails.
428    talk_base::AsyncSocket* socket = factory_->CreateAsyncSocket(SOCK_STREAM);
429    if (!socket) {
430      return NULL;
431    }
432
433    // If using a proxy, wrap the socket in a proxy socket.
434    if (proxy().type == talk_base::PROXY_SOCKS5) {
435      socket = new talk_base::AsyncSocksProxySocket(
436          socket, proxy().address, proxy().username, proxy().password);
437    } else if (proxy().type == talk_base::PROXY_HTTPS) {
438      socket = new talk_base::AsyncHttpsProxySocket(
439          socket, user_agent(), proxy().address,
440          proxy().username, proxy().password);
441    }
442
443    // If using SSLTCP, wrap the TCP socket in a pseudo-SSL socket.
444    if (proto == PROTO_SSLTCP) {
445      socket = new talk_base::AsyncSSLSocket(socket);
446    }
447
448    // Finally, wrap that socket in a TCP packet socket.
449    // [Insert obligatory Taco Town reference here]
450    return new talk_base::AsyncTCPSocket(socket);
451  } else {
452    LOG_J(LS_ERROR, this) << "Unknown protocol (" << proto << ")";
453    return NULL;
454  }
455}
456
457void Port::OnMessage(talk_base::Message *pmsg) {
458  ASSERT(pmsg->message_id == MSG_CHECKTIMEOUT);
459  ASSERT(lifetime_ == LT_PRETIMEOUT);
460  lifetime_ = LT_POSTTIMEOUT;
461  CheckTimeout();
462}
463
464std::string Port::ToString() const {
465  std::stringstream ss;
466  ss << "Port[" << name_ << ":" << type_ << ":" << network_->ToString() << "]";
467  return ss.str();
468}
469
470void Port::EnablePortPackets() {
471  enable_port_packets_ = true;
472}
473
474void Port::Start() {
475  // The port sticks around for a minimum lifetime, after which
476  // we destroy it when it drops to zero connections.
477  if (lifetime_ == LT_PRESTART) {
478    lifetime_ = LT_PRETIMEOUT;
479    thread_->PostDelayed(kPortTimeoutDelay, this, MSG_CHECKTIMEOUT);
480  } else {
481    LOG_J(LS_WARNING, this) << "Port restart attempted";
482  }
483}
484
485void Port::OnConnectionDestroyed(Connection* conn) {
486  AddressMap::iterator iter =
487      connections_.find(conn->remote_candidate().address());
488  ASSERT(iter != connections_.end());
489  connections_.erase(iter);
490
491  CheckTimeout();
492}
493
494void Port::Destroy() {
495  ASSERT(connections_.empty());
496  LOG_J(LS_INFO, this) << "Port deleted";
497  SignalDestroyed(this);
498  delete this;
499}
500
501void Port::CheckTimeout() {
502  // If this port has no connections, then there's no reason to keep it around.
503  // When the connections time out (both read and write), they will delete
504  // themselves, so if we have any connections, they are either readable or
505  // writable (or still connecting).
506  if ((lifetime_ == LT_POSTTIMEOUT) && connections_.empty()) {
507    Destroy();
508  }
509}
510
511// A ConnectionRequest is a simple STUN ping used to determine writability.
512class ConnectionRequest : public StunRequest {
513 public:
514  explicit ConnectionRequest(Connection* connection) : connection_(connection) {
515  }
516
517  virtual ~ConnectionRequest() {
518  }
519
520  virtual void Prepare(StunMessage* request) {
521    request->SetType(STUN_BINDING_REQUEST);
522    StunByteStringAttribute* username_attr =
523        StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
524    std::string username = connection_->remote_candidate().username();
525    username.append(connection_->port()->username_fragment());
526    username_attr->CopyBytes(username.c_str(), username.size());
527    request->AddAttribute(username_attr);
528  }
529
530  virtual void OnResponse(StunMessage* response) {
531    connection_->OnConnectionRequestResponse(response, Elapsed());
532  }
533
534  virtual void OnErrorResponse(StunMessage* response) {
535    connection_->OnConnectionRequestErrorResponse(response, Elapsed());
536  }
537
538  virtual void OnTimeout() {
539    LOG_J(LS_VERBOSE, connection_) << "Timing-out STUN ping " << id();
540  }
541
542  virtual int GetNextDelay() {
543    // Each request is sent only once.  After a single delay , the request will
544    // time out.
545    timeout_ = true;
546    return CONNECTION_RESPONSE_TIMEOUT;
547  }
548
549 private:
550  Connection* connection_;
551};
552
553//
554// Connection
555//
556
557Connection::Connection(Port* port, size_t index,
558                       const Candidate& remote_candidate)
559  : port_(port), local_candidate_index_(index),
560    remote_candidate_(remote_candidate), read_state_(STATE_READ_TIMEOUT),
561    write_state_(STATE_WRITE_CONNECT), connected_(true), pruned_(false),
562    requests_(port->thread()), rtt_(DEFAULT_RTT),
563    last_ping_sent_(0), last_ping_received_(0), reported_(false) {
564  // Wire up to send stun packets
565  requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket);
566  LOG_J(LS_INFO, this) << "Connection created";
567}
568
569Connection::~Connection() {
570}
571
572const Candidate& Connection::local_candidate() const {
573  if (local_candidate_index_ < port_->candidates().size())
574    return port_->candidates()[local_candidate_index_];
575  ASSERT(false);
576  static Candidate foo;
577  return foo;
578}
579
580void Connection::set_read_state(ReadState value) {
581  ReadState old_value = read_state_;
582  read_state_ = value;
583  if (value != old_value) {
584    LOG_J(LS_VERBOSE, this) << "set_read_state";
585    SignalStateChange(this);
586    CheckTimeout();
587  }
588}
589
590void Connection::set_write_state(WriteState value) {
591  WriteState old_value = write_state_;
592  write_state_ = value;
593  if (value != old_value) {
594    LOG_J(LS_VERBOSE, this) << "set_write_state";
595    SignalStateChange(this);
596    CheckTimeout();
597  }
598}
599
600void Connection::set_connected(bool value) {
601  bool old_value = connected_;
602  connected_ = value;
603  if (value != old_value) {
604    LOG_J(LS_VERBOSE, this) << "set_connected";
605  }
606}
607
608void Connection::OnSendStunPacket(
609    const void* data, size_t size, StunRequest* req) {
610  port_->SendTo(data, size, remote_candidate_.address(), false);
611}
612
613void Connection::OnReadPacket(const char* data, size_t size) {
614  StunMessage* msg;
615  std::string remote_username;
616  const talk_base::SocketAddress& addr(remote_candidate_.address());
617  if (!port_->GetStunMessage(data, size, addr, msg, remote_username)) {
618    // The packet did not parse as a valid STUN message
619
620    // If this connection is readable, then pass along the packet.
621    if (read_state_ == STATE_READABLE) {
622      // readable means data from this address is acceptable
623      // Send it on!
624
625      recv_rate_tracker_.Update(size);
626      SignalReadPacket(this, data, size);
627
628      // If timed out sending writability checks, start up again
629      if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
630        set_write_state(STATE_WRITE_CONNECT);
631    } else {
632      // Not readable means the remote address hasn't send a valid
633      // binding request yet.
634
635      LOG_J(LS_WARNING, this)
636        << "Received non-STUN packet from an unreadable connection.";
637    }
638  } else if (!msg) {
639    // The packet was STUN, but was already handled
640  } else if (remote_username != remote_candidate_.username()) {
641    // Not destined this connection
642    LOG_J(LS_ERROR, this) << "Received STUN packet on wrong address.";
643    if (msg->type() == STUN_BINDING_REQUEST) {
644      port_->SendBindingErrorResponse(msg, addr, STUN_ERROR_BAD_REQUEST,
645                                      STUN_ERROR_REASON_BAD_REQUEST);
646    }
647    delete msg;
648  } else {
649    // The packet is STUN, with the current username
650    // If this is a STUN request, then update the readable bit and respond.
651    // If this is a STUN response, then update the writable bit.
652
653    switch (msg->type()) {
654    case STUN_BINDING_REQUEST:
655      // Incoming, validated stun request from remote peer.
656      // This call will also set the connection readable.
657
658      port_->SendBindingResponse(msg, addr);
659
660      // If timed out sending writability checks, start up again
661      if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
662        set_write_state(STATE_WRITE_CONNECT);
663      break;
664
665    case STUN_BINDING_RESPONSE:
666    case STUN_BINDING_ERROR_RESPONSE:
667      // Response from remote peer. Does it match request sent?
668      // This doesn't just check, it makes callbacks if transaction
669      // id's match
670      requests_.CheckResponse(msg);
671      break;
672
673    default:
674      ASSERT(false);
675      break;
676    }
677
678    // Done with the message; delete
679
680    delete msg;
681  }
682}
683
684void Connection::Prune() {
685  if (!pruned_) {
686    LOG_J(LS_VERBOSE, this) << "Connection pruned";
687    pruned_ = true;
688    requests_.Clear();
689    set_write_state(STATE_WRITE_TIMEOUT);
690  }
691}
692
693void Connection::Destroy() {
694  LOG_J(LS_VERBOSE, this) << "Connection destroyed";
695  set_read_state(STATE_READ_TIMEOUT);
696  set_write_state(STATE_WRITE_TIMEOUT);
697}
698
699void Connection::UpdateState(uint32 now) {
700  // Check the readable state.
701  //
702  // Since we don't know how many pings the other side has attempted, the best
703  // test we can do is a simple window.
704
705  if ((read_state_ == STATE_READABLE) &&
706      (last_ping_received_ + CONNECTION_READ_TIMEOUT <= now)) {
707    set_read_state(STATE_READ_TIMEOUT);
708  }
709
710  // Check the writable state.  (The order of these checks is important.)
711  //
712  // Before becoming unwritable, we allow for a fixed number of pings to fail
713  // (i.e., receive no response).  We also have to give the response time to
714  // get back, so we include a conservative estimate of this.
715  //
716  // Before timing out writability, we give a fixed amount of time.  This is to
717  // allow for changes in network conditions.
718
719  uint32 rtt = ConservativeRTTEstimate(rtt_);
720
721  std::string pings;
722  for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
723    char buf[32];
724    talk_base::sprintfn(buf, sizeof(buf), "%u",
725        pings_since_last_response_[i]);
726    pings.append(buf).append(" ");
727  }
728  LOG_J(LS_VERBOSE, this) << "UpdateState(): pings_since_last_response_ = " <<
729      pings << ", rtt = " << rtt << ", now = " << now;
730
731  if ((write_state_ == STATE_WRITABLE) &&
732      TooManyFailures(pings_since_last_response_,
733                      CONNECTION_WRITE_CONNECT_FAILURES,
734                      rtt,
735                      now) &&
736      TooLongWithoutResponse(pings_since_last_response_,
737                             CONNECTION_WRITE_CONNECT_TIMEOUT,
738                             now)) {
739    set_write_state(STATE_WRITE_CONNECT);
740  }
741
742  if ((write_state_ == STATE_WRITE_CONNECT) &&
743      TooLongWithoutResponse(pings_since_last_response_,
744                             CONNECTION_WRITE_TIMEOUT,
745                             now)) {
746    set_write_state(STATE_WRITE_TIMEOUT);
747  }
748}
749
750void Connection::Ping(uint32 now) {
751  ASSERT(connected_);
752  last_ping_sent_ = now;
753  pings_since_last_response_.push_back(now);
754  ConnectionRequest *req = new ConnectionRequest(this);
755  LOG_J(LS_VERBOSE, this) << "Sending STUN ping " << req->id() << " at " << now;
756  requests_.Send(req);
757}
758
759void Connection::ReceivedPing() {
760  last_ping_received_ = talk_base::Time();
761  set_read_state(STATE_READABLE);
762}
763
764std::string Connection::ToString() const {
765  const char CONNECT_STATE_ABBREV[2] = {
766    '-',  // not connected (false)
767    'C',  // connected (true)
768  };
769  const char READ_STATE_ABBREV[2] = {
770    'R',  // STATE_READABLE
771    '-',  // STATE_READ_TIMEOUT
772  };
773  const char WRITE_STATE_ABBREV[3] = {
774    'W',  // STATE_WRITABLE
775    'w',  // STATE_WRITE_CONNECT
776    '-',  // STATE_WRITE_TIMEOUT
777  };
778  const Candidate& local = local_candidate();
779  const Candidate& remote = remote_candidate();
780  std::stringstream ss;
781  ss << "Conn[" << local.generation()
782     << ":" << local.name() << ":" << local.type() << ":"
783     << local.protocol() << ":" << local.address().ToString()
784     << "->" << remote.name() << ":" << remote.type() << ":"
785     << remote.protocol() << ":" << remote.address().ToString()
786     << "|"
787     << CONNECT_STATE_ABBREV[connected()]
788     << READ_STATE_ABBREV[read_state()]
789     << WRITE_STATE_ABBREV[write_state()]
790     << "|" << rtt_ << "]";
791  return ss.str();
792}
793
794void Connection::OnConnectionRequestResponse(StunMessage* response,
795                                             uint32 rtt) {
796  // We have a potentially valid reply from the remote address.
797  // The packet must include a username that ends with our fragment,
798  // since it is a response.
799
800  // Check exact message type
801  bool valid = true;
802  if (response->type() != STUN_BINDING_RESPONSE)
803    valid = false;
804
805  // Must have username attribute
806  const StunByteStringAttribute* username_attr =
807      response->GetByteString(STUN_ATTR_USERNAME);
808  if (valid) {
809    if (!username_attr) {
810      LOG_J(LS_ERROR, this) << "Received likely STUN packet with no username";
811      valid = false;
812    }
813  }
814
815  // Length must be at least the size of our fragment (actually, should
816  // be bigger since our fragment is at the end!)
817  if (valid) {
818    if (username_attr->length() <= port_->username_fragment().size()) {
819      LOG_J(LS_ERROR, this) << "Received likely STUN packet with short username";
820      valid = false;
821    }
822  }
823
824  // Compare our fragment with the end of the username - must be exact match
825  if (valid) {
826    std::string username_fragment = port_->username_fragment();
827    int offset = (int)(username_attr->length() - username_fragment.size());
828    if (std::memcmp(username_attr->bytes() + offset,
829        username_fragment.c_str(), username_fragment.size()) != 0) {
830      LOG_J(LS_ERROR, this) << "Received STUN response with bad username";
831      valid = false;
832    }
833  }
834
835  if (valid) {
836    // Valid response. If we're not already, become writable.  We may be
837    // bringing a pruned connection back to life, but if we don't really want
838    // it, we can always prune it again.
839    set_write_state(STATE_WRITABLE);
840
841    std::string pings;
842    for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
843      char buf[32];
844      talk_base::sprintfn(buf, sizeof(buf), "%u",
845          pings_since_last_response_[i]);
846      pings.append(buf).append(" ");
847    }
848    LOG_J(LS_VERBOSE, this) << "OnConnectionRequestResponse(): "
849        "pings_since_last_response_ = " << pings << ", rtt = " << rtt;
850
851    pings_since_last_response_.clear();
852    rtt_ = (RTT_RATIO * rtt_ + rtt) / (RTT_RATIO + 1);
853
854    LOG_J(LS_VERBOSE, this) << "Received STUN ping response " <<
855        response->transaction_id() << " after rtt = " << rtt;
856  }
857}
858
859void Connection::OnConnectionRequestErrorResponse(StunMessage *response,
860                                                  uint32 rtt) {
861  const StunErrorCodeAttribute* error = response->GetErrorCode();
862  uint32 error_code = error ? error->error_code() : STUN_ERROR_GLOBAL_FAILURE;
863
864  if ((error_code == STUN_ERROR_UNKNOWN_ATTRIBUTE)
865      || (error_code == STUN_ERROR_SERVER_ERROR)
866      || (error_code == STUN_ERROR_UNAUTHORIZED)) {
867    // Recoverable error, retry
868  } else if (error_code == STUN_ERROR_STALE_CREDENTIALS) {
869    // Race failure, retry
870  } else {
871    // This is not a valid connection.
872    LOG_J(LS_ERROR, this) << "Received STUN error response; killing connection";
873    set_write_state(STATE_WRITE_TIMEOUT);
874  }
875}
876
877void Connection::CheckTimeout() {
878  // If both read and write have timed out, then this connection can contribute
879  // no more to p2p socket unless at some later date readability were to come
880  // back.  However, we gave readability a long time to timeout, so at this
881  // point, it seems fair to get rid of this connectoin.
882  if ((read_state_ == STATE_READ_TIMEOUT) &&
883      (write_state_ == STATE_WRITE_TIMEOUT)) {
884    port_->thread()->Post(this, MSG_DELETE);
885  }
886}
887
888void Connection::OnMessage(talk_base::Message *pmsg) {
889  ASSERT(pmsg->message_id == MSG_DELETE);
890
891  LOG_J(LS_INFO, this) << "Connection deleted";
892  SignalDestroyed(this);
893  delete this;
894}
895
896size_t Connection::recv_bytes_second() {
897  return recv_rate_tracker_.bytes_second();
898}
899
900size_t Connection::recv_total_bytes() {
901  return recv_rate_tracker_.total_bytes();
902}
903
904size_t Connection::sent_bytes_second() {
905  return send_rate_tracker_.bytes_second();
906}
907
908size_t Connection::sent_total_bytes() {
909  return send_rate_tracker_.total_bytes();
910}
911
912ProxyConnection::ProxyConnection(Port* port, size_t index,
913                                 const Candidate& candidate)
914  : Connection(port, index, candidate), error_(0) {
915}
916
917int ProxyConnection::Send(const void* data, size_t size) {
918  if (write_state() != STATE_WRITABLE) {
919    error_ = EWOULDBLOCK;
920    return SOCKET_ERROR;
921  }
922  int sent = port_->SendTo(data, size, remote_candidate_.address(), true);
923  if (sent <= 0) {
924    ASSERT(sent < 0);
925    error_ = port_->GetError();
926  } else {
927    send_rate_tracker_.Update(sent);
928  }
929  return sent;
930}
931
932RateTracker::RateTracker()
933    : total_bytes_(0), bytes_second_(0),
934      last_bytes_second_time_(static_cast<uint32>(-1)),
935      last_bytes_second_calc_(0) {
936}
937
938size_t RateTracker::total_bytes() const {
939  return total_bytes_;
940}
941
942size_t RateTracker::bytes_second() {
943  // Snapshot bytes / second calculator. Determine how many seconds have
944  // elapsed since our last reference point. If over 1 second, establish
945  // a new reference point that is an integer number of seconds since the
946  // last one, and compute the bytes over that interval.
947
948  uint32 current_time = talk_base::Time();
949  if (last_bytes_second_time_ != static_cast<uint32>(-1)) {
950    int delta = talk_base::TimeDiff(current_time, last_bytes_second_time_);
951    if (delta >= 1000) {
952      int fraction_time = delta % 1000;
953      int seconds = delta / 1000;
954      int fraction_bytes =
955          static_cast<int>(total_bytes_ - last_bytes_second_calc_) *
956              fraction_time / delta;
957      // Compute "bytes received during the interval" / "seconds in interval"
958      bytes_second_ =
959          (total_bytes_ - last_bytes_second_calc_ - fraction_bytes) / seconds;
960      last_bytes_second_time_ = current_time - fraction_time;
961      last_bytes_second_calc_ = total_bytes_ - fraction_bytes;
962    }
963  }
964  if (last_bytes_second_time_ == static_cast<uint32>(-1)) {
965    last_bytes_second_time_ = current_time;
966    last_bytes_second_calc_ = total_bytes_;
967  }
968
969  return bytes_second_;
970}
971
972void RateTracker::Update(size_t bytes) {
973  total_bytes_ += bytes;
974}
975
976}  // namespace cricket
977