1/* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28#include "talk/p2p/client/basicportallocator.h" 29 30#include <string> 31#include <vector> 32 33#include "talk/base/common.h" 34#include "talk/base/helpers.h" 35#include "talk/base/logging.h" 36#include "talk/p2p/base/basicpacketsocketfactory.h" 37#include "talk/p2p/base/common.h" 38#include "talk/p2p/base/port.h" 39#include "talk/p2p/base/relayport.h" 40#include "talk/p2p/base/stunport.h" 41#include "talk/p2p/base/tcpport.h" 42#include "talk/p2p/base/turnport.h" 43#include "talk/p2p/base/udpport.h" 44 45using talk_base::CreateRandomId; 46using talk_base::CreateRandomString; 47 48namespace { 49 50const uint32 MSG_CONFIG_START = 1; 51const uint32 MSG_CONFIG_READY = 2; 52const uint32 MSG_ALLOCATE = 3; 53const uint32 MSG_ALLOCATION_PHASE = 4; 54const uint32 MSG_SHAKE = 5; 55const uint32 MSG_SEQUENCEOBJECTS_CREATED = 6; 56const uint32 MSG_CONFIG_STOP = 7; 57 58const uint32 ALLOCATE_DELAY = 250; 59const uint32 ALLOCATION_STEP_DELAY = 1 * 1000; 60 61const int PHASE_UDP = 0; 62const int PHASE_RELAY = 1; 63const int PHASE_TCP = 2; 64const int PHASE_SSLTCP = 3; 65 66const int kNumPhases = 4; 67 68// Both these values are in bytes. 69const int kLargeSocketSendBufferSize = 128 * 1024; 70const int kNormalSocketSendBufferSize = 64 * 1024; 71 72const int SHAKE_MIN_DELAY = 45 * 1000; // 45 seconds 73const int SHAKE_MAX_DELAY = 90 * 1000; // 90 seconds 74 75int ShakeDelay() { 76 int range = SHAKE_MAX_DELAY - SHAKE_MIN_DELAY + 1; 77 return SHAKE_MIN_DELAY + CreateRandomId() % range; 78} 79 80} // namespace 81 82namespace cricket { 83 84const uint32 DISABLE_ALL_PHASES = 85 PORTALLOCATOR_DISABLE_UDP 86 | PORTALLOCATOR_DISABLE_TCP 87 | PORTALLOCATOR_DISABLE_STUN 88 | PORTALLOCATOR_DISABLE_RELAY; 89 90// Performs the allocation of ports, in a sequenced (timed) manner, for a given 91// network and IP address. 92class AllocationSequence : public talk_base::MessageHandler, 93 public sigslot::has_slots<> { 94 public: 95 enum State { 96 kInit, // Initial state. 97 kRunning, // Started allocating ports. 98 kStopped, // Stopped from running. 99 kCompleted, // All ports are allocated. 100 101 // kInit --> kRunning --> {kCompleted|kStopped} 102 }; 103 104 AllocationSequence(BasicPortAllocatorSession* session, 105 talk_base::Network* network, 106 PortConfiguration* config, 107 uint32 flags); 108 ~AllocationSequence(); 109 bool Init(); 110 111 State state() const { return state_; } 112 113 // Disables the phases for a new sequence that this one already covers for an 114 // equivalent network setup. 115 void DisableEquivalentPhases(talk_base::Network* network, 116 PortConfiguration* config, uint32* flags); 117 118 // Starts and stops the sequence. When started, it will continue allocating 119 // new ports on its own timed schedule. 120 void Start(); 121 void Stop(); 122 123 // MessageHandler 124 void OnMessage(talk_base::Message* msg); 125 126 void EnableProtocol(ProtocolType proto); 127 bool ProtocolEnabled(ProtocolType proto) const; 128 129 // Signal from AllocationSequence, when it's done with allocating ports. 130 // This signal is useful, when port allocation fails which doesn't result 131 // in any candidates. Using this signal BasicPortAllocatorSession can send 132 // its candidate discovery conclusion signal. Without this signal, 133 // BasicPortAllocatorSession doesn't have any event to trigger signal. This 134 // can also be achieved by starting timer in BPAS. 135 sigslot::signal1<AllocationSequence*> SignalPortAllocationComplete; 136 137 private: 138 typedef std::vector<ProtocolType> ProtocolList; 139 140 bool IsFlagSet(uint32 flag) { 141 return ((flags_ & flag) != 0); 142 } 143 void CreateUDPPorts(); 144 void CreateTCPPorts(); 145 void CreateStunPorts(); 146 void CreateRelayPorts(); 147 void CreateGturnPort(const RelayServerConfig& config); 148 void CreateTurnPort(const RelayServerConfig& config); 149 150 void OnReadPacket(talk_base::AsyncPacketSocket* socket, 151 const char* data, size_t size, 152 const talk_base::SocketAddress& remote_addr); 153 void OnPortDestroyed(PortInterface* port); 154 155 BasicPortAllocatorSession* session_; 156 talk_base::Network* network_; 157 talk_base::IPAddress ip_; 158 PortConfiguration* config_; 159 State state_; 160 uint32 flags_; 161 ProtocolList protocols_; 162 talk_base::scoped_ptr<talk_base::AsyncPacketSocket> udp_socket_; 163 // Keeping a list of all UDP based ports. 164 std::deque<Port*> ports; 165 int phase_; 166}; 167 168// BasicPortAllocator 169BasicPortAllocator::BasicPortAllocator( 170 talk_base::NetworkManager* network_manager, 171 talk_base::PacketSocketFactory* socket_factory) 172 : network_manager_(network_manager), 173 socket_factory_(socket_factory) { 174 ASSERT(socket_factory_ != NULL); 175 Construct(); 176} 177 178BasicPortAllocator::BasicPortAllocator( 179 talk_base::NetworkManager* network_manager) 180 : network_manager_(network_manager), 181 socket_factory_(NULL) { 182 Construct(); 183} 184 185BasicPortAllocator::BasicPortAllocator( 186 talk_base::NetworkManager* network_manager, 187 talk_base::PacketSocketFactory* socket_factory, 188 const talk_base::SocketAddress& stun_address) 189 : network_manager_(network_manager), 190 socket_factory_(socket_factory), 191 stun_address_(stun_address) { 192 ASSERT(socket_factory_ != NULL); 193 Construct(); 194} 195 196BasicPortAllocator::BasicPortAllocator( 197 talk_base::NetworkManager* network_manager, 198 const talk_base::SocketAddress& stun_address, 199 const talk_base::SocketAddress& relay_address_udp, 200 const talk_base::SocketAddress& relay_address_tcp, 201 const talk_base::SocketAddress& relay_address_ssl) 202 : network_manager_(network_manager), 203 socket_factory_(NULL), 204 stun_address_(stun_address) { 205 206 RelayServerConfig config(RELAY_GTURN); 207 if (!relay_address_udp.IsAny()) 208 config.ports.push_back(ProtocolAddress(relay_address_udp, PROTO_UDP)); 209 if (!relay_address_tcp.IsAny()) 210 config.ports.push_back(ProtocolAddress(relay_address_tcp, PROTO_TCP)); 211 if (!relay_address_ssl.IsAny()) 212 config.ports.push_back(ProtocolAddress(relay_address_ssl, PROTO_SSLTCP)); 213 AddRelay(config); 214 215 Construct(); 216} 217 218void BasicPortAllocator::Construct() { 219 allow_tcp_listen_ = true; 220} 221 222BasicPortAllocator::~BasicPortAllocator() { 223} 224 225PortAllocatorSession *BasicPortAllocator::CreateSessionInternal( 226 const std::string& content_name, int component, 227 const std::string& ice_ufrag, const std::string& ice_pwd) { 228 return new BasicPortAllocatorSession(this, content_name, component, 229 ice_ufrag, ice_pwd); 230} 231 232// BasicPortAllocatorSession 233BasicPortAllocatorSession::BasicPortAllocatorSession( 234 BasicPortAllocator *allocator, 235 const std::string& content_name, 236 int component, 237 const std::string& ice_ufrag, 238 const std::string& ice_pwd) 239 : PortAllocatorSession(content_name, component, 240 ice_ufrag, ice_pwd, allocator->flags()), 241 allocator_(allocator), network_thread_(NULL), 242 socket_factory_(allocator->socket_factory()), 243 configuration_done_(false), 244 allocation_started_(false), 245 network_manager_started_(false), 246 running_(false), 247 allocation_sequences_created_(false) { 248 allocator_->network_manager()->SignalNetworksChanged.connect( 249 this, &BasicPortAllocatorSession::OnNetworksChanged); 250 allocator_->network_manager()->StartUpdating(); 251} 252 253BasicPortAllocatorSession::~BasicPortAllocatorSession() { 254 allocator_->network_manager()->StopUpdating(); 255 if (network_thread_ != NULL) 256 network_thread_->Clear(this); 257 258 std::vector<PortData>::iterator it; 259 for (it = ports_.begin(); it != ports_.end(); it++) 260 delete it->port(); 261 262 for (uint32 i = 0; i < configs_.size(); ++i) 263 delete configs_[i]; 264 265 for (uint32 i = 0; i < sequences_.size(); ++i) 266 delete sequences_[i]; 267} 268 269void BasicPortAllocatorSession::StartGettingPorts() { 270 network_thread_ = talk_base::Thread::Current(); 271 if (!socket_factory_) { 272 owned_socket_factory_.reset( 273 new talk_base::BasicPacketSocketFactory(network_thread_)); 274 socket_factory_ = owned_socket_factory_.get(); 275 } 276 277 running_ = true; 278 network_thread_->Post(this, MSG_CONFIG_START); 279 280 if (flags() & PORTALLOCATOR_ENABLE_SHAKER) 281 network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE); 282} 283 284void BasicPortAllocatorSession::StopGettingPorts() { 285 ASSERT(talk_base::Thread::Current() == network_thread_); 286 running_ = false; 287 network_thread_->Clear(this, MSG_ALLOCATE); 288 for (uint32 i = 0; i < sequences_.size(); ++i) 289 sequences_[i]->Stop(); 290 network_thread_->Post(this, MSG_CONFIG_STOP); 291} 292 293void BasicPortAllocatorSession::OnMessage(talk_base::Message *message) { 294 switch (message->message_id) { 295 case MSG_CONFIG_START: 296 ASSERT(talk_base::Thread::Current() == network_thread_); 297 GetPortConfigurations(); 298 break; 299 300 case MSG_CONFIG_READY: 301 ASSERT(talk_base::Thread::Current() == network_thread_); 302 OnConfigReady(static_cast<PortConfiguration*>(message->pdata)); 303 break; 304 305 case MSG_ALLOCATE: 306 ASSERT(talk_base::Thread::Current() == network_thread_); 307 OnAllocate(); 308 break; 309 310 case MSG_SHAKE: 311 ASSERT(talk_base::Thread::Current() == network_thread_); 312 OnShake(); 313 break; 314 case MSG_SEQUENCEOBJECTS_CREATED: 315 ASSERT(talk_base::Thread::Current() == network_thread_); 316 OnAllocationSequenceObjectsCreated(); 317 break; 318 case MSG_CONFIG_STOP: 319 ASSERT(talk_base::Thread::Current() == network_thread_); 320 OnConfigStop(); 321 break; 322 default: 323 ASSERT(false); 324 } 325} 326 327void BasicPortAllocatorSession::GetPortConfigurations() { 328 PortConfiguration* config = new PortConfiguration(allocator_->stun_address(), 329 username(), 330 password()); 331 332 for (size_t i = 0; i < allocator_->relays().size(); ++i) { 333 config->AddRelay(allocator_->relays()[i]); 334 } 335 ConfigReady(config); 336} 337 338void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) { 339 network_thread_->Post(this, MSG_CONFIG_READY, config); 340} 341 342// Adds a configuration to the list. 343void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) { 344 if (config) 345 configs_.push_back(config); 346 347 AllocatePorts(); 348} 349 350void BasicPortAllocatorSession::OnConfigStop() { 351 ASSERT(talk_base::Thread::Current() == network_thread_); 352 353 // If any of the allocated ports have not completed the candidates allocation, 354 // mark those as error. Since session doesn't need any new candidates 355 // at this stage of the allocation, it's safe to discard any new candidates. 356 bool send_signal = false; 357 for (std::vector<PortData>::iterator it = ports_.begin(); 358 it != ports_.end(); ++it) { 359 if (!it->complete()) { 360 // Updating port state to error, which didn't finish allocating candidates 361 // yet. 362 it->set_error(); 363 send_signal = true; 364 } 365 } 366 367 // Did we stop any running sequences? 368 for (std::vector<AllocationSequence*>::iterator it = sequences_.begin(); 369 it != sequences_.end() && !send_signal; ++it) { 370 if ((*it)->state() == AllocationSequence::kStopped) { 371 send_signal = true; 372 } 373 } 374 375 // If we stopped anything that was running, send a done signal now. 376 if (send_signal) { 377 MaybeSignalCandidatesAllocationDone(); 378 } 379} 380 381void BasicPortAllocatorSession::AllocatePorts() { 382 ASSERT(talk_base::Thread::Current() == network_thread_); 383 network_thread_->Post(this, MSG_ALLOCATE); 384} 385 386void BasicPortAllocatorSession::OnAllocate() { 387 if (network_manager_started_) 388 DoAllocate(); 389 390 allocation_started_ = true; 391 if (running_) 392 network_thread_->PostDelayed(ALLOCATE_DELAY, this, MSG_ALLOCATE); 393} 394 395// For each network, see if we have a sequence that covers it already. If not, 396// create a new sequence to create the appropriate ports. 397void BasicPortAllocatorSession::DoAllocate() { 398 bool done_signal_needed = false; 399 std::vector<talk_base::Network*> networks; 400 allocator_->network_manager()->GetNetworks(&networks); 401 if (networks.empty()) { 402 LOG(LS_WARNING) << "Machine has no networks; no ports will be allocated"; 403 done_signal_needed = true; 404 } else { 405 for (uint32 i = 0; i < networks.size(); ++i) { 406 PortConfiguration* config = NULL; 407 if (configs_.size() > 0) 408 config = configs_.back(); 409 410 uint32 sequence_flags = flags(); 411 if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) { 412 // If all the ports are disabled we should just fire the allocation 413 // done event and return. 414 done_signal_needed = true; 415 break; 416 } 417 418 // Disables phases that are not specified in this config. 419 if (!config || config->stun_address.IsNil()) { 420 // No STUN ports specified in this config. 421 sequence_flags |= PORTALLOCATOR_DISABLE_STUN; 422 } 423 if (!config || config->relays.empty()) { 424 // No relay ports specified in this config. 425 sequence_flags |= PORTALLOCATOR_DISABLE_RELAY; 426 } 427 428 if (!(sequence_flags & PORTALLOCATOR_ENABLE_IPV6) && 429 networks[i]->ip().family() == AF_INET6) { 430 // Skip IPv6 networks unless the flag's been set. 431 continue; 432 } 433 434 // Disable phases that would only create ports equivalent to 435 // ones that we have already made. 436 DisableEquivalentPhases(networks[i], config, &sequence_flags); 437 438 if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) { 439 // New AllocationSequence would have nothing to do, so don't make it. 440 continue; 441 } 442 443 AllocationSequence* sequence = 444 new AllocationSequence(this, networks[i], config, sequence_flags); 445 if (!sequence->Init()) { 446 delete sequence; 447 continue; 448 } 449 done_signal_needed = true; 450 sequence->SignalPortAllocationComplete.connect( 451 this, &BasicPortAllocatorSession::OnPortAllocationComplete); 452 if (running_) 453 sequence->Start(); 454 sequences_.push_back(sequence); 455 } 456 } 457 if (done_signal_needed) { 458 network_thread_->Post(this, MSG_SEQUENCEOBJECTS_CREATED); 459 } 460} 461 462void BasicPortAllocatorSession::OnNetworksChanged() { 463 network_manager_started_ = true; 464 if (allocation_started_) 465 DoAllocate(); 466} 467 468void BasicPortAllocatorSession::DisableEquivalentPhases( 469 talk_base::Network* network, PortConfiguration* config, uint32* flags) { 470 for (uint32 i = 0; i < sequences_.size() && 471 (*flags & DISABLE_ALL_PHASES) != DISABLE_ALL_PHASES; ++i) { 472 sequences_[i]->DisableEquivalentPhases(network, config, flags); 473 } 474} 475 476void BasicPortAllocatorSession::AddAllocatedPort(Port* port, 477 AllocationSequence * seq, 478 bool prepare_address) { 479 if (!port) 480 return; 481 482 LOG(LS_INFO) << "Adding allocated port for " << content_name(); 483 port->set_content_name(content_name()); 484 port->set_component(component_); 485 port->set_generation(generation()); 486 if (allocator_->proxy().type != talk_base::PROXY_NONE) 487 port->set_proxy(allocator_->user_agent(), allocator_->proxy()); 488 port->set_send_retransmit_count_attribute((allocator_->flags() & 489 PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0); 490 491 if (content_name().compare(CN_VIDEO) == 0 && 492 component_ == cricket::ICE_CANDIDATE_COMPONENT_RTP) { 493 // For video RTP alone, we set send-buffer sizes. This used to be set in the 494 // engines/channels. 495 int sendBufSize = (flags() & PORTALLOCATOR_USE_LARGE_SOCKET_SEND_BUFFERS) 496 ? kLargeSocketSendBufferSize 497 : kNormalSocketSendBufferSize; 498 port->SetOption(talk_base::Socket::OPT_SNDBUF, sendBufSize); 499 } 500 501 PortData data(port, seq); 502 ports_.push_back(data); 503 504 port->SignalCandidateReady.connect( 505 this, &BasicPortAllocatorSession::OnCandidateReady); 506 port->SignalPortComplete.connect(this, 507 &BasicPortAllocatorSession::OnPortComplete); 508 port->SignalDestroyed.connect(this, 509 &BasicPortAllocatorSession::OnPortDestroyed); 510 port->SignalPortError.connect( 511 this, &BasicPortAllocatorSession::OnPortError); 512 LOG_J(LS_INFO, port) << "Added port to allocator"; 513 514 if (prepare_address) 515 port->PrepareAddress(); 516 if (running_) 517 port->Start(); 518} 519 520void BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated() { 521 allocation_sequences_created_ = true; 522 // Send candidate allocation complete signal if we have no sequences. 523 MaybeSignalCandidatesAllocationDone(); 524} 525 526void BasicPortAllocatorSession::OnCandidateReady( 527 Port* port, const Candidate& c) { 528 ASSERT(talk_base::Thread::Current() == network_thread_); 529 PortData* data = FindPort(port); 530 ASSERT(data != NULL); 531 // Discarding any candidate signal if port allocation status is 532 // already in completed state. 533 if (data->complete()) 534 return; 535 536 // Send candidates whose protocol is enabled. 537 std::vector<Candidate> candidates; 538 ProtocolType pvalue; 539 if (StringToProto(c.protocol().c_str(), &pvalue) && 540 data->sequence()->ProtocolEnabled(pvalue)) { 541 candidates.push_back(c); 542 } 543 544 if (!candidates.empty()) { 545 SignalCandidatesReady(this, candidates); 546 } 547 548 // Moving to READY state as we have atleast one candidate from the port. 549 // Since this port has atleast one candidate we should forward this port 550 // to listners, to allow connections from this port. 551 if (!data->ready()) { 552 data->set_ready(); 553 SignalPortReady(this, port); 554 } 555} 556 557void BasicPortAllocatorSession::OnPortComplete(Port* port) { 558 ASSERT(talk_base::Thread::Current() == network_thread_); 559 PortData* data = FindPort(port); 560 ASSERT(data != NULL); 561 562 // Ignore any late signals. 563 if (data->complete()) 564 return; 565 566 // Moving to COMPLETE state. 567 data->set_complete(); 568 // Send candidate allocation complete signal if this was the last port. 569 MaybeSignalCandidatesAllocationDone(); 570} 571 572void BasicPortAllocatorSession::OnPortError(Port* port) { 573 ASSERT(talk_base::Thread::Current() == network_thread_); 574 PortData* data = FindPort(port); 575 ASSERT(data != NULL); 576 // We might have already given up on this port and stopped it. 577 if (data->complete()) 578 return; 579 580 // SignalAddressError is currently sent from StunPort/TurnPort. 581 // But this signal itself is generic. 582 data->set_error(); 583 // Send candidate allocation complete signal if this was the last port. 584 MaybeSignalCandidatesAllocationDone(); 585} 586 587void BasicPortAllocatorSession::OnProtocolEnabled(AllocationSequence* seq, 588 ProtocolType proto) { 589 std::vector<Candidate> candidates; 590 for (std::vector<PortData>::iterator it = ports_.begin(); 591 it != ports_.end(); ++it) { 592 if (it->sequence() != seq) 593 continue; 594 595 const std::vector<Candidate>& potentials = it->port()->Candidates(); 596 for (size_t i = 0; i < potentials.size(); ++i) { 597 ProtocolType pvalue; 598 if (!StringToProto(potentials[i].protocol().c_str(), &pvalue)) 599 continue; 600 if (pvalue == proto) { 601 candidates.push_back(potentials[i]); 602 } 603 } 604 } 605 606 if (!candidates.empty()) { 607 SignalCandidatesReady(this, candidates); 608 } 609} 610 611void BasicPortAllocatorSession::OnPortAllocationComplete( 612 AllocationSequence* seq) { 613 // Send candidate allocation complete signal if all ports are done. 614 MaybeSignalCandidatesAllocationDone(); 615} 616 617void BasicPortAllocatorSession::MaybeSignalCandidatesAllocationDone() { 618 // Send signal only if all required AllocationSequence objects 619 // are created. 620 if (!allocation_sequences_created_) 621 return; 622 623 // Check that all port allocation sequences are complete. 624 for (std::vector<AllocationSequence*>::iterator it = sequences_.begin(); 625 it != sequences_.end(); ++it) { 626 if ((*it)->state() == AllocationSequence::kRunning) 627 return; 628 } 629 630 // If all allocated ports are in complete state, session must have got all 631 // expected candidates. Session will trigger candidates allocation complete 632 // signal. 633 for (std::vector<PortData>::iterator it = ports_.begin(); 634 it != ports_.end(); ++it) { 635 if (!it->complete()) 636 return; 637 } 638 LOG(LS_INFO) << "All candidates gathered for " << content_name_ << ":" 639 << component_ << ":" << generation(); 640 SignalCandidatesAllocationDone(this); 641} 642 643void BasicPortAllocatorSession::OnPortDestroyed( 644 PortInterface* port) { 645 ASSERT(talk_base::Thread::Current() == network_thread_); 646 for (std::vector<PortData>::iterator iter = ports_.begin(); 647 iter != ports_.end(); ++iter) { 648 if (port == iter->port()) { 649 ports_.erase(iter); 650 LOG_J(LS_INFO, port) << "Removed port from allocator (" 651 << static_cast<int>(ports_.size()) << " remaining)"; 652 return; 653 } 654 } 655 ASSERT(false); 656} 657 658void BasicPortAllocatorSession::OnShake() { 659 LOG(INFO) << ">>>>> SHAKE <<<<< >>>>> SHAKE <<<<< >>>>> SHAKE <<<<<"; 660 661 std::vector<Port*> ports; 662 std::vector<Connection*> connections; 663 664 for (size_t i = 0; i < ports_.size(); ++i) { 665 if (ports_[i].ready()) 666 ports.push_back(ports_[i].port()); 667 } 668 669 for (size_t i = 0; i < ports.size(); ++i) { 670 Port::AddressMap::const_iterator iter; 671 for (iter = ports[i]->connections().begin(); 672 iter != ports[i]->connections().end(); 673 ++iter) { 674 connections.push_back(iter->second); 675 } 676 } 677 678 LOG(INFO) << ">>>>> Destroying " << ports.size() << " ports and " 679 << connections.size() << " connections"; 680 681 for (size_t i = 0; i < connections.size(); ++i) 682 connections[i]->Destroy(); 683 684 if (running_ || (ports.size() > 0) || (connections.size() > 0)) 685 network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE); 686} 687 688BasicPortAllocatorSession::PortData* BasicPortAllocatorSession::FindPort( 689 Port* port) { 690 for (std::vector<PortData>::iterator it = ports_.begin(); 691 it != ports_.end(); ++it) { 692 if (it->port() == port) { 693 return &*it; 694 } 695 } 696 return NULL; 697} 698 699// AllocationSequence 700 701AllocationSequence::AllocationSequence(BasicPortAllocatorSession* session, 702 talk_base::Network* network, 703 PortConfiguration* config, 704 uint32 flags) 705 : session_(session), 706 network_(network), 707 ip_(network->ip()), 708 config_(config), 709 state_(kInit), 710 flags_(flags), 711 udp_socket_(NULL), 712 phase_(0) { 713} 714 715bool AllocationSequence::Init() { 716 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && 717 !IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_UFRAG)) { 718 LOG(LS_ERROR) << "Shared socket option can't be set without " 719 << "shared ufrag."; 720 ASSERT(false); 721 return false; 722 } 723 724 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) { 725 udp_socket_.reset(session_->socket_factory()->CreateUdpSocket( 726 talk_base::SocketAddress(ip_, 0), session_->allocator()->min_port(), 727 session_->allocator()->max_port())); 728 if (udp_socket_) { 729 udp_socket_->SignalReadPacket.connect( 730 this, &AllocationSequence::OnReadPacket); 731 } 732 // Continuing if |udp_socket_| is NULL, as local TCP and RelayPort using TCP 733 // are next available options to setup a communication channel. 734 } 735 return true; 736} 737 738AllocationSequence::~AllocationSequence() { 739 session_->network_thread()->Clear(this); 740} 741 742void AllocationSequence::DisableEquivalentPhases(talk_base::Network* network, 743 PortConfiguration* config, uint32* flags) { 744 if (!((network == network_) && (ip_ == network->ip()))) { 745 // Different network setup; nothing is equivalent. 746 return; 747 } 748 749 // Else turn off the stuff that we've already got covered. 750 751 // Every config implicitly specifies local, so turn that off right away. 752 *flags |= PORTALLOCATOR_DISABLE_UDP; 753 *flags |= PORTALLOCATOR_DISABLE_TCP; 754 755 if (config_ && config) { 756 if (config_->stun_address == config->stun_address) { 757 // Already got this STUN server covered. 758 *flags |= PORTALLOCATOR_DISABLE_STUN; 759 } 760 if (!config_->relays.empty()) { 761 // Already got relays covered. 762 // NOTE: This will even skip a _different_ set of relay servers if we 763 // were to be given one, but that never happens in our codebase. Should 764 // probably get rid of the list in PortConfiguration and just keep a 765 // single relay server in each one. 766 *flags |= PORTALLOCATOR_DISABLE_RELAY; 767 } 768 } 769} 770 771void AllocationSequence::Start() { 772 state_ = kRunning; 773 session_->network_thread()->Post(this, MSG_ALLOCATION_PHASE); 774} 775 776void AllocationSequence::Stop() { 777 // If the port is completed, don't set it to stopped. 778 if (state_ == kRunning) { 779 state_ = kStopped; 780 session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); 781 } 782} 783 784void AllocationSequence::OnMessage(talk_base::Message* msg) { 785 ASSERT(talk_base::Thread::Current() == session_->network_thread()); 786 ASSERT(msg->message_id == MSG_ALLOCATION_PHASE); 787 788 const char* const PHASE_NAMES[kNumPhases] = { 789 "Udp", "Relay", "Tcp", "SslTcp" 790 }; 791 792 // Perform all of the phases in the current step. 793 LOG_J(LS_INFO, network_) << "Allocation Phase=" 794 << PHASE_NAMES[phase_]; 795 796 switch (phase_) { 797 case PHASE_UDP: 798 CreateUDPPorts(); 799 CreateStunPorts(); 800 EnableProtocol(PROTO_UDP); 801 break; 802 803 case PHASE_RELAY: 804 CreateRelayPorts(); 805 break; 806 807 case PHASE_TCP: 808 CreateTCPPorts(); 809 EnableProtocol(PROTO_TCP); 810 break; 811 812 case PHASE_SSLTCP: 813 state_ = kCompleted; 814 EnableProtocol(PROTO_SSLTCP); 815 break; 816 817 default: 818 ASSERT(false); 819 } 820 821 if (state() == kRunning) { 822 ++phase_; 823 session_->network_thread()->PostDelayed( 824 session_->allocator()->step_delay(), 825 this, MSG_ALLOCATION_PHASE); 826 } else { 827 // If all phases in AllocationSequence are completed, no allocation 828 // steps needed further. Canceling pending signal. 829 session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); 830 SignalPortAllocationComplete(this); 831 } 832} 833 834void AllocationSequence::EnableProtocol(ProtocolType proto) { 835 if (!ProtocolEnabled(proto)) { 836 protocols_.push_back(proto); 837 session_->OnProtocolEnabled(this, proto); 838 } 839} 840 841bool AllocationSequence::ProtocolEnabled(ProtocolType proto) const { 842 for (ProtocolList::const_iterator it = protocols_.begin(); 843 it != protocols_.end(); ++it) { 844 if (*it == proto) 845 return true; 846 } 847 return false; 848} 849 850void AllocationSequence::CreateUDPPorts() { 851 if (IsFlagSet(PORTALLOCATOR_DISABLE_UDP)) { 852 LOG(LS_VERBOSE) << "AllocationSequence: UDP ports disabled, skipping."; 853 return; 854 } 855 856 // TODO(mallinath) - Remove UDPPort creating socket after shared socket 857 // is enabled completely. 858 UDPPort* port = NULL; 859 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && udp_socket_) { 860 port = UDPPort::Create(session_->network_thread(), network_, 861 udp_socket_.get(), 862 session_->username(), session_->password()); 863 } else { 864 port = UDPPort::Create(session_->network_thread(), 865 session_->socket_factory(), 866 network_, ip_, 867 session_->allocator()->min_port(), 868 session_->allocator()->max_port(), 869 session_->username(), session_->password()); 870 } 871 872 if (port) { 873 ports.push_back(port); 874 // If shared socket is enabled, STUN candidate will be allocated by the 875 // UDPPort. 876 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && 877 !IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) { 878 ASSERT(config_ && !config_->stun_address.IsNil()); 879 if (!(config_ && !config_->stun_address.IsNil())) { 880 LOG(LS_WARNING) 881 << "AllocationSequence: No STUN server configured, skipping."; 882 return; 883 } 884 port->set_server_addr(config_->stun_address); 885 } 886 887 session_->AddAllocatedPort(port, this, true); 888 port->SignalDestroyed.connect(this, &AllocationSequence::OnPortDestroyed); 889 } 890} 891 892void AllocationSequence::CreateTCPPorts() { 893 if (IsFlagSet(PORTALLOCATOR_DISABLE_TCP)) { 894 LOG(LS_VERBOSE) << "AllocationSequence: TCP ports disabled, skipping."; 895 return; 896 } 897 898 Port* port = TCPPort::Create(session_->network_thread(), 899 session_->socket_factory(), 900 network_, ip_, 901 session_->allocator()->min_port(), 902 session_->allocator()->max_port(), 903 session_->username(), session_->password(), 904 session_->allocator()->allow_tcp_listen()); 905 if (port) { 906 session_->AddAllocatedPort(port, this, true); 907 // Since TCPPort is not created using shared socket, |port| will not be 908 // added to the dequeue. 909 } 910} 911 912void AllocationSequence::CreateStunPorts() { 913 if (IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) { 914 LOG(LS_VERBOSE) << "AllocationSequence: STUN ports disabled, skipping."; 915 return; 916 } 917 918 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) { 919 LOG(LS_INFO) << "AllocationSequence: " 920 << "UDPPort will be handling the STUN candidate generation."; 921 return; 922 } 923 924 // If BasicPortAllocatorSession::OnAllocate left STUN ports enabled then we 925 // ought to have an address for them here. 926 ASSERT(config_ && !config_->stun_address.IsNil()); 927 if (!(config_ && !config_->stun_address.IsNil())) { 928 LOG(LS_WARNING) 929 << "AllocationSequence: No STUN server configured, skipping."; 930 return; 931 } 932 933 StunPort* port = StunPort::Create(session_->network_thread(), 934 session_->socket_factory(), 935 network_, ip_, 936 session_->allocator()->min_port(), 937 session_->allocator()->max_port(), 938 session_->username(), session_->password(), 939 config_->stun_address); 940 if (port) { 941 session_->AddAllocatedPort(port, this, true); 942 // Since StunPort is not created using shared socket, |port| will not be 943 // added to the dequeue. 944 } 945} 946 947void AllocationSequence::CreateRelayPorts() { 948 if (IsFlagSet(PORTALLOCATOR_DISABLE_RELAY)) { 949 LOG(LS_VERBOSE) << "AllocationSequence: Relay ports disabled, skipping."; 950 return; 951 } 952 953 // If BasicPortAllocatorSession::OnAllocate left relay ports enabled then we 954 // ought to have a relay list for them here. 955 ASSERT(config_ && !config_->relays.empty()); 956 if (!(config_ && !config_->relays.empty())) { 957 LOG(LS_WARNING) 958 << "AllocationSequence: No relay server configured, skipping."; 959 return; 960 } 961 962 PortConfiguration::RelayList::const_iterator relay; 963 for (relay = config_->relays.begin(); 964 relay != config_->relays.end(); ++relay) { 965 if (relay->type == RELAY_GTURN) { 966 CreateGturnPort(*relay); 967 } else if (relay->type == RELAY_TURN) { 968 CreateTurnPort(*relay); 969 } else { 970 ASSERT(false); 971 } 972 } 973} 974 975void AllocationSequence::CreateGturnPort(const RelayServerConfig& config) { 976 // TODO(mallinath) - Rename RelayPort to GTurnPort. 977 RelayPort* port = RelayPort::Create(session_->network_thread(), 978 session_->socket_factory(), 979 network_, ip_, 980 session_->allocator()->min_port(), 981 session_->allocator()->max_port(), 982 config_->username, config_->password); 983 if (port) { 984 // Since RelayPort is not created using shared socket, |port| will not be 985 // added to the dequeue. 986 // Note: We must add the allocated port before we add addresses because 987 // the latter will create candidates that need name and preference 988 // settings. However, we also can't prepare the address (normally 989 // done by AddAllocatedPort) until we have these addresses. So we 990 // wait to do that until below. 991 session_->AddAllocatedPort(port, this, false); 992 993 // Add the addresses of this protocol. 994 PortList::const_iterator relay_port; 995 for (relay_port = config.ports.begin(); 996 relay_port != config.ports.end(); 997 ++relay_port) { 998 port->AddServerAddress(*relay_port); 999 port->AddExternalAddress(*relay_port); 1000 } 1001 // Start fetching an address for this port. 1002 port->PrepareAddress(); 1003 } 1004} 1005 1006void AllocationSequence::CreateTurnPort(const RelayServerConfig& config) { 1007 PortList::const_iterator relay_port; 1008 for (relay_port = config.ports.begin(); 1009 relay_port != config.ports.end(); ++relay_port) { 1010 TurnPort* port = TurnPort::Create(session_->network_thread(), 1011 session_->socket_factory(), 1012 network_, ip_, 1013 session_->allocator()->min_port(), 1014 session_->allocator()->max_port(), 1015 session_->username(), 1016 session_->password(), 1017 *relay_port, config.credentials); 1018 if (port) { 1019 session_->AddAllocatedPort(port, this, true); 1020 } 1021 } 1022} 1023 1024void AllocationSequence::OnReadPacket( 1025 talk_base::AsyncPacketSocket* socket, const char* data, size_t size, 1026 const talk_base::SocketAddress& remote_addr) { 1027 ASSERT(socket == udp_socket_.get()); 1028 for (std::deque<Port*>::iterator iter = ports.begin(); 1029 iter != ports.end(); ++iter) { 1030 // We have only one port in the queue. 1031 // TODO(mallinath) - Add shared socket support to Relay and Turn ports. 1032 if ((*iter)->HandleIncomingPacket(socket, data, size, remote_addr)) { 1033 break; 1034 } 1035 } 1036} 1037 1038void AllocationSequence::OnPortDestroyed(PortInterface* port) { 1039 std::deque<Port*>::iterator iter = 1040 std::find(ports.begin(), ports.end(), port); 1041 ASSERT(iter != ports.end()); 1042 ports.erase(iter); 1043} 1044 1045// PortConfiguration 1046PortConfiguration::PortConfiguration( 1047 const talk_base::SocketAddress& stun_address, 1048 const std::string& username, 1049 const std::string& password) 1050 : stun_address(stun_address), 1051 username(username), 1052 password(password) { 1053} 1054 1055void PortConfiguration::AddRelay(const RelayServerConfig& config) { 1056 relays.push_back(config); 1057} 1058 1059bool PortConfiguration::SupportsProtocol( 1060 const RelayServerConfig& relay, ProtocolType type) { 1061 PortList::const_iterator relay_port; 1062 for (relay_port = relay.ports.begin(); 1063 relay_port != relay.ports.end(); 1064 ++relay_port) { 1065 if (relay_port->proto == type) 1066 return true; 1067 } 1068 return false; 1069} 1070 1071} // namespace cricket 1072