port.cc revision f74420b3285b9fe04a7e00aa3b8c0ab07ea344bc
/*
 * libjingle
 * Copyright 2004--2005, Google Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *  3. The name of the author may not be used to endorse or promote products
 *     derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */ 27 28#if defined(_MSC_VER) && _MSC_VER < 1300 29#pragma warning(disable:4786) 30#endif 31 32#include <algorithm> 33#include <vector> 34 35#include "talk/base/asyncudpsocket.h" 36#include "talk/base/asynctcpsocket.h" 37#include "talk/base/helpers.h" 38#include "talk/base/logging.h" 39#include "talk/base/scoped_ptr.h" 40#include "talk/base/socketadapters.h" 41#include "talk/base/stringutils.h" 42#include "talk/p2p/base/common.h" 43#include "talk/p2p/base/port.h" 44 45#if defined(_MSC_VER) && _MSC_VER < 1300 46namespace std { 47 using ::memcmp; 48} 49#endif 50 51namespace { 52 53// The length of time we wait before timing out readability on a connection. 54const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000; // 30 seconds 55 56// The length of time we wait before timing out writability on a connection. 57const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000; // 15 seconds 58 59// The length of time we wait before we become unwritable. 60const uint32 CONNECTION_WRITE_CONNECT_TIMEOUT = 5 * 1000; // 5 seconds 61 62// The number of pings that must fail to respond before we become unwritable. 63const uint32 CONNECTION_WRITE_CONNECT_FAILURES = 5; 64 65// This is the length of time that we wait for a ping response to come back. 66const int CONNECTION_RESPONSE_TIMEOUT = 5 * 1000; // 5 seconds 67 68// Determines whether we have seen at least the given maximum number of 69// pings fail to have a response. 70inline bool TooManyFailures( 71 const std::vector<uint32>& pings_since_last_response, 72 uint32 maximum_failures, 73 uint32 rtt_estimate, 74 uint32 now) { 75 76 // If we haven't sent that many pings, then we can't have failed that many. 77 if (pings_since_last_response.size() < maximum_failures) 78 return false; 79 80 // Check if the window in which we would expect a response to the ping has 81 // already elapsed. 82 return pings_since_last_response[maximum_failures - 1] + rtt_estimate < now; 83} 84 85// Determines whether we have gone too long without seeing any response. 
86inline bool TooLongWithoutResponse( 87 const std::vector<uint32>& pings_since_last_response, 88 uint32 maximum_time, 89 uint32 now) { 90 91 if (pings_since_last_response.size() == 0) 92 return false; 93 94 return pings_since_last_response[0] + maximum_time < now; 95} 96 97// We will restrict RTT estimates (when used for determining state) to be 98// within a reasonable range. 99const uint32 MINIMUM_RTT = 100; // 0.1 seconds 100const uint32 MAXIMUM_RTT = 3000; // 3 seconds 101 102// When we don't have any RTT data, we have to pick something reasonable. We 103// use a large value just in case the connection is really slow. 104const uint32 DEFAULT_RTT = MAXIMUM_RTT; 105 106// Computes our estimate of the RTT given the current estimate. 107inline uint32 ConservativeRTTEstimate(uint32 rtt) { 108 return talk_base::_max(MINIMUM_RTT, talk_base::_min(MAXIMUM_RTT, 2 * rtt)); 109} 110 111// Weighting of the old rtt value to new data. 112const int RTT_RATIO = 3; // 3 : 1 113 114// The delay before we begin checking if this port is useless. 
115const int kPortTimeoutDelay = 30 * 1000; // 30 seconds 116 117const uint32 MSG_CHECKTIMEOUT = 1; 118const uint32 MSG_DELETE = 1; 119} 120 121namespace cricket { 122 123static const char* const PROTO_NAMES[] = { "udp", "tcp", "ssltcp" }; 124 125const char* ProtoToString(ProtocolType proto) { 126 return PROTO_NAMES[proto]; 127} 128 129bool StringToProto(const char* value, ProtocolType* proto) { 130 for (size_t i = 0; i <= PROTO_LAST; ++i) { 131 if (strcmp(PROTO_NAMES[i], value) == 0) { 132 *proto = static_cast<ProtocolType>(i); 133 return true; 134 } 135 } 136 return false; 137} 138 139Port::Port(talk_base::Thread* thread, const std::string& type, 140 talk_base::SocketFactory* factory, talk_base::Network* network) 141 : thread_(thread), factory_(factory), type_(type), network_(network), 142 preference_(-1), lifetime_(LT_PRESTART), enable_port_packets_(false) { 143 if (factory_ == NULL) 144 factory_ = thread_->socketserver(); 145 146 set_username_fragment(talk_base::CreateRandomString(16)); 147 set_password(talk_base::CreateRandomString(16)); 148 LOG_J(LS_INFO, this) << "Port created"; 149} 150 151Port::~Port() { 152 // Delete all of the remaining connections. We copy the list up front 153 // because each deletion will cause it to be modified. 
154 155 std::vector<Connection*> list; 156 157 AddressMap::iterator iter = connections_.begin(); 158 while (iter != connections_.end()) { 159 list.push_back(iter->second); 160 ++iter; 161 } 162 163 for (uint32 i = 0; i < list.size(); i++) 164 delete list[i]; 165} 166 167Connection* Port::GetConnection(const talk_base::SocketAddress& remote_addr) { 168 AddressMap::const_iterator iter = connections_.find(remote_addr); 169 if (iter != connections_.end()) 170 return iter->second; 171 else 172 return NULL; 173} 174 175void Port::AddAddress(const talk_base::SocketAddress& address, 176 const std::string& protocol, 177 bool final) { 178 Candidate c; 179 c.set_name(name_); 180 c.set_type(type_); 181 c.set_protocol(protocol); 182 c.set_address(address); 183 c.set_preference(preference_); 184 c.set_username(username_frag_); 185 c.set_password(password_); 186 c.set_network_name(network_->name()); 187 c.set_generation(generation_); 188 candidates_.push_back(c); 189 190 if (final) 191 SignalAddressReady(this); 192} 193 194void Port::AddConnection(Connection* conn) { 195 connections_[conn->remote_candidate().address()] = conn; 196 conn->SignalDestroyed.connect(this, &Port::OnConnectionDestroyed); 197 SignalConnectionCreated(this, conn); 198} 199 200void Port::OnReadPacket( 201 const char* data, size_t size, const talk_base::SocketAddress& addr) { 202 // If the user has enabled port packets, just hand this over. 203 if (enable_port_packets_) { 204 SignalReadPacket(this, data, size, addr); 205 return; 206 } 207 208 // If this is an authenticated STUN request, then signal unknown address and 209 // send back a proper binding response. 
210 StunMessage* msg; 211 std::string remote_username; 212 if (!GetStunMessage(data, size, addr, msg, remote_username)) { 213 LOG_J(LS_ERROR, this) << "Received non-STUN packet from unknown address (" 214 << addr.ToString() << ")"; 215 } else if (!msg) { 216 // STUN message handled already 217 } else if (msg->type() == STUN_BINDING_REQUEST) { 218 SignalUnknownAddress(this, addr, msg, remote_username); 219 } else { 220 // NOTE(tschmelcher): This is benign. It occurs if we pruned a 221 // connection for this port while it had STUN requests in flight, because 222 // we then get back responses for them, which this code correctly does not 223 // handle. 224 LOG_J(LS_ERROR, this) << "Received unexpected STUN message type (" 225 << msg->type() << ") from unknown address (" 226 << addr.ToString() << ")"; 227 delete msg; 228 } 229} 230 231void Port::SendBindingRequest(Connection* conn) { 232 // Construct the request message. 233 StunMessage request; 234 request.SetType(STUN_BINDING_REQUEST); 235 request.SetTransactionID(talk_base::CreateRandomString(16)); 236 237 StunByteStringAttribute* username_attr = 238 StunAttribute::CreateByteString(STUN_ATTR_USERNAME); 239 std::string username = conn->remote_candidate().username(); 240 username.append(username_frag_); 241 username_attr->CopyBytes(username.c_str(), username.size()); 242 request.AddAttribute(username_attr); 243 244 // Send the request message. 245 // NOTE: If we wanted to, this is where we would add the HMAC. 246 talk_base::ByteBuffer buf; 247 request.Write(&buf); 248 SendTo(buf.Data(), buf.Length(), conn->remote_candidate().address(), false); 249} 250 251bool Port::GetStunMessage(const char* data, size_t size, 252 const talk_base::SocketAddress& addr, 253 StunMessage *& msg, std::string& remote_username) { 254 // NOTE: This could clearly be optimized to avoid allocating any memory. 255 // However, at the data rates we'll be looking at on the client side, 256 // this probably isn't worth worrying about. 
257 258 msg = 0; 259 260 // Parse the request message. If the packet is not a complete and correct 261 // STUN message, then ignore it. 262 talk_base::scoped_ptr<StunMessage> stun_msg(new StunMessage()); 263 talk_base::ByteBuffer buf(data, size); 264 if (!stun_msg->Read(&buf) || (buf.Length() > 0)) { 265 return false; 266 } 267 268 // The packet must include a username that either begins or ends with our 269 // fragment. It should begin with our fragment if it is a request and it 270 // should end with our fragment if it is a response. 271 const StunByteStringAttribute* username_attr = 272 stun_msg->GetByteString(STUN_ATTR_USERNAME); 273 274 int remote_frag_len = (username_attr ? username_attr->length() : 0); 275 remote_frag_len -= static_cast<int>(username_frag_.size()); 276 277 if (stun_msg->type() == STUN_BINDING_REQUEST) { 278 if (remote_frag_len < 0) { 279 // Username not present or corrupted, don't reply. 280 LOG_J(LS_ERROR, this) << "Received STUN request without username"; 281 return true; 282 } else if (std::memcmp(username_attr->bytes(), username_frag_.c_str(), 283 username_frag_.size()) != 0) { 284 LOG_J(LS_ERROR, this) << "Received STUN request with bad username"; 285 SendBindingErrorResponse(stun_msg.get(), addr, STUN_ERROR_BAD_REQUEST, 286 STUN_ERROR_REASON_BAD_REQUEST); 287 return true; 288 } 289 290 remote_username.assign(username_attr->bytes() + username_frag_.size(), 291 username_attr->bytes() + username_attr->length()); 292 } else if ((stun_msg->type() == STUN_BINDING_RESPONSE) 293 || (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE)) { 294 if (remote_frag_len < 0) { 295 // NOTE(tschmelcher): This is benign. It occurs when the response to a 296 // StunBindingRequest to the real STUN server (which involves no 297 // usernames) took too long to reach us and so the base StunRequest 298 // re-sent itself, resulting in us getting an extraneous second response 299 // that gets forwarded on to this code and correctly discarded. 
300 LOG_J(LS_ERROR, this) << "Received STUN response without username"; 301 // Do not send error response to a response 302 return true; 303 } else if (std::memcmp(username_attr->bytes() + remote_frag_len, 304 username_frag_.c_str(), 305 username_frag_.size()) != 0) { 306 LOG_J(LS_ERROR, this) << "Received STUN response with bad username"; 307 // Do not send error response to a response 308 return true; 309 } 310 311 remote_username.assign(username_attr->bytes(), 312 username_attr->bytes() + remote_frag_len); 313 314 if (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE) { 315 if (const StunErrorCodeAttribute* error_code = stun_msg->GetErrorCode()) { 316 LOG_J(LS_ERROR, this) << "Received STUN binding error:" 317 << " class=" << error_code->error_class() 318 << " number=" << error_code->number() 319 << " reason='" << error_code->reason() << "'"; 320 // Return message to allow error-specific processing 321 } else { 322 LOG_J(LS_ERROR, this) 323 << "Received STUN error response with no error code"; 324 // Drop corrupt message 325 return true; 326 } 327 } 328 } else { 329 LOG_J(LS_ERROR, this) << "Received STUN packet with invalid type (" 330 << stun_msg->type() << ")"; 331 return true; 332 } 333 334 // Return the STUN message found. 335 msg = stun_msg.release(); 336 return true; 337} 338 339void Port::SendBindingResponse( 340 StunMessage* request, const talk_base::SocketAddress& addr) { 341 342 ASSERT(request->type() == STUN_BINDING_REQUEST); 343 344 // Retrieve the username from the request. 345 const StunByteStringAttribute* username_attr = 346 request->GetByteString(STUN_ATTR_USERNAME); 347 ASSERT(username_attr != NULL); 348 if (username_attr == NULL) { 349 // No valid username, skip the response. 350 return; 351 } 352 353 // Fill in the response message. 
354 StunMessage response; 355 response.SetType(STUN_BINDING_RESPONSE); 356 response.SetTransactionID(request->transaction_id()); 357 358 StunByteStringAttribute* username2_attr = 359 StunAttribute::CreateByteString(STUN_ATTR_USERNAME); 360 username2_attr->CopyBytes(username_attr->bytes(), username_attr->length()); 361 response.AddAttribute(username2_attr); 362 363 StunAddressAttribute* addr_attr = 364 StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS); 365 addr_attr->SetFamily(1); 366 addr_attr->SetPort(addr.port()); 367 addr_attr->SetIP(addr.ip()); 368 response.AddAttribute(addr_attr); 369 370 // Send the response message. 371 // NOTE: If we wanted to, this is where we would add the HMAC. 372 talk_base::ByteBuffer buf; 373 response.Write(&buf); 374 SendTo(buf.Data(), buf.Length(), addr, false); 375 376 // The fact that we received a successful request means that this connection 377 // (if one exists) should now be readable. 378 Connection* conn = GetConnection(addr); 379 ASSERT(conn != NULL); 380 if (conn) 381 conn->ReceivedPing(); 382} 383 384void Port::SendBindingErrorResponse( 385 StunMessage* request, const talk_base::SocketAddress& addr, int error_code, 386 const std::string& reason) { 387 388 ASSERT(request->type() == STUN_BINDING_REQUEST); 389 390 // Retrieve the username from the request. If it didn't have one, we 391 // shouldn't be responding at all. 392 const StunByteStringAttribute* username_attr = 393 request->GetByteString(STUN_ATTR_USERNAME); 394 ASSERT(username_attr != NULL); 395 if (username_attr == NULL) { 396 // No valid username, skip the response. 397 return; 398 } 399 400 // Fill in the response message. 
401 StunMessage response; 402 response.SetType(STUN_BINDING_ERROR_RESPONSE); 403 response.SetTransactionID(request->transaction_id()); 404 405 StunByteStringAttribute* username2_attr = 406 StunAttribute::CreateByteString(STUN_ATTR_USERNAME); 407 username2_attr->CopyBytes(username_attr->bytes(), username_attr->length()); 408 response.AddAttribute(username2_attr); 409 410 StunErrorCodeAttribute* error_attr = StunAttribute::CreateErrorCode(); 411 error_attr->SetErrorCode(error_code); 412 error_attr->SetReason(reason); 413 response.AddAttribute(error_attr); 414 415 // Send the response message. 416 // NOTE: If we wanted to, this is where we would add the HMAC. 417 talk_base::ByteBuffer buf; 418 response.Write(&buf); 419 SendTo(buf.Data(), buf.Length(), addr, false); 420} 421 422talk_base::AsyncPacketSocket* Port::CreatePacketSocket(ProtocolType proto) { 423 if (proto == PROTO_UDP) { 424 // UDP sockets are simple. 425 return talk_base::AsyncUDPSocket::Create(factory_); 426 } else if (proto == PROTO_TCP || proto == PROTO_SSLTCP) { 427 // Create the base TCP socket. Bail out if this fails. 428 talk_base::AsyncSocket* socket = factory_->CreateAsyncSocket(SOCK_STREAM); 429 if (!socket) { 430 return NULL; 431 } 432 433 // If using a proxy, wrap the socket in a proxy socket. 434 if (proxy().type == talk_base::PROXY_SOCKS5) { 435 socket = new talk_base::AsyncSocksProxySocket( 436 socket, proxy().address, proxy().username, proxy().password); 437 } else if (proxy().type == talk_base::PROXY_HTTPS) { 438 socket = new talk_base::AsyncHttpsProxySocket( 439 socket, user_agent(), proxy().address, 440 proxy().username, proxy().password); 441 } 442 443 // If using SSLTCP, wrap the TCP socket in a pseudo-SSL socket. 444 if (proto == PROTO_SSLTCP) { 445 socket = new talk_base::AsyncSSLSocket(socket); 446 } 447 448 // Finally, wrap that socket in a TCP packet socket. 
449 // [Insert obligatory Taco Town reference here] 450 return new talk_base::AsyncTCPSocket(socket); 451 } else { 452 LOG_J(LS_ERROR, this) << "Unknown protocol (" << proto << ")"; 453 return NULL; 454 } 455} 456 457void Port::OnMessage(talk_base::Message *pmsg) { 458 ASSERT(pmsg->message_id == MSG_CHECKTIMEOUT); 459 ASSERT(lifetime_ == LT_PRETIMEOUT); 460 lifetime_ = LT_POSTTIMEOUT; 461 CheckTimeout(); 462} 463 464std::string Port::ToString() const { 465 std::stringstream ss; 466 ss << "Port[" << name_ << ":" << type_ << ":" << network_->ToString() << "]"; 467 return ss.str(); 468} 469 470void Port::EnablePortPackets() { 471 enable_port_packets_ = true; 472} 473 474void Port::Start() { 475 // The port sticks around for a minimum lifetime, after which 476 // we destroy it when it drops to zero connections. 477 if (lifetime_ == LT_PRESTART) { 478 lifetime_ = LT_PRETIMEOUT; 479 thread_->PostDelayed(kPortTimeoutDelay, this, MSG_CHECKTIMEOUT); 480 } else { 481 LOG_J(LS_WARNING, this) << "Port restart attempted"; 482 } 483} 484 485void Port::OnConnectionDestroyed(Connection* conn) { 486 AddressMap::iterator iter = 487 connections_.find(conn->remote_candidate().address()); 488 ASSERT(iter != connections_.end()); 489 connections_.erase(iter); 490 491 CheckTimeout(); 492} 493 494void Port::Destroy() { 495 ASSERT(connections_.empty()); 496 LOG_J(LS_INFO, this) << "Port deleted"; 497 SignalDestroyed(this); 498 delete this; 499} 500 501void Port::CheckTimeout() { 502 // If this port has no connections, then there's no reason to keep it around. 503 // When the connections time out (both read and write), they will delete 504 // themselves, so if we have any connections, they are either readable or 505 // writable (or still connecting). 506 if ((lifetime_ == LT_POSTTIMEOUT) && connections_.empty()) { 507 Destroy(); 508 } 509} 510 511// A ConnectionRequest is a simple STUN ping used to determine writability. 
512class ConnectionRequest : public StunRequest { 513 public: 514 explicit ConnectionRequest(Connection* connection) : connection_(connection) { 515 } 516 517 virtual ~ConnectionRequest() { 518 } 519 520 virtual void Prepare(StunMessage* request) { 521 request->SetType(STUN_BINDING_REQUEST); 522 StunByteStringAttribute* username_attr = 523 StunAttribute::CreateByteString(STUN_ATTR_USERNAME); 524 std::string username = connection_->remote_candidate().username(); 525 username.append(connection_->port()->username_fragment()); 526 username_attr->CopyBytes(username.c_str(), username.size()); 527 request->AddAttribute(username_attr); 528 } 529 530 virtual void OnResponse(StunMessage* response) { 531 connection_->OnConnectionRequestResponse(response, Elapsed()); 532 } 533 534 virtual void OnErrorResponse(StunMessage* response) { 535 connection_->OnConnectionRequestErrorResponse(response, Elapsed()); 536 } 537 538 virtual void OnTimeout() { 539 LOG_J(LS_VERBOSE, connection_) << "Timing-out STUN ping " << id(); 540 } 541 542 virtual int GetNextDelay() { 543 // Each request is sent only once. After a single delay , the request will 544 // time out. 
545 timeout_ = true; 546 return CONNECTION_RESPONSE_TIMEOUT; 547 } 548 549 private: 550 Connection* connection_; 551}; 552 553// 554// Connection 555// 556 557Connection::Connection(Port* port, size_t index, 558 const Candidate& remote_candidate) 559 : port_(port), local_candidate_index_(index), 560 remote_candidate_(remote_candidate), read_state_(STATE_READ_TIMEOUT), 561 write_state_(STATE_WRITE_CONNECT), connected_(true), pruned_(false), 562 requests_(port->thread()), rtt_(DEFAULT_RTT), 563 last_ping_sent_(0), last_ping_received_(0), reported_(false) { 564 // Wire up to send stun packets 565 requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket); 566 LOG_J(LS_INFO, this) << "Connection created"; 567} 568 569Connection::~Connection() { 570} 571 572const Candidate& Connection::local_candidate() const { 573 if (local_candidate_index_ < port_->candidates().size()) 574 return port_->candidates()[local_candidate_index_]; 575 ASSERT(false); 576 static Candidate foo; 577 return foo; 578} 579 580void Connection::set_read_state(ReadState value) { 581 ReadState old_value = read_state_; 582 read_state_ = value; 583 if (value != old_value) { 584 LOG_J(LS_VERBOSE, this) << "set_read_state"; 585 SignalStateChange(this); 586 CheckTimeout(); 587 } 588} 589 590void Connection::set_write_state(WriteState value) { 591 WriteState old_value = write_state_; 592 write_state_ = value; 593 if (value != old_value) { 594 LOG_J(LS_VERBOSE, this) << "set_write_state"; 595 SignalStateChange(this); 596 CheckTimeout(); 597 } 598} 599 600void Connection::set_connected(bool value) { 601 bool old_value = connected_; 602 connected_ = value; 603 if (value != old_value) { 604 LOG_J(LS_VERBOSE, this) << "set_connected"; 605 } 606} 607 608void Connection::OnSendStunPacket( 609 const void* data, size_t size, StunRequest* req) { 610 port_->SendTo(data, size, remote_candidate_.address(), false); 611} 612 613void Connection::OnReadPacket(const char* data, size_t size) { 614 StunMessage* 
msg; 615 std::string remote_username; 616 const talk_base::SocketAddress& addr(remote_candidate_.address()); 617 if (!port_->GetStunMessage(data, size, addr, msg, remote_username)) { 618 // The packet did not parse as a valid STUN message 619 620 // If this connection is readable, then pass along the packet. 621 if (read_state_ == STATE_READABLE) { 622 // readable means data from this address is acceptable 623 // Send it on! 624 625 recv_rate_tracker_.Update(size); 626 SignalReadPacket(this, data, size); 627 628 // If timed out sending writability checks, start up again 629 if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) 630 set_write_state(STATE_WRITE_CONNECT); 631 } else { 632 // Not readable means the remote address hasn't send a valid 633 // binding request yet. 634 635 LOG_J(LS_WARNING, this) 636 << "Received non-STUN packet from an unreadable connection."; 637 } 638 } else if (!msg) { 639 // The packet was STUN, but was already handled 640 } else if (remote_username != remote_candidate_.username()) { 641 // Not destined this connection 642 LOG_J(LS_ERROR, this) << "Received STUN packet on wrong address."; 643 if (msg->type() == STUN_BINDING_REQUEST) { 644 port_->SendBindingErrorResponse(msg, addr, STUN_ERROR_BAD_REQUEST, 645 STUN_ERROR_REASON_BAD_REQUEST); 646 } 647 delete msg; 648 } else { 649 // The packet is STUN, with the current username 650 // If this is a STUN request, then update the readable bit and respond. 651 // If this is a STUN response, then update the writable bit. 652 653 switch (msg->type()) { 654 case STUN_BINDING_REQUEST: 655 // Incoming, validated stun request from remote peer. 656 // This call will also set the connection readable. 
657 658 port_->SendBindingResponse(msg, addr); 659 660 // If timed out sending writability checks, start up again 661 if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) 662 set_write_state(STATE_WRITE_CONNECT); 663 break; 664 665 case STUN_BINDING_RESPONSE: 666 case STUN_BINDING_ERROR_RESPONSE: 667 // Response from remote peer. Does it match request sent? 668 // This doesn't just check, it makes callbacks if transaction 669 // id's match 670 requests_.CheckResponse(msg); 671 break; 672 673 default: 674 ASSERT(false); 675 break; 676 } 677 678 // Done with the message; delete 679 680 delete msg; 681 } 682} 683 684void Connection::Prune() { 685 if (!pruned_) { 686 LOG_J(LS_VERBOSE, this) << "Connection pruned"; 687 pruned_ = true; 688 requests_.Clear(); 689 set_write_state(STATE_WRITE_TIMEOUT); 690 } 691} 692 693void Connection::Destroy() { 694 LOG_J(LS_VERBOSE, this) << "Connection destroyed"; 695 set_read_state(STATE_READ_TIMEOUT); 696 set_write_state(STATE_WRITE_TIMEOUT); 697} 698 699void Connection::UpdateState(uint32 now) { 700 // Check the readable state. 701 // 702 // Since we don't know how many pings the other side has attempted, the best 703 // test we can do is a simple window. 704 705 if ((read_state_ == STATE_READABLE) && 706 (last_ping_received_ + CONNECTION_READ_TIMEOUT <= now)) { 707 set_read_state(STATE_READ_TIMEOUT); 708 } 709 710 // Check the writable state. (The order of these checks is important.) 711 // 712 // Before becoming unwritable, we allow for a fixed number of pings to fail 713 // (i.e., receive no response). We also have to give the response time to 714 // get back, so we include a conservative estimate of this. 715 // 716 // Before timing out writability, we give a fixed amount of time. This is to 717 // allow for changes in network conditions. 
718 719 uint32 rtt = ConservativeRTTEstimate(rtt_); 720 721 std::string pings; 722 for (size_t i = 0; i < pings_since_last_response_.size(); ++i) { 723 char buf[32]; 724 talk_base::sprintfn(buf, sizeof(buf), "%u", 725 pings_since_last_response_[i]); 726 pings.append(buf).append(" "); 727 } 728 LOG_J(LS_VERBOSE, this) << "UpdateState(): pings_since_last_response_ = " << 729 pings << ", rtt = " << rtt << ", now = " << now; 730 731 if ((write_state_ == STATE_WRITABLE) && 732 TooManyFailures(pings_since_last_response_, 733 CONNECTION_WRITE_CONNECT_FAILURES, 734 rtt, 735 now) && 736 TooLongWithoutResponse(pings_since_last_response_, 737 CONNECTION_WRITE_CONNECT_TIMEOUT, 738 now)) { 739 set_write_state(STATE_WRITE_CONNECT); 740 } 741 742 if ((write_state_ == STATE_WRITE_CONNECT) && 743 TooLongWithoutResponse(pings_since_last_response_, 744 CONNECTION_WRITE_TIMEOUT, 745 now)) { 746 set_write_state(STATE_WRITE_TIMEOUT); 747 } 748} 749 750void Connection::Ping(uint32 now) { 751 ASSERT(connected_); 752 last_ping_sent_ = now; 753 pings_since_last_response_.push_back(now); 754 ConnectionRequest *req = new ConnectionRequest(this); 755 LOG_J(LS_VERBOSE, this) << "Sending STUN ping " << req->id() << " at " << now; 756 requests_.Send(req); 757} 758 759void Connection::ReceivedPing() { 760 last_ping_received_ = talk_base::Time(); 761 set_read_state(STATE_READABLE); 762} 763 764std::string Connection::ToString() const { 765 const char CONNECT_STATE_ABBREV[2] = { 766 '-', // not connected (false) 767 'C', // connected (true) 768 }; 769 const char READ_STATE_ABBREV[2] = { 770 'R', // STATE_READABLE 771 '-', // STATE_READ_TIMEOUT 772 }; 773 const char WRITE_STATE_ABBREV[3] = { 774 'W', // STATE_WRITABLE 775 'w', // STATE_WRITE_CONNECT 776 '-', // STATE_WRITE_TIMEOUT 777 }; 778 const Candidate& local = local_candidate(); 779 const Candidate& remote = remote_candidate(); 780 std::stringstream ss; 781 ss << "Conn[" << local.generation() 782 << ":" << local.name() << ":" << local.type() << 
":" 783 << local.protocol() << ":" << local.address().ToString() 784 << "->" << remote.name() << ":" << remote.type() << ":" 785 << remote.protocol() << ":" << remote.address().ToString() 786 << "|" 787 << CONNECT_STATE_ABBREV[connected()] 788 << READ_STATE_ABBREV[read_state()] 789 << WRITE_STATE_ABBREV[write_state()] 790 << "|" << rtt_ << "]"; 791 return ss.str(); 792} 793 794void Connection::OnConnectionRequestResponse(StunMessage* response, 795 uint32 rtt) { 796 // We have a potentially valid reply from the remote address. 797 // The packet must include a username that ends with our fragment, 798 // since it is a response. 799 800 // Check exact message type 801 bool valid = true; 802 if (response->type() != STUN_BINDING_RESPONSE) 803 valid = false; 804 805 // Must have username attribute 806 const StunByteStringAttribute* username_attr = 807 response->GetByteString(STUN_ATTR_USERNAME); 808 if (valid) { 809 if (!username_attr) { 810 LOG_J(LS_ERROR, this) << "Received likely STUN packet with no username"; 811 valid = false; 812 } 813 } 814 815 // Length must be at least the size of our fragment (actually, should 816 // be bigger since our fragment is at the end!) 817 if (valid) { 818 if (username_attr->length() <= port_->username_fragment().size()) { 819 LOG_J(LS_ERROR, this) << "Received likely STUN packet with short username"; 820 valid = false; 821 } 822 } 823 824 // Compare our fragment with the end of the username - must be exact match 825 if (valid) { 826 std::string username_fragment = port_->username_fragment(); 827 int offset = (int)(username_attr->length() - username_fragment.size()); 828 if (std::memcmp(username_attr->bytes() + offset, 829 username_fragment.c_str(), username_fragment.size()) != 0) { 830 LOG_J(LS_ERROR, this) << "Received STUN response with bad username"; 831 valid = false; 832 } 833 } 834 835 if (valid) { 836 // Valid response. If we're not already, become writable. 
We may be 837 // bringing a pruned connection back to life, but if we don't really want 838 // it, we can always prune it again. 839 set_write_state(STATE_WRITABLE); 840 841 std::string pings; 842 for (size_t i = 0; i < pings_since_last_response_.size(); ++i) { 843 char buf[32]; 844 talk_base::sprintfn(buf, sizeof(buf), "%u", 845 pings_since_last_response_[i]); 846 pings.append(buf).append(" "); 847 } 848 LOG_J(LS_VERBOSE, this) << "OnConnectionRequestResponse(): " 849 "pings_since_last_response_ = " << pings << ", rtt = " << rtt; 850 851 pings_since_last_response_.clear(); 852 rtt_ = (RTT_RATIO * rtt_ + rtt) / (RTT_RATIO + 1); 853 854 LOG_J(LS_VERBOSE, this) << "Received STUN ping response " << 855 response->transaction_id() << " after rtt = " << rtt; 856 } 857} 858 859void Connection::OnConnectionRequestErrorResponse(StunMessage *response, 860 uint32 rtt) { 861 const StunErrorCodeAttribute* error = response->GetErrorCode(); 862 uint32 error_code = error ? error->error_code() : STUN_ERROR_GLOBAL_FAILURE; 863 864 if ((error_code == STUN_ERROR_UNKNOWN_ATTRIBUTE) 865 || (error_code == STUN_ERROR_SERVER_ERROR) 866 || (error_code == STUN_ERROR_UNAUTHORIZED)) { 867 // Recoverable error, retry 868 } else if (error_code == STUN_ERROR_STALE_CREDENTIALS) { 869 // Race failure, retry 870 } else { 871 // This is not a valid connection. 872 LOG_J(LS_ERROR, this) << "Received STUN error response; killing connection"; 873 set_write_state(STATE_WRITE_TIMEOUT); 874 } 875} 876 877void Connection::CheckTimeout() { 878 // If both read and write have timed out, then this connection can contribute 879 // no more to p2p socket unless at some later date readability were to come 880 // back. However, we gave readability a long time to timeout, so at this 881 // point, it seems fair to get rid of this connectoin. 
882 if ((read_state_ == STATE_READ_TIMEOUT) && 883 (write_state_ == STATE_WRITE_TIMEOUT)) { 884 port_->thread()->Post(this, MSG_DELETE); 885 } 886} 887 888void Connection::OnMessage(talk_base::Message *pmsg) { 889 ASSERT(pmsg->message_id == MSG_DELETE); 890 891 LOG_J(LS_INFO, this) << "Connection deleted"; 892 SignalDestroyed(this); 893 delete this; 894} 895 896size_t Connection::recv_bytes_second() { 897 return recv_rate_tracker_.bytes_second(); 898} 899 900size_t Connection::recv_total_bytes() { 901 return recv_rate_tracker_.total_bytes(); 902} 903 904size_t Connection::sent_bytes_second() { 905 return send_rate_tracker_.bytes_second(); 906} 907 908size_t Connection::sent_total_bytes() { 909 return send_rate_tracker_.total_bytes(); 910} 911 912ProxyConnection::ProxyConnection(Port* port, size_t index, 913 const Candidate& candidate) 914 : Connection(port, index, candidate), error_(0) { 915} 916 917int ProxyConnection::Send(const void* data, size_t size) { 918 if (write_state() != STATE_WRITABLE) { 919 error_ = EWOULDBLOCK; 920 return SOCKET_ERROR; 921 } 922 int sent = port_->SendTo(data, size, remote_candidate_.address(), true); 923 if (sent <= 0) { 924 ASSERT(sent < 0); 925 error_ = port_->GetError(); 926 } else { 927 send_rate_tracker_.Update(sent); 928 } 929 return sent; 930} 931 932RateTracker::RateTracker() 933 : total_bytes_(0), bytes_second_(0), 934 last_bytes_second_time_(static_cast<uint32>(-1)), 935 last_bytes_second_calc_(0) { 936} 937 938size_t RateTracker::total_bytes() const { 939 return total_bytes_; 940} 941 942size_t RateTracker::bytes_second() { 943 // Snapshot bytes / second calculator. Determine how many seconds have 944 // elapsed since our last reference point. If over 1 second, establish 945 // a new reference point that is an integer number of seconds since the 946 // last one, and compute the bytes over that interval. 
947 948 uint32 current_time = talk_base::Time(); 949 if (last_bytes_second_time_ != static_cast<uint32>(-1)) { 950 int delta = talk_base::TimeDiff(current_time, last_bytes_second_time_); 951 if (delta >= 1000) { 952 int fraction_time = delta % 1000; 953 int seconds = delta / 1000; 954 int fraction_bytes = 955 static_cast<int>(total_bytes_ - last_bytes_second_calc_) * 956 fraction_time / delta; 957 // Compute "bytes received during the interval" / "seconds in interval" 958 bytes_second_ = 959 (total_bytes_ - last_bytes_second_calc_ - fraction_bytes) / seconds; 960 last_bytes_second_time_ = current_time - fraction_time; 961 last_bytes_second_calc_ = total_bytes_ - fraction_bytes; 962 } 963 } 964 if (last_bytes_second_time_ == static_cast<uint32>(-1)) { 965 last_bytes_second_time_ = current_time; 966 last_bytes_second_calc_ = total_bytes_; 967 } 968 969 return bytes_second_; 970} 971 972void RateTracker::Update(size_t bytes) { 973 total_bytes_ += bytes; 974} 975 976} // namespace cricket 977