1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/p2p/base/port.h"
29
30#include <algorithm>
31#include <vector>
32
33#include "talk/base/helpers.h"
34#include "talk/base/logging.h"
35#include "talk/base/scoped_ptr.h"
36#include "talk/base/stringutils.h"
37#include "talk/p2p/base/common.h"
38
39namespace {
40
// All durations below are in milliseconds.

// How long a readable connection may go without receiving a ping before
// UpdateState() moves it to STATE_READ_TIMEOUT.
const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000;   // 30 seconds

// How long a connection in STATE_WRITE_CONNECT may wait on its oldest
// unanswered ping before UpdateState() moves it to STATE_WRITE_TIMEOUT.
const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000;  // 15 seconds

// How long the oldest unanswered ping must have been outstanding before a
// writable connection may drop back to STATE_WRITE_CONNECT.
const uint32 CONNECTION_WRITE_CONNECT_TIMEOUT = 5 * 1000;  // 5 seconds

// The number of pings that must go unanswered before a writable connection
// may drop back to STATE_WRITE_CONNECT.
const uint32 CONNECTION_WRITE_CONNECT_FAILURES = 5;

// How long a single ping request waits for its response; returned from
// ConnectionRequest::GetNextDelay().
const int CONNECTION_RESPONSE_TIMEOUT = 5 * 1000;   // 5 seconds
55
// Determines whether at least |maximum_failures| pings have been sent since
// the last response and the response window of the |maximum_failures|-th one
// has already elapsed.
//
//   pings_since_last_response  send times of the pings still awaiting a
//                              response, oldest first.
//   maximum_failures           number of unanswered pings that counts as
//                              "too many".
//   rtt_estimate               time allowed for a response to come back.
//   now                        the current time, in the same units as the
//                              recorded send times.
//
// Returns true when the threshold has been reached.
inline bool TooManyFailures(
    const std::vector<uint32>& pings_since_last_response,
    uint32 maximum_failures,
    uint32 rtt_estimate,
    uint32 now) {

  // A threshold of zero is met trivially.  Handling it here also keeps the
  // index computation below from wrapping around to a huge value.
  if (maximum_failures == 0)
    return true;

  // If we haven't sent that many pings, then we can't have failed that many.
  if (pings_since_last_response.size() < maximum_failures)
    return false;

  // Check if the window in which we would expect a response to the ping has
  // already elapsed.
  return pings_since_last_response[maximum_failures - 1] + rtt_estimate < now;
}
72
// Reports whether the oldest ping still awaiting a response was sent more
// than |maximum_time| before |now|.  With no outstanding pings there is
// nothing to be late, so the answer is false.
inline bool TooLongWithoutResponse(
    const std::vector<uint32>& pings_since_last_response,
    uint32 maximum_time,
    uint32 now) {

  if (pings_since_last_response.empty())
    return false;

  // The first entry is the earliest ping sent since the last response.
  const uint32 deadline = pings_since_last_response.front() + maximum_time;
  return deadline < now;
}
84
// We will restrict RTT estimates (when used for determining state) to be
// within a reasonable range.  Values are in milliseconds; they are the clamp
// bounds applied by ConservativeRTTEstimate().
const uint32 MINIMUM_RTT = 100;   // 0.1 seconds
const uint32 MAXIMUM_RTT = 3000;  // 3 seconds

// When we don't have any RTT data, we have to pick something reasonable.  We
// use a large value just in case the connection is really slow.  This is the
// initial value of Connection::rtt_, and Connection::ToString() prints "-"
// instead of the RTT while rtt_ is not below it.
const uint32 DEFAULT_RTT = MAXIMUM_RTT;
93
94// Computes our estimate of the RTT given the current estimate.
95inline uint32 ConservativeRTTEstimate(uint32 rtt) {
96  return talk_base::_max(MINIMUM_RTT, talk_base::_min(MAXIMUM_RTT, 2 * rtt));
97}
98
// Weighting of the old rtt value to new data when a ping response updates
// Connection::rtt_: the previous estimate counts RTT_RATIO times, the new
// sample once.
const int RTT_RATIO = 3;  // 3 : 1

// The delay (in milliseconds) before we begin checking if this port is
// useless.  Port::Start() posts MSG_CHECKTIMEOUT with this delay.
const int kPortTimeoutDelay = 30 * 1000;  // 30 seconds

// Message ids.  MSG_CHECKTIMEOUT is the id Port::OnMessage() expects and
// MSG_DELETE is the id Connection::OnMessage() expects.  Both have the same
// numeric value; each is posted to a different handler object.
const uint32 MSG_CHECKTIMEOUT = 1;
const uint32 MSG_DELETE = 1;
107}
108
109namespace cricket {
110
111static const char* const PROTO_NAMES[] = { "udp", "tcp", "ssltcp" };
112
113const char* ProtoToString(ProtocolType proto) {
114  return PROTO_NAMES[proto];
115}
116
117bool StringToProto(const char* value, ProtocolType* proto) {
118  for (size_t i = 0; i <= PROTO_LAST; ++i) {
119    if (strcmp(PROTO_NAMES[i], value) == 0) {
120      *proto = static_cast<ProtocolType>(i);
121      return true;
122    }
123  }
124  return false;
125}
126
127Port::Port(talk_base::Thread* thread, const std::string& type,
128           talk_base::PacketSocketFactory* factory, talk_base::Network* network,
129           uint32 ip, int min_port, int max_port)
130    : thread_(thread),
131      factory_(factory),
132      type_(type),
133      network_(network),
134      ip_(ip),
135      min_port_(min_port),
136      max_port_(max_port),
137      preference_(-1),
138      lifetime_(LT_PRESTART),
139      enable_port_packets_(false) {
140  ASSERT(factory_ != NULL);
141
142  set_username_fragment(talk_base::CreateRandomString(16));
143  set_password(talk_base::CreateRandomString(16));
144  LOG_J(LS_INFO, this) << "Port created";
145}
146
147Port::~Port() {
148  // Delete all of the remaining connections.  We copy the list up front
149  // because each deletion will cause it to be modified.
150
151  std::vector<Connection*> list;
152
153  AddressMap::iterator iter = connections_.begin();
154  while (iter != connections_.end()) {
155    list.push_back(iter->second);
156    ++iter;
157  }
158
159  for (uint32 i = 0; i < list.size(); i++)
160    delete list[i];
161}
162
163Connection* Port::GetConnection(const talk_base::SocketAddress& remote_addr) {
164  AddressMap::const_iterator iter = connections_.find(remote_addr);
165  if (iter != connections_.end())
166    return iter->second;
167  else
168    return NULL;
169}
170
171void Port::AddAddress(const talk_base::SocketAddress& address,
172                      const std::string& protocol,
173                      bool final) {
174  Candidate c;
175  c.set_name(name_);
176  c.set_type(type_);
177  c.set_protocol(protocol);
178  c.set_address(address);
179  c.set_preference(preference_);
180  c.set_username(username_frag_);
181  c.set_password(password_);
182  c.set_network_name(network_->name());
183  c.set_generation(generation_);
184  candidates_.push_back(c);
185
186  if (final)
187    SignalAddressReady(this);
188}
189
190void Port::AddConnection(Connection* conn) {
191  connections_[conn->remote_candidate().address()] = conn;
192  conn->SignalDestroyed.connect(this, &Port::OnConnectionDestroyed);
193  SignalConnectionCreated(this, conn);
194}
195
196void Port::OnReadPacket(
197    const char* data, size_t size, const talk_base::SocketAddress& addr) {
198  // If the user has enabled port packets, just hand this over.
199  if (enable_port_packets_) {
200    SignalReadPacket(this, data, size, addr);
201    return;
202  }
203
204  // If this is an authenticated STUN request, then signal unknown address and
205  // send back a proper binding response.
206  StunMessage* msg;
207  std::string remote_username;
208  if (!GetStunMessage(data, size, addr, &msg, &remote_username)) {
209    LOG_J(LS_ERROR, this) << "Received non-STUN packet from unknown address ("
210                          << addr.ToString() << ")";
211  } else if (!msg) {
212    // STUN message handled already
213  } else if (msg->type() == STUN_BINDING_REQUEST) {
214    SignalUnknownAddress(this, addr, msg, remote_username);
215  } else {
216    // NOTE(tschmelcher): This is benign. It occurs if we pruned a
217    // connection for this port while it had STUN requests in flight, because
218    // we then get back responses for them, which this code correctly does not
219    // handle.
220    LOG_J(LS_ERROR, this) << "Received unexpected STUN message type ("
221                          << msg->type() << ") from unknown address ("
222                          << addr.ToString() << ")";
223    delete msg;
224  }
225}
226
227bool Port::GetStunMessage(const char* data, size_t size,
228                          const talk_base::SocketAddress& addr,
229                          StunMessage** out_msg, std::string* out_username) {
230  // NOTE: This could clearly be optimized to avoid allocating any memory.
231  //       However, at the data rates we'll be looking at on the client side,
232  //       this probably isn't worth worrying about.
233  ASSERT(out_msg != NULL);
234  ASSERT(out_username != NULL);
235  *out_msg = NULL;
236  out_username->clear();
237
238  // Parse the request message.  If the packet is not a complete and correct
239  // STUN message, then ignore it.
240  talk_base::scoped_ptr<StunMessage> stun_msg(new StunMessage());
241  talk_base::ByteBuffer buf(data, size);
242  if (!stun_msg->Read(&buf) || (buf.Length() > 0)) {
243    return false;
244  }
245
246  // The packet must include a username that either begins or ends with our
247  // fragment.  It should begin with our fragment if it is a request and it
248  // should end with our fragment if it is a response.
249  const StunByteStringAttribute* username_attr =
250      stun_msg->GetByteString(STUN_ATTR_USERNAME);
251
252  int remote_frag_len = (username_attr ? username_attr->length() : 0);
253  remote_frag_len -= static_cast<int>(username_frag_.size());
254
255  if (stun_msg->type() == STUN_BINDING_REQUEST) {
256    if (remote_frag_len < 0) {
257      // Username not present or corrupted, don't reply.
258      LOG_J(LS_ERROR, this) << "Received STUN request without username from "
259                            << addr.ToString();
260      return true;
261    } else if (std::memcmp(username_attr->bytes(), username_frag_.c_str(),
262                           username_frag_.size()) != 0) {
263      LOG_J(LS_ERROR, this) << "Received STUN request with bad local username "
264                            << std::string(username_attr->bytes(),
265                                           username_attr->length()) << " from "
266                            << addr.ToString();
267      SendBindingErrorResponse(stun_msg.get(), addr, STUN_ERROR_BAD_REQUEST,
268                               STUN_ERROR_REASON_BAD_REQUEST);
269      return true;
270    }
271
272    out_username->assign(username_attr->bytes() + username_frag_.size(),
273                         username_attr->bytes() + username_attr->length());
274  } else if ((stun_msg->type() == STUN_BINDING_RESPONSE)
275      || (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE)) {
276    if (remote_frag_len < 0) {
277      LOG_J(LS_ERROR, this) << "Received STUN response without username from "
278                            << addr.ToString();
279      // Do not send error response to a response
280      return true;
281    } else if (std::memcmp(username_attr->bytes() + remote_frag_len,
282                           username_frag_.c_str(),
283                           username_frag_.size()) != 0) {
284      LOG_J(LS_ERROR, this) << "Received STUN response with bad local username "
285                            << std::string(username_attr->bytes(),
286                                           username_attr->length()) << " from "
287                            << addr.ToString();
288      // Do not send error response to a response
289      return true;
290    }
291
292    out_username->assign(username_attr->bytes(),
293                         username_attr->bytes() + remote_frag_len);
294
295    if (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE) {
296      if (const StunErrorCodeAttribute* error_code = stun_msg->GetErrorCode()) {
297        LOG_J(LS_ERROR, this) << "Received STUN binding error:"
298                              << " class="
299                              << static_cast<int>(error_code->error_class())
300                              << " number="
301                              << static_cast<int>(error_code->number())
302                              << " reason='" << error_code->reason() << "'"
303                              << " from " << addr.ToString();
304        // Return message to allow error-specific processing
305      } else {
306        LOG_J(LS_ERROR, this) << "Received STUN binding error without a error "
307                              << "code from " << addr.ToString();
308        // Drop corrupt message
309        return true;
310      }
311    }
312  } else {
313    LOG_J(LS_ERROR, this) << "Received STUN packet with invalid type ("
314                          << stun_msg->type() << ") from " << addr.ToString();
315    return true;
316  }
317
318  // Return the STUN message found.
319  *out_msg = stun_msg.release();
320  return true;
321}
322
323void Port::SendBindingResponse(StunMessage* request,
324                               const talk_base::SocketAddress& addr) {
325  ASSERT(request->type() == STUN_BINDING_REQUEST);
326
327  // Retrieve the username from the request.
328  const StunByteStringAttribute* username_attr =
329      request->GetByteString(STUN_ATTR_USERNAME);
330  ASSERT(username_attr != NULL);
331  if (username_attr == NULL) {
332    // No valid username, skip the response.
333    return;
334  }
335
336  // Fill in the response message.
337  StunMessage response;
338  response.SetType(STUN_BINDING_RESPONSE);
339  response.SetTransactionID(request->transaction_id());
340
341  StunByteStringAttribute* username2_attr =
342      StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
343  username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
344  response.AddAttribute(username2_attr);
345
346  StunAddressAttribute* addr_attr =
347      StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS);
348  addr_attr->SetFamily(1);
349  addr_attr->SetPort(addr.port());
350  addr_attr->SetIP(addr.ip());
351  response.AddAttribute(addr_attr);
352
353  // Send the response message.
354  // NOTE: If we wanted to, this is where we would add the HMAC.
355  talk_base::ByteBuffer buf;
356  response.Write(&buf);
357  if (SendTo(buf.Data(), buf.Length(), addr, false) < 0) {
358    LOG_J(LS_ERROR, this) << "Failed to send STUN ping response to "
359                          << addr.ToString();
360  }
361
362  // The fact that we received a successful request means that this connection
363  // (if one exists) should now be readable.
364  Connection* conn = GetConnection(addr);
365  ASSERT(conn != NULL);
366  if (conn)
367    conn->ReceivedPing();
368}
369
370void Port::SendBindingErrorResponse(StunMessage* request,
371                                    const talk_base::SocketAddress& addr,
372                                    int error_code, const std::string& reason) {
373  ASSERT(request->type() == STUN_BINDING_REQUEST);
374
375  // Retrieve the username from the request. If it didn't have one, we
376  // shouldn't be responding at all.
377  const StunByteStringAttribute* username_attr =
378      request->GetByteString(STUN_ATTR_USERNAME);
379  ASSERT(username_attr != NULL);
380  if (username_attr == NULL) {
381    // No valid username, skip the response.
382    return;
383  }
384
385  // Fill in the response message.
386  StunMessage response;
387  response.SetType(STUN_BINDING_ERROR_RESPONSE);
388  response.SetTransactionID(request->transaction_id());
389
390  StunByteStringAttribute* username2_attr =
391      StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
392  username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
393  response.AddAttribute(username2_attr);
394
395  StunErrorCodeAttribute* error_attr = StunAttribute::CreateErrorCode();
396  error_attr->SetErrorCode(error_code);
397  error_attr->SetReason(reason);
398  response.AddAttribute(error_attr);
399
400  // Send the response message.
401  // NOTE: If we wanted to, this is where we would add the HMAC.
402  talk_base::ByteBuffer buf;
403  response.Write(&buf);
404  SendTo(buf.Data(), buf.Length(), addr, false);
405  LOG_J(LS_INFO, this) << "Sending STUN binding error: reason=" << reason
406                       << " to " << addr.ToString();
407}
408
409void Port::OnMessage(talk_base::Message *pmsg) {
410  ASSERT(pmsg->message_id == MSG_CHECKTIMEOUT);
411  ASSERT(lifetime_ == LT_PRETIMEOUT);
412  lifetime_ = LT_POSTTIMEOUT;
413  CheckTimeout();
414}
415
416std::string Port::ToString() const {
417  std::stringstream ss;
418  ss << "Port[" << name_ << ":" << type_ << ":" << network_->ToString() << "]";
419  return ss.str();
420}
421
422void Port::EnablePortPackets() {
423  enable_port_packets_ = true;
424}
425
426void Port::Start() {
427  // The port sticks around for a minimum lifetime, after which
428  // we destroy it when it drops to zero connections.
429  if (lifetime_ == LT_PRESTART) {
430    lifetime_ = LT_PRETIMEOUT;
431    thread_->PostDelayed(kPortTimeoutDelay, this, MSG_CHECKTIMEOUT);
432  } else {
433    LOG_J(LS_WARNING, this) << "Port restart attempted";
434  }
435}
436
437void Port::OnConnectionDestroyed(Connection* conn) {
438  AddressMap::iterator iter =
439      connections_.find(conn->remote_candidate().address());
440  ASSERT(iter != connections_.end());
441  connections_.erase(iter);
442
443  CheckTimeout();
444}
445
446void Port::Destroy() {
447  ASSERT(connections_.empty());
448  LOG_J(LS_INFO, this) << "Port deleted";
449  SignalDestroyed(this);
450  delete this;
451}
452
453void Port::CheckTimeout() {
454  // If this port has no connections, then there's no reason to keep it around.
455  // When the connections time out (both read and write), they will delete
456  // themselves, so if we have any connections, they are either readable or
457  // writable (or still connecting).
458  if ((lifetime_ == LT_POSTTIMEOUT) && connections_.empty()) {
459    Destroy();
460  }
461}
462
463// A ConnectionRequest is a simple STUN ping used to determine writability.
464class ConnectionRequest : public StunRequest {
465 public:
466  explicit ConnectionRequest(Connection* connection) : connection_(connection) {
467  }
468
469  virtual ~ConnectionRequest() {
470  }
471
472  virtual void Prepare(StunMessage* request) {
473    request->SetType(STUN_BINDING_REQUEST);
474    StunByteStringAttribute* username_attr =
475        StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
476    std::string username = connection_->remote_candidate().username();
477    username.append(connection_->port()->username_fragment());
478    username_attr->CopyBytes(username.c_str(), username.size());
479    request->AddAttribute(username_attr);
480  }
481
482  virtual void OnResponse(StunMessage* response) {
483    connection_->OnConnectionRequestResponse(this, response);
484  }
485
486  virtual void OnErrorResponse(StunMessage* response) {
487    connection_->OnConnectionRequestErrorResponse(this, response);
488  }
489
490  virtual void OnTimeout() {
491    connection_->OnConnectionRequestTimeout(this);
492  }
493
494  virtual int GetNextDelay() {
495    // Each request is sent only once.  After a single delay , the request will
496    // time out.
497    timeout_ = true;
498    return CONNECTION_RESPONSE_TIMEOUT;
499  }
500
501 private:
502  Connection* connection_;
503};
504
505//
506// Connection
507//
508
509Connection::Connection(Port* port, size_t index,
510                       const Candidate& remote_candidate)
511  : port_(port), local_candidate_index_(index),
512    remote_candidate_(remote_candidate), read_state_(STATE_READ_TIMEOUT),
513    write_state_(STATE_WRITE_CONNECT), connected_(true), pruned_(false),
514    requests_(port->thread()), rtt_(DEFAULT_RTT),
515    last_ping_sent_(0), last_ping_received_(0), last_data_received_(0),
516    reported_(false) {
517  // Wire up to send stun packets
518  requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket);
519  LOG_J(LS_INFO, this) << "Connection created";
520}
521
522Connection::~Connection() {
523}
524
525const Candidate& Connection::local_candidate() const {
526  if (local_candidate_index_ < port_->candidates().size())
527    return port_->candidates()[local_candidate_index_];
528  ASSERT(false);
529  static Candidate foo;
530  return foo;
531}
532
533void Connection::set_read_state(ReadState value) {
534  ReadState old_value = read_state_;
535  read_state_ = value;
536  if (value != old_value) {
537    LOG_J(LS_VERBOSE, this) << "set_read_state";
538    SignalStateChange(this);
539    CheckTimeout();
540  }
541}
542
543void Connection::set_write_state(WriteState value) {
544  WriteState old_value = write_state_;
545  write_state_ = value;
546  if (value != old_value) {
547    LOG_J(LS_VERBOSE, this) << "set_write_state";
548    SignalStateChange(this);
549    CheckTimeout();
550  }
551}
552
553void Connection::set_connected(bool value) {
554  bool old_value = connected_;
555  connected_ = value;
556  if (value != old_value) {
557    LOG_J(LS_VERBOSE, this) << "set_connected";
558  }
559}
560
561void Connection::OnSendStunPacket(const void* data, size_t size,
562                                  StunRequest* req) {
563  if (port_->SendTo(data, size, remote_candidate_.address(), false) < 0) {
564    LOG_J(LS_WARNING, this) << "Failed to send STUN ping " << req->id();
565  }
566}
567
568void Connection::OnReadPacket(const char* data, size_t size) {
569  StunMessage* msg;
570  std::string remote_username;
571  const talk_base::SocketAddress& addr(remote_candidate_.address());
572  if (!port_->GetStunMessage(data, size, addr, &msg, &remote_username)) {
573    // The packet did not parse as a valid STUN message
574
575    // If this connection is readable, then pass along the packet.
576    if (read_state_ == STATE_READABLE) {
577      // readable means data from this address is acceptable
578      // Send it on!
579
580      last_data_received_ = talk_base::Time();
581      recv_rate_tracker_.Update(size);
582      SignalReadPacket(this, data, size);
583
584      // If timed out sending writability checks, start up again
585      if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
586        set_write_state(STATE_WRITE_CONNECT);
587    } else {
588      // Not readable means the remote address hasn't sent a valid
589      // binding request yet.
590
591      LOG_J(LS_WARNING, this)
592        << "Received non-STUN packet from an unreadable connection.";
593    }
594  } else if (!msg) {
595    // The packet was STUN, but was already handled internally.
596  } else if (remote_username != remote_candidate_.username()) {
597    // The packet had the right local username, but the remote username was
598    // not the right one for the remote address.
599    if (msg->type() == STUN_BINDING_REQUEST) {
600      LOG_J(LS_ERROR, this) << "Received STUN request with bad remote username "
601                            << remote_username;
602      port_->SendBindingErrorResponse(msg, addr, STUN_ERROR_BAD_REQUEST,
603                                      STUN_ERROR_REASON_BAD_REQUEST);
604    } else if (msg->type() == STUN_BINDING_RESPONSE ||
605               msg->type() == STUN_BINDING_ERROR_RESPONSE) {
606      LOG_J(LS_ERROR, this) << "Received STUN response with bad remote username"
607                            " " << remote_username;
608    }
609    delete msg;
610  } else {
611    // The packet is STUN, with the right username.
612    // If this is a STUN request, then update the readable bit and respond.
613    // If this is a STUN response, then update the writable bit.
614
615    switch (msg->type()) {
616    case STUN_BINDING_REQUEST:
617      // Incoming, validated stun request from remote peer.
618      // This call will also set the connection readable.
619
620      port_->SendBindingResponse(msg, addr);
621
622      // If timed out sending writability checks, start up again
623      if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
624        set_write_state(STATE_WRITE_CONNECT);
625      break;
626
627    case STUN_BINDING_RESPONSE:
628    case STUN_BINDING_ERROR_RESPONSE:
629      // Response from remote peer. Does it match request sent?
630      // This doesn't just check, it makes callbacks if transaction
631      // id's match
632      requests_.CheckResponse(msg);
633      break;
634
635    default:
636      ASSERT(false);
637      break;
638    }
639
640    // Done with the message; delete
641
642    delete msg;
643  }
644}
645
646void Connection::Prune() {
647  if (!pruned_) {
648    LOG_J(LS_VERBOSE, this) << "Connection pruned";
649    pruned_ = true;
650    requests_.Clear();
651    set_write_state(STATE_WRITE_TIMEOUT);
652  }
653}
654
655void Connection::Destroy() {
656  LOG_J(LS_VERBOSE, this) << "Connection destroyed";
657  set_read_state(STATE_READ_TIMEOUT);
658  set_write_state(STATE_WRITE_TIMEOUT);
659}
660
661void Connection::UpdateState(uint32 now) {
662  uint32 rtt = ConservativeRTTEstimate(rtt_);
663
664  std::string pings;
665  for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
666    char buf[32];
667    talk_base::sprintfn(buf, sizeof(buf), "%u",
668        pings_since_last_response_[i]);
669    pings.append(buf).append(" ");
670  }
671  LOG_J(LS_VERBOSE, this) << "UpdateState(): pings_since_last_response_=" <<
672      pings << ", rtt=" << rtt << ", now=" << now;
673
674  // Check the readable state.
675  //
676  // Since we don't know how many pings the other side has attempted, the best
677  // test we can do is a simple window.
678
679  if ((read_state_ == STATE_READABLE) &&
680      (last_ping_received_ + CONNECTION_READ_TIMEOUT <= now)) {
681    LOG_J(LS_INFO, this) << "Unreadable after "
682                         << now - last_ping_received_
683                         << " ms without a ping, rtt=" << rtt;
684    set_read_state(STATE_READ_TIMEOUT);
685  }
686
687  // Check the writable state.  (The order of these checks is important.)
688  //
689  // Before becoming unwritable, we allow for a fixed number of pings to fail
690  // (i.e., receive no response).  We also have to give the response time to
691  // get back, so we include a conservative estimate of this.
692  //
693  // Before timing out writability, we give a fixed amount of time.  This is to
694  // allow for changes in network conditions.
695
696  if ((write_state_ == STATE_WRITABLE) &&
697      TooManyFailures(pings_since_last_response_,
698                      CONNECTION_WRITE_CONNECT_FAILURES,
699                      rtt,
700                      now) &&
701      TooLongWithoutResponse(pings_since_last_response_,
702                             CONNECTION_WRITE_CONNECT_TIMEOUT,
703                             now)) {
704    uint32 max_pings = CONNECTION_WRITE_CONNECT_FAILURES;
705    LOG_J(LS_INFO, this) << "Unwritable after " << max_pings
706                         << " ping failures and "
707                         << now - pings_since_last_response_[0]
708                         << " ms without a response,"
709                         << " ms since last received ping="
710                         << now - last_ping_received_
711                         << " ms since last received data="
712                         << now - last_data_received_
713                         << " rtt=" << rtt;
714    set_write_state(STATE_WRITE_CONNECT);
715  }
716
717  if ((write_state_ == STATE_WRITE_CONNECT) &&
718      TooLongWithoutResponse(pings_since_last_response_,
719                             CONNECTION_WRITE_TIMEOUT,
720                             now)) {
721    LOG_J(LS_INFO, this) << "Timed out after "
722                         << now - pings_since_last_response_[0]
723                         << " ms without a response, rtt=" << rtt;
724    set_write_state(STATE_WRITE_TIMEOUT);
725  }
726}
727
728void Connection::Ping(uint32 now) {
729  ASSERT(connected_);
730  last_ping_sent_ = now;
731  pings_since_last_response_.push_back(now);
732  ConnectionRequest *req = new ConnectionRequest(this);
733  LOG_J(LS_VERBOSE, this) << "Sending STUN ping " << req->id() << " at " << now;
734  requests_.Send(req);
735}
736
737void Connection::ReceivedPing() {
738  last_ping_received_ = talk_base::Time();
739  set_read_state(STATE_READABLE);
740}
741
742std::string Connection::ToString() const {
743  const char CONNECT_STATE_ABBREV[2] = {
744    '-',  // not connected (false)
745    'C',  // connected (true)
746  };
747  const char READ_STATE_ABBREV[2] = {
748    'R',  // STATE_READABLE
749    '-',  // STATE_READ_TIMEOUT
750  };
751  const char WRITE_STATE_ABBREV[3] = {
752    'W',  // STATE_WRITABLE
753    'w',  // STATE_WRITE_CONNECT
754    '-',  // STATE_WRITE_TIMEOUT
755  };
756  const Candidate& local = local_candidate();
757  const Candidate& remote = remote_candidate();
758  std::stringstream ss;
759  ss << "Conn[" << local.generation()
760     << ":" << local.name() << ":" << local.type() << ":"
761     << local.protocol() << ":" << local.address().ToString()
762     << "->" << remote.name() << ":" << remote.type() << ":"
763     << remote.protocol() << ":" << remote.address().ToString()
764     << "|"
765     << CONNECT_STATE_ABBREV[connected()]
766     << READ_STATE_ABBREV[read_state()]
767     << WRITE_STATE_ABBREV[write_state()]
768     << "|";
769  if (rtt_ < DEFAULT_RTT) {
770    ss << rtt_ << "]";
771  } else {
772    ss << "-]";
773  }
774  return ss.str();
775}
776
777void Connection::OnConnectionRequestResponse(ConnectionRequest* request,
778                                             StunMessage* response) {
779  // We've already validated that this is a STUN binding response with
780  // the correct local and remote username for this connection.
781  // So if we're not already, become writable. We may be bringing a pruned
782  // connection back to life, but if we don't really want it, we can always
783  // prune it again.
784  uint32 rtt = request->Elapsed();
785  set_write_state(STATE_WRITABLE);
786
787  std::string pings;
788  for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
789    char buf[32];
790    talk_base::sprintfn(buf, sizeof(buf), "%u",
791        pings_since_last_response_[i]);
792    pings.append(buf).append(" ");
793  }
794
795  LOG_J(LS_VERBOSE, this) << "Received STUN ping response " << request->id()
796                          << ", pings_since_last_response_=" << pings
797                          << ", rtt=" << rtt;
798
799  pings_since_last_response_.clear();
800  rtt_ = (RTT_RATIO * rtt_ + rtt) / (RTT_RATIO + 1);
801}
802
803void Connection::OnConnectionRequestErrorResponse(ConnectionRequest* request,
804                                                  StunMessage* response) {
805  const StunErrorCodeAttribute* error = response->GetErrorCode();
806  uint32 error_code = error ?
807      error->error_code() : static_cast<uint32>(STUN_ERROR_GLOBAL_FAILURE);
808
809  if ((error_code == STUN_ERROR_UNKNOWN_ATTRIBUTE)
810      || (error_code == STUN_ERROR_SERVER_ERROR)
811      || (error_code == STUN_ERROR_UNAUTHORIZED)) {
812    // Recoverable error, retry
813  } else if (error_code == STUN_ERROR_STALE_CREDENTIALS) {
814    // Race failure, retry
815  } else {
816    // This is not a valid connection.
817    LOG_J(LS_ERROR, this) << "Received STUN error response, code="
818                          << error_code << "; killing connection";
819    set_write_state(STATE_WRITE_TIMEOUT);
820  }
821}
822
823void Connection::OnConnectionRequestTimeout(ConnectionRequest* request) {
824  // Log at LS_INFO if we miss a ping on a writable connection.
825  talk_base::LoggingSeverity sev = (write_state_ == STATE_WRITABLE) ?
826      talk_base::LS_INFO : talk_base::LS_VERBOSE;
827  uint32 when = talk_base::Time() - request->Elapsed();
828  size_t failures;
829  for (failures = 0; failures < pings_since_last_response_.size(); ++failures) {
830    if (pings_since_last_response_[failures] > when) {
831      break;
832    }
833  }
834  LOG_JV(sev, this) << "Timing-out STUN ping " << request->id()
835                    << " after " << request->Elapsed()
836                    << " ms, failures=" << failures;
837}
838
839void Connection::CheckTimeout() {
840  // If both read and write have timed out, then this connection can contribute
841  // no more to p2p socket unless at some later date readability were to come
842  // back.  However, we gave readability a long time to timeout, so at this
843  // point, it seems fair to get rid of this connection.
844  if ((read_state_ == STATE_READ_TIMEOUT) &&
845      (write_state_ == STATE_WRITE_TIMEOUT)) {
846    port_->thread()->Post(this, MSG_DELETE);
847  }
848}
849
850void Connection::OnMessage(talk_base::Message *pmsg) {
851  ASSERT(pmsg->message_id == MSG_DELETE);
852
853  LOG_J(LS_INFO, this) << "Connection deleted";
854  SignalDestroyed(this);
855  delete this;
856}
857
858size_t Connection::recv_bytes_second() {
859  return recv_rate_tracker_.units_second();
860}
861
862size_t Connection::recv_total_bytes() {
863  return recv_rate_tracker_.total_units();
864}
865
866size_t Connection::sent_bytes_second() {
867  return send_rate_tracker_.units_second();
868}
869
870size_t Connection::sent_total_bytes() {
871  return send_rate_tracker_.total_units();
872}
873
874ProxyConnection::ProxyConnection(Port* port, size_t index,
875                                 const Candidate& candidate)
876  : Connection(port, index, candidate), error_(0) {
877}
878
879int ProxyConnection::Send(const void* data, size_t size) {
880  if (write_state() != STATE_WRITABLE) {
881    error_ = EWOULDBLOCK;
882    return SOCKET_ERROR;
883  }
884  int sent = port_->SendTo(data, size, remote_candidate_.address(), true);
885  if (sent <= 0) {
886    ASSERT(sent < 0);
887    error_ = port_->GetError();
888  } else {
889    send_rate_tracker_.Update(sent);
890  }
891  return sent;
892}
893
894}  // namespace cricket
895