1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/p2p/client/basicportallocator.h"
29
30#include <string>
31#include <vector>
32
33#include "talk/base/common.h"
34#include "talk/base/helpers.h"
35#include "talk/base/logging.h"
36#include "talk/p2p/base/basicpacketsocketfactory.h"
37#include "talk/p2p/base/common.h"
38#include "talk/p2p/base/port.h"
39#include "talk/p2p/base/relayport.h"
40#include "talk/p2p/base/stunport.h"
41#include "talk/p2p/base/tcpport.h"
42#include "talk/p2p/base/turnport.h"
43#include "talk/p2p/base/udpport.h"
44
45using talk_base::CreateRandomId;
46using talk_base::CreateRandomString;
47
48namespace {
49
50const uint32 MSG_CONFIG_START = 1;
51const uint32 MSG_CONFIG_READY = 2;
52const uint32 MSG_ALLOCATE = 3;
53const uint32 MSG_ALLOCATION_PHASE = 4;
54const uint32 MSG_SHAKE = 5;
55const uint32 MSG_SEQUENCEOBJECTS_CREATED = 6;
56const uint32 MSG_CONFIG_STOP = 7;
57
58const uint32 ALLOCATE_DELAY = 250;
59const uint32 ALLOCATION_STEP_DELAY = 1 * 1000;
60
61const int PHASE_UDP = 0;
62const int PHASE_RELAY = 1;
63const int PHASE_TCP = 2;
64const int PHASE_SSLTCP = 3;
65
66const int kNumPhases = 4;
67
68// Both these values are in bytes.
69const int kLargeSocketSendBufferSize = 128 * 1024;
70const int kNormalSocketSendBufferSize = 64 * 1024;
71
72const int SHAKE_MIN_DELAY = 45 * 1000;  // 45 seconds
73const int SHAKE_MAX_DELAY = 90 * 1000;  // 90 seconds
74
75int ShakeDelay() {
76  int range = SHAKE_MAX_DELAY - SHAKE_MIN_DELAY + 1;
77  return SHAKE_MIN_DELAY + CreateRandomId() % range;
78}
79
80}  // namespace
81
82namespace cricket {
83
84const uint32 DISABLE_ALL_PHASES =
85  PORTALLOCATOR_DISABLE_UDP
86  | PORTALLOCATOR_DISABLE_TCP
87  | PORTALLOCATOR_DISABLE_STUN
88  | PORTALLOCATOR_DISABLE_RELAY;
89
90// Performs the allocation of ports, in a sequenced (timed) manner, for a given
91// network and IP address.
92class AllocationSequence : public talk_base::MessageHandler,
93                           public sigslot::has_slots<> {
94 public:
95  enum State {
96    kInit,       // Initial state.
97    kRunning,    // Started allocating ports.
98    kStopped,    // Stopped from running.
99    kCompleted,  // All ports are allocated.
100
101    // kInit --> kRunning --> {kCompleted|kStopped}
102  };
103
104  AllocationSequence(BasicPortAllocatorSession* session,
105                     talk_base::Network* network,
106                     PortConfiguration* config,
107                     uint32 flags);
108  ~AllocationSequence();
109  bool Init();
110
111  State state() const { return state_; }
112
113  // Disables the phases for a new sequence that this one already covers for an
114  // equivalent network setup.
115  void DisableEquivalentPhases(talk_base::Network* network,
116      PortConfiguration* config, uint32* flags);
117
118  // Starts and stops the sequence.  When started, it will continue allocating
119  // new ports on its own timed schedule.
120  void Start();
121  void Stop();
122
123  // MessageHandler
124  void OnMessage(talk_base::Message* msg);
125
126  void EnableProtocol(ProtocolType proto);
127  bool ProtocolEnabled(ProtocolType proto) const;
128
129  // Signal from AllocationSequence, when it's done with allocating ports.
130  // This signal is useful, when port allocation fails which doesn't result
131  // in any candidates. Using this signal BasicPortAllocatorSession can send
132  // its candidate discovery conclusion signal. Without this signal,
133  // BasicPortAllocatorSession doesn't have any event to trigger signal. This
134  // can also be achieved by starting timer in BPAS.
135  sigslot::signal1<AllocationSequence*> SignalPortAllocationComplete;
136
137 private:
138  typedef std::vector<ProtocolType> ProtocolList;
139
140  bool IsFlagSet(uint32 flag) {
141    return ((flags_ & flag) != 0);
142  }
143  void CreateUDPPorts();
144  void CreateTCPPorts();
145  void CreateStunPorts();
146  void CreateRelayPorts();
147  void CreateGturnPort(const RelayServerConfig& config);
148  void CreateTurnPort(const RelayServerConfig& config);
149
150  void OnReadPacket(talk_base::AsyncPacketSocket* socket,
151                    const char* data, size_t size,
152                    const talk_base::SocketAddress& remote_addr);
153  void OnPortDestroyed(PortInterface* port);
154
155  BasicPortAllocatorSession* session_;
156  talk_base::Network* network_;
157  talk_base::IPAddress ip_;
158  PortConfiguration* config_;
159  State state_;
160  uint32 flags_;
161  ProtocolList protocols_;
162  talk_base::scoped_ptr<talk_base::AsyncPacketSocket> udp_socket_;
163  // Keeping a list of all UDP based ports.
164  std::deque<Port*> ports;
165  int phase_;
166};
167
168// BasicPortAllocator
169BasicPortAllocator::BasicPortAllocator(
170    talk_base::NetworkManager* network_manager,
171    talk_base::PacketSocketFactory* socket_factory)
172    : network_manager_(network_manager),
173      socket_factory_(socket_factory) {
174  ASSERT(socket_factory_ != NULL);
175  Construct();
176}
177
178BasicPortAllocator::BasicPortAllocator(
179    talk_base::NetworkManager* network_manager)
180    : network_manager_(network_manager),
181      socket_factory_(NULL) {
182  Construct();
183}
184
185BasicPortAllocator::BasicPortAllocator(
186    talk_base::NetworkManager* network_manager,
187    talk_base::PacketSocketFactory* socket_factory,
188    const talk_base::SocketAddress& stun_address)
189    : network_manager_(network_manager),
190      socket_factory_(socket_factory),
191      stun_address_(stun_address) {
192  ASSERT(socket_factory_ != NULL);
193  Construct();
194}
195
196BasicPortAllocator::BasicPortAllocator(
197    talk_base::NetworkManager* network_manager,
198    const talk_base::SocketAddress& stun_address,
199    const talk_base::SocketAddress& relay_address_udp,
200    const talk_base::SocketAddress& relay_address_tcp,
201    const talk_base::SocketAddress& relay_address_ssl)
202    : network_manager_(network_manager),
203      socket_factory_(NULL),
204      stun_address_(stun_address) {
205
206  RelayServerConfig config(RELAY_GTURN);
207  if (!relay_address_udp.IsAny())
208    config.ports.push_back(ProtocolAddress(relay_address_udp, PROTO_UDP));
209  if (!relay_address_tcp.IsAny())
210    config.ports.push_back(ProtocolAddress(relay_address_tcp, PROTO_TCP));
211  if (!relay_address_ssl.IsAny())
212    config.ports.push_back(ProtocolAddress(relay_address_ssl, PROTO_SSLTCP));
213  AddRelay(config);
214
215  Construct();
216}
217
218void BasicPortAllocator::Construct() {
219  allow_tcp_listen_ = true;
220}
221
222BasicPortAllocator::~BasicPortAllocator() {
223}
224
225PortAllocatorSession *BasicPortAllocator::CreateSessionInternal(
226    const std::string& content_name, int component,
227    const std::string& ice_ufrag, const std::string& ice_pwd) {
228  return new BasicPortAllocatorSession(this, content_name, component,
229                                       ice_ufrag, ice_pwd);
230}
231
232// BasicPortAllocatorSession
233BasicPortAllocatorSession::BasicPortAllocatorSession(
234    BasicPortAllocator *allocator,
235    const std::string& content_name,
236    int component,
237    const std::string& ice_ufrag,
238    const std::string& ice_pwd)
239    : PortAllocatorSession(content_name, component,
240                           ice_ufrag, ice_pwd, allocator->flags()),
241      allocator_(allocator), network_thread_(NULL),
242      socket_factory_(allocator->socket_factory()),
243      configuration_done_(false),
244      allocation_started_(false),
245      network_manager_started_(false),
246      running_(false),
247      allocation_sequences_created_(false) {
248  allocator_->network_manager()->SignalNetworksChanged.connect(
249      this, &BasicPortAllocatorSession::OnNetworksChanged);
250  allocator_->network_manager()->StartUpdating();
251}
252
253BasicPortAllocatorSession::~BasicPortAllocatorSession() {
254  allocator_->network_manager()->StopUpdating();
255  if (network_thread_ != NULL)
256    network_thread_->Clear(this);
257
258  std::vector<PortData>::iterator it;
259  for (it = ports_.begin(); it != ports_.end(); it++)
260    delete it->port();
261
262  for (uint32 i = 0; i < configs_.size(); ++i)
263    delete configs_[i];
264
265  for (uint32 i = 0; i < sequences_.size(); ++i)
266    delete sequences_[i];
267}
268
269void BasicPortAllocatorSession::StartGettingPorts() {
270  network_thread_ = talk_base::Thread::Current();
271  if (!socket_factory_) {
272    owned_socket_factory_.reset(
273        new talk_base::BasicPacketSocketFactory(network_thread_));
274    socket_factory_ = owned_socket_factory_.get();
275  }
276
277  running_ = true;
278  network_thread_->Post(this, MSG_CONFIG_START);
279
280  if (flags() & PORTALLOCATOR_ENABLE_SHAKER)
281    network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE);
282}
283
284void BasicPortAllocatorSession::StopGettingPorts() {
285  ASSERT(talk_base::Thread::Current() == network_thread_);
286  running_ = false;
287  network_thread_->Clear(this, MSG_ALLOCATE);
288  for (uint32 i = 0; i < sequences_.size(); ++i)
289    sequences_[i]->Stop();
290  network_thread_->Post(this, MSG_CONFIG_STOP);
291}
292
293void BasicPortAllocatorSession::OnMessage(talk_base::Message *message) {
294  switch (message->message_id) {
295  case MSG_CONFIG_START:
296    ASSERT(talk_base::Thread::Current() == network_thread_);
297    GetPortConfigurations();
298    break;
299
300  case MSG_CONFIG_READY:
301    ASSERT(talk_base::Thread::Current() == network_thread_);
302    OnConfigReady(static_cast<PortConfiguration*>(message->pdata));
303    break;
304
305  case MSG_ALLOCATE:
306    ASSERT(talk_base::Thread::Current() == network_thread_);
307    OnAllocate();
308    break;
309
310  case MSG_SHAKE:
311    ASSERT(talk_base::Thread::Current() == network_thread_);
312    OnShake();
313    break;
314  case MSG_SEQUENCEOBJECTS_CREATED:
315    ASSERT(talk_base::Thread::Current() == network_thread_);
316    OnAllocationSequenceObjectsCreated();
317    break;
318  case MSG_CONFIG_STOP:
319    ASSERT(talk_base::Thread::Current() == network_thread_);
320    OnConfigStop();
321    break;
322  default:
323    ASSERT(false);
324  }
325}
326
327void BasicPortAllocatorSession::GetPortConfigurations() {
328  PortConfiguration* config = new PortConfiguration(allocator_->stun_address(),
329                                                    username(),
330                                                    password());
331
332  for (size_t i = 0; i < allocator_->relays().size(); ++i) {
333    config->AddRelay(allocator_->relays()[i]);
334  }
335  ConfigReady(config);
336}
337
338void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) {
339  network_thread_->Post(this, MSG_CONFIG_READY, config);
340}
341
342// Adds a configuration to the list.
343void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) {
344  if (config)
345    configs_.push_back(config);
346
347  AllocatePorts();
348}
349
350void BasicPortAllocatorSession::OnConfigStop() {
351  ASSERT(talk_base::Thread::Current() == network_thread_);
352
353  // If any of the allocated ports have not completed the candidates allocation,
354  // mark those as error. Since session doesn't need any new candidates
355  // at this stage of the allocation, it's safe to discard any new candidates.
356  bool send_signal = false;
357  for (std::vector<PortData>::iterator it = ports_.begin();
358       it != ports_.end(); ++it) {
359    if (!it->complete()) {
360      // Updating port state to error, which didn't finish allocating candidates
361      // yet.
362      it->set_error();
363      send_signal = true;
364    }
365  }
366
367  // Did we stop any running sequences?
368  for (std::vector<AllocationSequence*>::iterator it = sequences_.begin();
369       it != sequences_.end() && !send_signal; ++it) {
370    if ((*it)->state() == AllocationSequence::kStopped) {
371      send_signal = true;
372    }
373  }
374
375  // If we stopped anything that was running, send a done signal now.
376  if (send_signal) {
377    MaybeSignalCandidatesAllocationDone();
378  }
379}
380
381void BasicPortAllocatorSession::AllocatePorts() {
382  ASSERT(talk_base::Thread::Current() == network_thread_);
383  network_thread_->Post(this, MSG_ALLOCATE);
384}
385
386void BasicPortAllocatorSession::OnAllocate() {
387  if (network_manager_started_)
388    DoAllocate();
389
390  allocation_started_ = true;
391  if (running_)
392    network_thread_->PostDelayed(ALLOCATE_DELAY, this, MSG_ALLOCATE);
393}
394
395// For each network, see if we have a sequence that covers it already.  If not,
396// create a new sequence to create the appropriate ports.
397void BasicPortAllocatorSession::DoAllocate() {
398  bool done_signal_needed = false;
399  std::vector<talk_base::Network*> networks;
400  allocator_->network_manager()->GetNetworks(&networks);
401  if (networks.empty()) {
402    LOG(LS_WARNING) << "Machine has no networks; no ports will be allocated";
403    done_signal_needed = true;
404  } else {
405    for (uint32 i = 0; i < networks.size(); ++i) {
406      PortConfiguration* config = NULL;
407      if (configs_.size() > 0)
408        config = configs_.back();
409
410      uint32 sequence_flags = flags();
411      if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {
412        // If all the ports are disabled we should just fire the allocation
413        // done event and return.
414        done_signal_needed = true;
415        break;
416      }
417
418      // Disables phases that are not specified in this config.
419      if (!config || config->stun_address.IsNil()) {
420        // No STUN ports specified in this config.
421        sequence_flags |= PORTALLOCATOR_DISABLE_STUN;
422      }
423      if (!config || config->relays.empty()) {
424        // No relay ports specified in this config.
425        sequence_flags |= PORTALLOCATOR_DISABLE_RELAY;
426      }
427
428      if (!(sequence_flags & PORTALLOCATOR_ENABLE_IPV6) &&
429          networks[i]->ip().family() == AF_INET6) {
430        // Skip IPv6 networks unless the flag's been set.
431        continue;
432      }
433
434      // Disable phases that would only create ports equivalent to
435      // ones that we have already made.
436      DisableEquivalentPhases(networks[i], config, &sequence_flags);
437
438      if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {
439        // New AllocationSequence would have nothing to do, so don't make it.
440        continue;
441      }
442
443      AllocationSequence* sequence =
444          new AllocationSequence(this, networks[i], config, sequence_flags);
445      if (!sequence->Init()) {
446        delete sequence;
447        continue;
448      }
449      done_signal_needed = true;
450      sequence->SignalPortAllocationComplete.connect(
451          this, &BasicPortAllocatorSession::OnPortAllocationComplete);
452      if (running_)
453        sequence->Start();
454      sequences_.push_back(sequence);
455    }
456  }
457  if (done_signal_needed) {
458    network_thread_->Post(this, MSG_SEQUENCEOBJECTS_CREATED);
459  }
460}
461
462void BasicPortAllocatorSession::OnNetworksChanged() {
463  network_manager_started_ = true;
464  if (allocation_started_)
465    DoAllocate();
466}
467
468void BasicPortAllocatorSession::DisableEquivalentPhases(
469    talk_base::Network* network, PortConfiguration* config, uint32* flags) {
470  for (uint32 i = 0; i < sequences_.size() &&
471      (*flags & DISABLE_ALL_PHASES) != DISABLE_ALL_PHASES; ++i) {
472    sequences_[i]->DisableEquivalentPhases(network, config, flags);
473  }
474}
475
476void BasicPortAllocatorSession::AddAllocatedPort(Port* port,
477                                                 AllocationSequence * seq,
478                                                 bool prepare_address) {
479  if (!port)
480    return;
481
482  LOG(LS_INFO) << "Adding allocated port for " << content_name();
483  port->set_content_name(content_name());
484  port->set_component(component_);
485  port->set_generation(generation());
486  if (allocator_->proxy().type != talk_base::PROXY_NONE)
487    port->set_proxy(allocator_->user_agent(), allocator_->proxy());
488  port->set_send_retransmit_count_attribute((allocator_->flags() &
489      PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0);
490
491  if (content_name().compare(CN_VIDEO) == 0 &&
492      component_ == cricket::ICE_CANDIDATE_COMPONENT_RTP) {
493    // For video RTP alone, we set send-buffer sizes. This used to be set in the
494    // engines/channels.
495    int sendBufSize = (flags() & PORTALLOCATOR_USE_LARGE_SOCKET_SEND_BUFFERS)
496                      ? kLargeSocketSendBufferSize
497                      : kNormalSocketSendBufferSize;
498    port->SetOption(talk_base::Socket::OPT_SNDBUF, sendBufSize);
499  }
500
501  PortData data(port, seq);
502  ports_.push_back(data);
503
504  port->SignalCandidateReady.connect(
505      this, &BasicPortAllocatorSession::OnCandidateReady);
506  port->SignalPortComplete.connect(this,
507      &BasicPortAllocatorSession::OnPortComplete);
508  port->SignalDestroyed.connect(this,
509      &BasicPortAllocatorSession::OnPortDestroyed);
510  port->SignalPortError.connect(
511      this, &BasicPortAllocatorSession::OnPortError);
512  LOG_J(LS_INFO, port) << "Added port to allocator";
513
514  if (prepare_address)
515    port->PrepareAddress();
516  if (running_)
517    port->Start();
518}
519
520void BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated() {
521  allocation_sequences_created_ = true;
522  // Send candidate allocation complete signal if we have no sequences.
523  MaybeSignalCandidatesAllocationDone();
524}
525
526void BasicPortAllocatorSession::OnCandidateReady(
527    Port* port, const Candidate& c) {
528  ASSERT(talk_base::Thread::Current() == network_thread_);
529  PortData* data = FindPort(port);
530  ASSERT(data != NULL);
531  // Discarding any candidate signal if port allocation status is
532  // already in completed state.
533  if (data->complete())
534    return;
535
536  // Send candidates whose protocol is enabled.
537  std::vector<Candidate> candidates;
538  ProtocolType pvalue;
539  if (StringToProto(c.protocol().c_str(), &pvalue) &&
540      data->sequence()->ProtocolEnabled(pvalue)) {
541    candidates.push_back(c);
542  }
543
544  if (!candidates.empty()) {
545    SignalCandidatesReady(this, candidates);
546  }
547
548  // Moving to READY state as we have atleast one candidate from the port.
549  // Since this port has atleast one candidate we should forward this port
550  // to listners, to allow connections from this port.
551  if (!data->ready()) {
552    data->set_ready();
553    SignalPortReady(this, port);
554  }
555}
556
557void BasicPortAllocatorSession::OnPortComplete(Port* port) {
558  ASSERT(talk_base::Thread::Current() == network_thread_);
559  PortData* data = FindPort(port);
560  ASSERT(data != NULL);
561
562  // Ignore any late signals.
563  if (data->complete())
564    return;
565
566  // Moving to COMPLETE state.
567  data->set_complete();
568  // Send candidate allocation complete signal if this was the last port.
569  MaybeSignalCandidatesAllocationDone();
570}
571
572void BasicPortAllocatorSession::OnPortError(Port* port) {
573  ASSERT(talk_base::Thread::Current() == network_thread_);
574  PortData* data = FindPort(port);
575  ASSERT(data != NULL);
576  // We might have already given up on this port and stopped it.
577  if (data->complete())
578    return;
579
580  // SignalAddressError is currently sent from StunPort/TurnPort.
581  // But this signal itself is generic.
582  data->set_error();
583  // Send candidate allocation complete signal if this was the last port.
584  MaybeSignalCandidatesAllocationDone();
585}
586
587void BasicPortAllocatorSession::OnProtocolEnabled(AllocationSequence* seq,
588                                                  ProtocolType proto) {
589  std::vector<Candidate> candidates;
590  for (std::vector<PortData>::iterator it = ports_.begin();
591       it != ports_.end(); ++it) {
592    if (it->sequence() != seq)
593      continue;
594
595    const std::vector<Candidate>& potentials = it->port()->Candidates();
596    for (size_t i = 0; i < potentials.size(); ++i) {
597      ProtocolType pvalue;
598      if (!StringToProto(potentials[i].protocol().c_str(), &pvalue))
599        continue;
600      if (pvalue == proto) {
601        candidates.push_back(potentials[i]);
602      }
603    }
604  }
605
606  if (!candidates.empty()) {
607    SignalCandidatesReady(this, candidates);
608  }
609}
610
611void BasicPortAllocatorSession::OnPortAllocationComplete(
612    AllocationSequence* seq) {
613  // Send candidate allocation complete signal if all ports are done.
614  MaybeSignalCandidatesAllocationDone();
615}
616
617void BasicPortAllocatorSession::MaybeSignalCandidatesAllocationDone() {
618  // Send signal only if all required AllocationSequence objects
619  // are created.
620  if (!allocation_sequences_created_)
621    return;
622
623  // Check that all port allocation sequences are complete.
624  for (std::vector<AllocationSequence*>::iterator it = sequences_.begin();
625       it != sequences_.end(); ++it) {
626    if ((*it)->state() == AllocationSequence::kRunning)
627      return;
628  }
629
630  // If all allocated ports are in complete state, session must have got all
631  // expected candidates. Session will trigger candidates allocation complete
632  // signal.
633  for (std::vector<PortData>::iterator it = ports_.begin();
634       it != ports_.end(); ++it) {
635    if (!it->complete())
636      return;
637  }
638  LOG(LS_INFO) << "All candidates gathered for " << content_name_ << ":"
639               << component_ << ":" << generation();
640  SignalCandidatesAllocationDone(this);
641}
642
643void BasicPortAllocatorSession::OnPortDestroyed(
644    PortInterface* port) {
645  ASSERT(talk_base::Thread::Current() == network_thread_);
646  for (std::vector<PortData>::iterator iter = ports_.begin();
647       iter != ports_.end(); ++iter) {
648    if (port == iter->port()) {
649      ports_.erase(iter);
650      LOG_J(LS_INFO, port) << "Removed port from allocator ("
651                           << static_cast<int>(ports_.size()) << " remaining)";
652      return;
653    }
654  }
655  ASSERT(false);
656}
657
658void BasicPortAllocatorSession::OnShake() {
659  LOG(INFO) << ">>>>> SHAKE <<<<< >>>>> SHAKE <<<<< >>>>> SHAKE <<<<<";
660
661  std::vector<Port*> ports;
662  std::vector<Connection*> connections;
663
664  for (size_t i = 0; i < ports_.size(); ++i) {
665    if (ports_[i].ready())
666      ports.push_back(ports_[i].port());
667  }
668
669  for (size_t i = 0; i < ports.size(); ++i) {
670    Port::AddressMap::const_iterator iter;
671    for (iter = ports[i]->connections().begin();
672         iter != ports[i]->connections().end();
673         ++iter) {
674      connections.push_back(iter->second);
675    }
676  }
677
678  LOG(INFO) << ">>>>> Destroying " << ports.size() << " ports and "
679            << connections.size() << " connections";
680
681  for (size_t i = 0; i < connections.size(); ++i)
682    connections[i]->Destroy();
683
684  if (running_ || (ports.size() > 0) || (connections.size() > 0))
685    network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE);
686}
687
688BasicPortAllocatorSession::PortData* BasicPortAllocatorSession::FindPort(
689    Port* port) {
690  for (std::vector<PortData>::iterator it = ports_.begin();
691       it != ports_.end(); ++it) {
692    if (it->port() == port) {
693      return &*it;
694    }
695  }
696  return NULL;
697}
698
699// AllocationSequence
700
701AllocationSequence::AllocationSequence(BasicPortAllocatorSession* session,
702                                       talk_base::Network* network,
703                                       PortConfiguration* config,
704                                       uint32 flags)
705    : session_(session),
706      network_(network),
707      ip_(network->ip()),
708      config_(config),
709      state_(kInit),
710      flags_(flags),
711      udp_socket_(NULL),
712      phase_(0) {
713}
714
715bool AllocationSequence::Init() {
716  if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) &&
717      !IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_UFRAG)) {
718    LOG(LS_ERROR) << "Shared socket option can't be set without "
719                  << "shared ufrag.";
720    ASSERT(false);
721    return false;
722  }
723
724  if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
725    udp_socket_.reset(session_->socket_factory()->CreateUdpSocket(
726        talk_base::SocketAddress(ip_, 0), session_->allocator()->min_port(),
727        session_->allocator()->max_port()));
728    if (udp_socket_) {
729      udp_socket_->SignalReadPacket.connect(
730          this, &AllocationSequence::OnReadPacket);
731    }
732    // Continuing if |udp_socket_| is NULL, as local TCP and RelayPort using TCP
733    // are next available options to setup a communication channel.
734  }
735  return true;
736}
737
738AllocationSequence::~AllocationSequence() {
739  session_->network_thread()->Clear(this);
740}
741
742void AllocationSequence::DisableEquivalentPhases(talk_base::Network* network,
743    PortConfiguration* config, uint32* flags) {
744  if (!((network == network_) && (ip_ == network->ip()))) {
745    // Different network setup; nothing is equivalent.
746    return;
747  }
748
749  // Else turn off the stuff that we've already got covered.
750
751  // Every config implicitly specifies local, so turn that off right away.
752  *flags |= PORTALLOCATOR_DISABLE_UDP;
753  *flags |= PORTALLOCATOR_DISABLE_TCP;
754
755  if (config_ && config) {
756    if (config_->stun_address == config->stun_address) {
757      // Already got this STUN server covered.
758      *flags |= PORTALLOCATOR_DISABLE_STUN;
759    }
760    if (!config_->relays.empty()) {
761      // Already got relays covered.
762      // NOTE: This will even skip a _different_ set of relay servers if we
763      // were to be given one, but that never happens in our codebase. Should
764      // probably get rid of the list in PortConfiguration and just keep a
765      // single relay server in each one.
766      *flags |= PORTALLOCATOR_DISABLE_RELAY;
767    }
768  }
769}
770
771void AllocationSequence::Start() {
772  state_ = kRunning;
773  session_->network_thread()->Post(this, MSG_ALLOCATION_PHASE);
774}
775
776void AllocationSequence::Stop() {
777  // If the port is completed, don't set it to stopped.
778  if (state_ == kRunning) {
779    state_ = kStopped;
780    session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
781  }
782}
783
784void AllocationSequence::OnMessage(talk_base::Message* msg) {
785  ASSERT(talk_base::Thread::Current() == session_->network_thread());
786  ASSERT(msg->message_id == MSG_ALLOCATION_PHASE);
787
788  const char* const PHASE_NAMES[kNumPhases] = {
789    "Udp", "Relay", "Tcp", "SslTcp"
790  };
791
792  // Perform all of the phases in the current step.
793  LOG_J(LS_INFO, network_) << "Allocation Phase="
794                           << PHASE_NAMES[phase_];
795
796  switch (phase_) {
797    case PHASE_UDP:
798      CreateUDPPorts();
799      CreateStunPorts();
800      EnableProtocol(PROTO_UDP);
801      break;
802
803    case PHASE_RELAY:
804      CreateRelayPorts();
805      break;
806
807    case PHASE_TCP:
808      CreateTCPPorts();
809      EnableProtocol(PROTO_TCP);
810      break;
811
812    case PHASE_SSLTCP:
813      state_ = kCompleted;
814      EnableProtocol(PROTO_SSLTCP);
815      break;
816
817    default:
818      ASSERT(false);
819  }
820
821  if (state() == kRunning) {
822    ++phase_;
823    session_->network_thread()->PostDelayed(
824        session_->allocator()->step_delay(),
825        this, MSG_ALLOCATION_PHASE);
826  } else {
827    // If all phases in AllocationSequence are completed, no allocation
828    // steps needed further. Canceling  pending signal.
829    session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
830    SignalPortAllocationComplete(this);
831  }
832}
833
834void AllocationSequence::EnableProtocol(ProtocolType proto) {
835  if (!ProtocolEnabled(proto)) {
836    protocols_.push_back(proto);
837    session_->OnProtocolEnabled(this, proto);
838  }
839}
840
841bool AllocationSequence::ProtocolEnabled(ProtocolType proto) const {
842  for (ProtocolList::const_iterator it = protocols_.begin();
843       it != protocols_.end(); ++it) {
844    if (*it == proto)
845      return true;
846  }
847  return false;
848}
849
850void AllocationSequence::CreateUDPPorts() {
851  if (IsFlagSet(PORTALLOCATOR_DISABLE_UDP)) {
852    LOG(LS_VERBOSE) << "AllocationSequence: UDP ports disabled, skipping.";
853    return;
854  }
855
856  // TODO(mallinath) - Remove UDPPort creating socket after shared socket
857  // is enabled completely.
858  UDPPort* port = NULL;
859  if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && udp_socket_) {
860    port = UDPPort::Create(session_->network_thread(), network_,
861                           udp_socket_.get(),
862                           session_->username(), session_->password());
863  } else {
864    port = UDPPort::Create(session_->network_thread(),
865                           session_->socket_factory(),
866                           network_, ip_,
867                           session_->allocator()->min_port(),
868                           session_->allocator()->max_port(),
869                           session_->username(), session_->password());
870  }
871
872  if (port) {
873    ports.push_back(port);
874    // If shared socket is enabled, STUN candidate will be allocated by the
875    // UDPPort.
876    if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) &&
877        !IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) {
878      ASSERT(config_ && !config_->stun_address.IsNil());
879      if (!(config_ && !config_->stun_address.IsNil())) {
880        LOG(LS_WARNING)
881            << "AllocationSequence: No STUN server configured, skipping.";
882        return;
883      }
884      port->set_server_addr(config_->stun_address);
885    }
886
887    session_->AddAllocatedPort(port, this, true);
888    port->SignalDestroyed.connect(this, &AllocationSequence::OnPortDestroyed);
889  }
890}
891
892void AllocationSequence::CreateTCPPorts() {
893  if (IsFlagSet(PORTALLOCATOR_DISABLE_TCP)) {
894    LOG(LS_VERBOSE) << "AllocationSequence: TCP ports disabled, skipping.";
895    return;
896  }
897
898  Port* port = TCPPort::Create(session_->network_thread(),
899                               session_->socket_factory(),
900                               network_, ip_,
901                               session_->allocator()->min_port(),
902                               session_->allocator()->max_port(),
903                               session_->username(), session_->password(),
904                               session_->allocator()->allow_tcp_listen());
905  if (port) {
906    session_->AddAllocatedPort(port, this, true);
907    // Since TCPPort is not created using shared socket, |port| will not be
908    // added to the dequeue.
909  }
910}
911
912void AllocationSequence::CreateStunPorts() {
913  if (IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) {
914    LOG(LS_VERBOSE) << "AllocationSequence: STUN ports disabled, skipping.";
915    return;
916  }
917
918  if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
919    LOG(LS_INFO) << "AllocationSequence: "
920                 << "UDPPort will be handling the STUN candidate generation.";
921    return;
922  }
923
924  // If BasicPortAllocatorSession::OnAllocate left STUN ports enabled then we
925  // ought to have an address for them here.
926  ASSERT(config_ && !config_->stun_address.IsNil());
927  if (!(config_ && !config_->stun_address.IsNil())) {
928    LOG(LS_WARNING)
929        << "AllocationSequence: No STUN server configured, skipping.";
930    return;
931  }
932
933  StunPort* port = StunPort::Create(session_->network_thread(),
934                                session_->socket_factory(),
935                                network_, ip_,
936                                session_->allocator()->min_port(),
937                                session_->allocator()->max_port(),
938                                session_->username(), session_->password(),
939                                config_->stun_address);
940  if (port) {
941    session_->AddAllocatedPort(port, this, true);
942    // Since StunPort is not created using shared socket, |port| will not be
943    // added to the dequeue.
944  }
945}
946
947void AllocationSequence::CreateRelayPorts() {
948  if (IsFlagSet(PORTALLOCATOR_DISABLE_RELAY)) {
949     LOG(LS_VERBOSE) << "AllocationSequence: Relay ports disabled, skipping.";
950     return;
951  }
952
953  // If BasicPortAllocatorSession::OnAllocate left relay ports enabled then we
954  // ought to have a relay list for them here.
955  ASSERT(config_ && !config_->relays.empty());
956  if (!(config_ && !config_->relays.empty())) {
957    LOG(LS_WARNING)
958        << "AllocationSequence: No relay server configured, skipping.";
959    return;
960  }
961
962  PortConfiguration::RelayList::const_iterator relay;
963  for (relay = config_->relays.begin();
964       relay != config_->relays.end(); ++relay) {
965    if (relay->type == RELAY_GTURN) {
966      CreateGturnPort(*relay);
967    } else if (relay->type == RELAY_TURN) {
968      CreateTurnPort(*relay);
969    } else {
970      ASSERT(false);
971    }
972  }
973}
974
975void AllocationSequence::CreateGturnPort(const RelayServerConfig& config) {
976  // TODO(mallinath) - Rename RelayPort to GTurnPort.
977  RelayPort* port = RelayPort::Create(session_->network_thread(),
978                                      session_->socket_factory(),
979                                      network_, ip_,
980                                      session_->allocator()->min_port(),
981                                      session_->allocator()->max_port(),
982                                      config_->username, config_->password);
983  if (port) {
984    // Since RelayPort is not created using shared socket, |port| will not be
985    // added to the dequeue.
986    // Note: We must add the allocated port before we add addresses because
987    //       the latter will create candidates that need name and preference
988    //       settings.  However, we also can't prepare the address (normally
989    //       done by AddAllocatedPort) until we have these addresses.  So we
990    //       wait to do that until below.
991    session_->AddAllocatedPort(port, this, false);
992
993    // Add the addresses of this protocol.
994    PortList::const_iterator relay_port;
995    for (relay_port = config.ports.begin();
996         relay_port != config.ports.end();
997         ++relay_port) {
998      port->AddServerAddress(*relay_port);
999      port->AddExternalAddress(*relay_port);
1000    }
1001    // Start fetching an address for this port.
1002    port->PrepareAddress();
1003  }
1004}
1005
1006void AllocationSequence::CreateTurnPort(const RelayServerConfig& config) {
1007  PortList::const_iterator relay_port;
1008  for (relay_port = config.ports.begin();
1009       relay_port != config.ports.end(); ++relay_port) {
1010    TurnPort* port = TurnPort::Create(session_->network_thread(),
1011                                      session_->socket_factory(),
1012                                      network_, ip_,
1013                                      session_->allocator()->min_port(),
1014                                      session_->allocator()->max_port(),
1015                                      session_->username(),
1016                                      session_->password(),
1017                                      *relay_port, config.credentials);
1018    if (port) {
1019      session_->AddAllocatedPort(port, this, true);
1020    }
1021  }
1022}
1023
1024void AllocationSequence::OnReadPacket(
1025    talk_base::AsyncPacketSocket* socket, const char* data, size_t size,
1026    const talk_base::SocketAddress& remote_addr) {
1027  ASSERT(socket == udp_socket_.get());
1028  for (std::deque<Port*>::iterator iter = ports.begin();
1029       iter != ports.end(); ++iter) {
1030    // We have only one port in the queue.
1031    // TODO(mallinath) - Add shared socket support to Relay and Turn ports.
1032    if ((*iter)->HandleIncomingPacket(socket, data, size, remote_addr)) {
1033      break;
1034    }
1035  }
1036}
1037
1038void AllocationSequence::OnPortDestroyed(PortInterface* port) {
1039  std::deque<Port*>::iterator iter =
1040      std::find(ports.begin(), ports.end(), port);
1041  ASSERT(iter != ports.end());
1042  ports.erase(iter);
1043}
1044
1045// PortConfiguration
1046PortConfiguration::PortConfiguration(
1047    const talk_base::SocketAddress& stun_address,
1048    const std::string& username,
1049    const std::string& password)
1050    : stun_address(stun_address),
1051      username(username),
1052      password(password) {
1053}
1054
1055void PortConfiguration::AddRelay(const RelayServerConfig& config) {
1056  relays.push_back(config);
1057}
1058
1059bool PortConfiguration::SupportsProtocol(
1060    const RelayServerConfig& relay, ProtocolType type) {
1061  PortList::const_iterator relay_port;
1062  for (relay_port = relay.ports.begin();
1063        relay_port != relay.ports.end();
1064        ++relay_port) {
1065    if (relay_port->proto == type)
1066      return true;
1067  }
1068  return false;
1069}
1070
1071}  // namespace cricket
1072