1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/p2p/base/transport.h"
29
30#include "talk/base/common.h"
31#include "talk/base/logging.h"
32#include "talk/p2p/base/candidate.h"
33#include "talk/p2p/base/constants.h"
34#include "talk/p2p/base/sessionmanager.h"
35#include "talk/p2p/base/parsing.h"
36#include "talk/p2p/base/transportchannelimpl.h"
37#include "talk/xmllite/xmlelement.h"
38#include "talk/xmpp/constants.h"
39
40namespace cricket {
41
42struct ChannelParams {
43  ChannelParams() : channel(NULL), candidate(NULL) {}
44  explicit ChannelParams(const std::string& name)
45      : name(name), channel(NULL), candidate(NULL) {}
46  ChannelParams(const std::string& name,
47                const std::string& content_type)
48      : name(name), content_type(content_type),
49        channel(NULL), candidate(NULL) {}
50  explicit ChannelParams(cricket::Candidate* candidate) :
51      channel(NULL), candidate(candidate) {
52    name = candidate->name();
53  }
54
55  ~ChannelParams() {
56    delete candidate;
57  }
58
59  std::string name;
60  std::string content_type;
61  cricket::TransportChannelImpl* channel;
62  cricket::Candidate* candidate;
63};
64typedef talk_base::TypedMessageData<ChannelParams*> ChannelMessage;
65
66enum {
67  MSG_CREATECHANNEL = 1,
68  MSG_DESTROYCHANNEL = 2,
69  MSG_DESTROYALLCHANNELS = 3,
70  MSG_CONNECTCHANNELS = 4,
71  MSG_RESETCHANNELS = 5,
72  MSG_ONSIGNALINGREADY = 6,
73  MSG_ONREMOTECANDIDATE = 7,
74  MSG_READSTATE = 8,
75  MSG_WRITESTATE = 9,
76  MSG_REQUESTSIGNALING = 10,
77  MSG_ONCHANNELCANDIDATEREADY = 11,
78  MSG_CONNECTING = 12,
79};
80
81Transport::Transport(talk_base::Thread* signaling_thread,
82                     talk_base::Thread* worker_thread,
83                     const std::string& type,
84                     PortAllocator* allocator)
85  : signaling_thread_(signaling_thread),
86    worker_thread_(worker_thread), type_(type), allocator_(allocator),
87    destroyed_(false), readable_(false), writable_(false),
88    connect_requested_(false), allow_local_ips_(false) {
89}
90
91Transport::~Transport() {
92  ASSERT(signaling_thread_->IsCurrent());
93  ASSERT(destroyed_);
94}
95
96TransportChannelImpl* Transport::CreateChannel(
97    const std::string& name, const std::string& content_type) {
98  ChannelParams params(name, content_type);
99  ChannelMessage msg(&params);
100  worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
101  return msg.data()->channel;
102}
103
104TransportChannelImpl* Transport::CreateChannel_w(
105    const std::string& name, const std::string& content_type) {
106  ASSERT(worker_thread()->IsCurrent());
107
108  TransportChannelImpl* impl = CreateTransportChannel(name, content_type);
109  impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
110  impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
111  impl->SignalRequestSignaling.connect(
112      this, &Transport::OnChannelRequestSignaling);
113  impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
114
115  talk_base::CritScope cs(&crit_);
116  ASSERT(channels_.find(name) == channels_.end());
117  channels_[name] = impl;
118  destroyed_ = false;
119  if (connect_requested_) {
120    impl->Connect();
121    if (channels_.size() == 1) {
122      // If this is the first channel, then indicate that we have started
123      // connecting.
124      signaling_thread()->Post(this, MSG_CONNECTING, NULL);
125    }
126  }
127  return impl;
128}
129
130TransportChannelImpl* Transport::GetChannel(const std::string& name) {
131  talk_base::CritScope cs(&crit_);
132  ChannelMap::iterator iter = channels_.find(name);
133  return (iter != channels_.end()) ? iter->second : NULL;
134}
135
136bool Transport::HasChannels() {
137  talk_base::CritScope cs(&crit_);
138  return !channels_.empty();
139}
140
141void Transport::DestroyChannel(const std::string& name) {
142  ChannelParams params(name);
143  ChannelMessage msg(&params);
144  worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
145}
146
147void Transport::DestroyChannel_w(const std::string& name) {
148  ASSERT(worker_thread()->IsCurrent());
149
150  TransportChannelImpl* impl = NULL;
151  {
152    talk_base::CritScope cs(&crit_);
153    ChannelMap::iterator iter = channels_.find(name);
154    if (iter == channels_.end())
155      return;
156    impl = iter->second;
157    channels_.erase(iter);
158  }
159
160  if (connect_requested_ && channels_.empty()) {
161    // We're no longer attempting to connect.
162    signaling_thread()->Post(this, MSG_CONNECTING, NULL);
163  }
164
165  if (impl) {
166    // Check in case the deleted channel was the only non-writable channel.
167    OnChannelWritableState(impl);
168    DestroyTransportChannel(impl);
169  }
170}
171
172void Transport::ConnectChannels() {
173  ASSERT(signaling_thread()->IsCurrent());
174  worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL);
175}
176
177void Transport::ConnectChannels_w() {
178  ASSERT(worker_thread()->IsCurrent());
179  if (connect_requested_ || channels_.empty())
180    return;
181  connect_requested_ = true;
182  signaling_thread()->Post(
183      this, MSG_ONCHANNELCANDIDATEREADY, NULL);
184  CallChannels_w(&TransportChannelImpl::Connect);
185  if (!channels_.empty()) {
186    signaling_thread()->Post(this, MSG_CONNECTING, NULL);
187  }
188}
189
190void Transport::OnConnecting_s() {
191  ASSERT(signaling_thread()->IsCurrent());
192  SignalConnecting(this);
193}
194
195void Transport::DestroyAllChannels() {
196  ASSERT(signaling_thread()->IsCurrent());
197  worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
198  worker_thread()->Clear(this);
199  signaling_thread()->Clear(this);
200  destroyed_ = true;
201}
202
203void Transport::DestroyAllChannels_w() {
204  ASSERT(worker_thread()->IsCurrent());
205  std::vector<TransportChannelImpl*> impls;
206  {
207    talk_base::CritScope cs(&crit_);
208    for (ChannelMap::iterator iter = channels_.begin();
209         iter != channels_.end();
210         ++iter) {
211      impls.push_back(iter->second);
212    }
213    channels_.clear();
214  }
215
216  for (size_t i = 0; i < impls.size(); ++i)
217    DestroyTransportChannel(impls[i]);
218}
219
220void Transport::ResetChannels() {
221  ASSERT(signaling_thread()->IsCurrent());
222  worker_thread()->Send(this, MSG_RESETCHANNELS, NULL);
223}
224
225void Transport::ResetChannels_w() {
226  ASSERT(worker_thread()->IsCurrent());
227
228  // We are no longer attempting to connect
229  connect_requested_ = false;
230
231  // Clear out the old messages, they aren't relevant
232  talk_base::CritScope cs(&crit_);
233  ready_candidates_.clear();
234
235  // Reset all of the channels
236  CallChannels_w(&TransportChannelImpl::Reset);
237}
238
239void Transport::OnSignalingReady() {
240  ASSERT(signaling_thread()->IsCurrent());
241  if (destroyed_) return;
242
243  worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
244
245  // Notify the subclass.
246  OnTransportSignalingReady();
247}
248
249void Transport::CallChannels_w(TransportChannelFunc func) {
250  ASSERT(worker_thread()->IsCurrent());
251  talk_base::CritScope cs(&crit_);
252  for (ChannelMap::iterator iter = channels_.begin();
253       iter != channels_.end();
254       ++iter) {
255    ((iter->second)->*func)();
256  }
257}
258
259bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) {
260  if (cand.address().IsLocalIP() && !allow_local_ips_)
261    return BadParse("candidate has local IP address", error);
262
263  // No address zero.
264  if (cand.address().IsAny()) {
265    return BadParse("candidate has address of zero", error);
266  }
267
268  // Disallow all ports below 1024, except for 80 and 443 on public addresses.
269  int port = cand.address().port();
270  if (port < 1024) {
271    if ((port != 80) && (port != 443))
272      return BadParse(
273          "candidate has port below 1024, but not 80 or 443", error);
274    if (cand.address().IsPrivateIP()) {
275      return BadParse(
276          "candidate has port of 80 or 443 with private IP address", error);
277    }
278  }
279
280  return true;
281}
282
283void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
284  for (std::vector<Candidate>::const_iterator iter = candidates.begin();
285       iter != candidates.end();
286       ++iter) {
287    OnRemoteCandidate(*iter);
288  }
289}
290
291void Transport::OnRemoteCandidate(const Candidate& candidate) {
292  ASSERT(signaling_thread()->IsCurrent());
293  if (destroyed_) return;
294  if (!HasChannel(candidate.name())) {
295    LOG(LS_WARNING) << "Ignoring candidate for unknown channel "
296                    << candidate.name();
297    return;
298  }
299
300  // new candidate deleted when params is deleted
301  ChannelParams* params = new ChannelParams(new Candidate(candidate));
302  ChannelMessage* msg = new ChannelMessage(params);
303  worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg);
304}
305
306void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
307  ASSERT(worker_thread()->IsCurrent());
308  ChannelMap::iterator iter = channels_.find(candidate.name());
309  // It's ok for a channel to go away while this message is in transit.
310  if (iter != channels_.end()) {
311    iter->second->OnCandidate(candidate);
312  }
313}
314
315void Transport::OnChannelReadableState(TransportChannel* channel) {
316  ASSERT(worker_thread()->IsCurrent());
317  signaling_thread()->Post(this, MSG_READSTATE, NULL);
318}
319
320void Transport::OnChannelReadableState_s() {
321  ASSERT(signaling_thread()->IsCurrent());
322  bool readable = GetTransportState_s(true);
323  if (readable_ != readable) {
324    readable_ = readable;
325    SignalReadableState(this);
326  }
327}
328
329void Transport::OnChannelWritableState(TransportChannel* channel) {
330  ASSERT(worker_thread()->IsCurrent());
331  signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
332}
333
334void Transport::OnChannelWritableState_s() {
335  ASSERT(signaling_thread()->IsCurrent());
336  bool writable = GetTransportState_s(false);
337  if (writable_ != writable) {
338    writable_ = writable;
339    SignalWritableState(this);
340  }
341}
342
343bool Transport::GetTransportState_s(bool read) {
344  ASSERT(signaling_thread()->IsCurrent());
345  bool result = false;
346  talk_base::CritScope cs(&crit_);
347  for (ChannelMap::iterator iter = channels_.begin();
348       iter != channels_.end();
349       ++iter) {
350    bool b = (read ? iter->second->readable() : iter->second->writable());
351    result = result || b;
352  }
353  return result;
354}
355
356void Transport::OnChannelRequestSignaling() {
357  ASSERT(worker_thread()->IsCurrent());
358  signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL);
359}
360
361void Transport::OnChannelRequestSignaling_s() {
362  ASSERT(signaling_thread()->IsCurrent());
363  SignalRequestSignaling(this);
364}
365
366void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
367                                        const Candidate& candidate) {
368  ASSERT(worker_thread()->IsCurrent());
369  talk_base::CritScope cs(&crit_);
370  ready_candidates_.push_back(candidate);
371
372  // We hold any messages until the client lets us connect.
373  if (connect_requested_) {
374    signaling_thread()->Post(
375        this, MSG_ONCHANNELCANDIDATEREADY, NULL);
376  }
377}
378
379void Transport::OnChannelCandidateReady_s() {
380  ASSERT(signaling_thread()->IsCurrent());
381  ASSERT(connect_requested_);
382
383  std::vector<Candidate> candidates;
384  {
385    talk_base::CritScope cs(&crit_);
386    candidates.swap(ready_candidates_);
387  }
388
389  // we do the deleting of Candidate* here to keep the new above and
390  // delete below close to each other
391  if (!candidates.empty()) {
392    SignalCandidatesReady(this, candidates);
393  }
394}
395
396void Transport::OnMessage(talk_base::Message* msg) {
397  switch (msg->message_id) {
398  case MSG_CREATECHANNEL:
399    {
400      ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
401      params->channel = CreateChannel_w(params->name, params->content_type);
402    }
403    break;
404  case MSG_DESTROYCHANNEL:
405    {
406      ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
407      DestroyChannel_w(params->name);
408    }
409    break;
410  case MSG_CONNECTCHANNELS:
411    ConnectChannels_w();
412    break;
413  case MSG_RESETCHANNELS:
414    ResetChannels_w();
415    break;
416  case MSG_DESTROYALLCHANNELS:
417    DestroyAllChannels_w();
418    break;
419  case MSG_ONSIGNALINGREADY:
420    CallChannels_w(&TransportChannelImpl::OnSignalingReady);
421    break;
422  case MSG_ONREMOTECANDIDATE:
423    {
424      ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
425      ChannelParams* params = channel_msg->data();
426      OnRemoteCandidate_w(*(params->candidate));
427      delete params;
428      delete channel_msg;
429    }
430    break;
431  case MSG_CONNECTING:
432    OnConnecting_s();
433    break;
434  case MSG_READSTATE:
435    OnChannelReadableState_s();
436    break;
437  case MSG_WRITESTATE:
438    OnChannelWritableState_s();
439    break;
440  case MSG_REQUESTSIGNALING:
441    OnChannelRequestSignaling_s();
442    break;
443  case MSG_ONCHANNELCANDIDATEREADY:
444    OnChannelCandidateReady_s();
445    break;
446  }
447}
448
449bool TransportParser::ParseAddress(const buzz::XmlElement* elem,
450                                   const buzz::QName& address_name,
451                                   const buzz::QName& port_name,
452                                   talk_base::SocketAddress* address,
453                                   ParseError* error) {
454  if (!elem->HasAttr(address_name))
455    return BadParse("address does not have " + address_name.LocalPart(), error);
456  if (!elem->HasAttr(port_name))
457    return BadParse("address does not have " + port_name.LocalPart(), error);
458
459  address->SetIP(elem->Attr(address_name));
460  std::istringstream ist(elem->Attr(port_name));
461  int port = 0;
462  ist >> port;
463  address->SetPort(port);
464
465  return true;
466}
467
468}  // namespace cricket
469