1/* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28#include "talk/p2p/base/transport.h" 29 30#include "talk/base/common.h" 31#include "talk/base/logging.h" 32#include "talk/p2p/base/candidate.h" 33#include "talk/p2p/base/constants.h" 34#include "talk/p2p/base/sessionmanager.h" 35#include "talk/p2p/base/parsing.h" 36#include "talk/p2p/base/transportchannelimpl.h" 37#include "talk/xmllite/xmlelement.h" 38#include "talk/xmpp/constants.h" 39 40namespace cricket { 41 42struct ChannelParams { 43 ChannelParams() : channel(NULL), candidate(NULL) {} 44 explicit ChannelParams(const std::string& name) 45 : name(name), channel(NULL), candidate(NULL) {} 46 ChannelParams(const std::string& name, 47 const std::string& content_type) 48 : name(name), content_type(content_type), 49 channel(NULL), candidate(NULL) {} 50 explicit ChannelParams(cricket::Candidate* candidate) : 51 channel(NULL), candidate(candidate) { 52 name = candidate->name(); 53 } 54 55 ~ChannelParams() { 56 delete candidate; 57 } 58 59 std::string name; 60 std::string content_type; 61 cricket::TransportChannelImpl* channel; 62 cricket::Candidate* candidate; 63}; 64typedef talk_base::TypedMessageData<ChannelParams*> ChannelMessage; 65 66enum { 67 MSG_CREATECHANNEL = 1, 68 MSG_DESTROYCHANNEL = 2, 69 MSG_DESTROYALLCHANNELS = 3, 70 MSG_CONNECTCHANNELS = 4, 71 MSG_RESETCHANNELS = 5, 72 MSG_ONSIGNALINGREADY = 6, 73 MSG_ONREMOTECANDIDATE = 7, 74 MSG_READSTATE = 8, 75 MSG_WRITESTATE = 9, 76 MSG_REQUESTSIGNALING = 10, 77 MSG_ONCHANNELCANDIDATEREADY = 11, 78 MSG_CONNECTING = 12, 79}; 80 81Transport::Transport(talk_base::Thread* signaling_thread, 82 talk_base::Thread* worker_thread, 83 const std::string& type, 84 PortAllocator* allocator) 85 : signaling_thread_(signaling_thread), 86 worker_thread_(worker_thread), type_(type), allocator_(allocator), 87 destroyed_(false), readable_(false), writable_(false), 88 connect_requested_(false), allow_local_ips_(false) { 89} 90 91Transport::~Transport() { 92 ASSERT(signaling_thread_->IsCurrent()); 93 ASSERT(destroyed_); 94} 95 96TransportChannelImpl* Transport::CreateChannel( 97 const std::string& name, const std::string& content_type) { 98 ChannelParams params(name, content_type); 99 ChannelMessage msg(¶ms); 100 worker_thread()->Send(this, MSG_CREATECHANNEL, &msg); 101 return msg.data()->channel; 102} 103 104TransportChannelImpl* Transport::CreateChannel_w( 105 const std::string& name, const std::string& content_type) { 106 ASSERT(worker_thread()->IsCurrent()); 107 108 TransportChannelImpl* impl = CreateTransportChannel(name, content_type); 109 impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState); 110 impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState); 111 impl->SignalRequestSignaling.connect( 112 this, &Transport::OnChannelRequestSignaling); 113 impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady); 114 115 talk_base::CritScope cs(&crit_); 116 ASSERT(channels_.find(name) == channels_.end()); 117 channels_[name] = impl; 118 destroyed_ = false; 119 if (connect_requested_) { 120 impl->Connect(); 121 if (channels_.size() == 1) { 122 // If this is the first channel, then indicate that we have started 123 // connecting. 124 signaling_thread()->Post(this, MSG_CONNECTING, NULL); 125 } 126 } 127 return impl; 128} 129 130TransportChannelImpl* Transport::GetChannel(const std::string& name) { 131 talk_base::CritScope cs(&crit_); 132 ChannelMap::iterator iter = channels_.find(name); 133 return (iter != channels_.end()) ? iter->second : NULL; 134} 135 136bool Transport::HasChannels() { 137 talk_base::CritScope cs(&crit_); 138 return !channels_.empty(); 139} 140 141void Transport::DestroyChannel(const std::string& name) { 142 ChannelParams params(name); 143 ChannelMessage msg(¶ms); 144 worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg); 145} 146 147void Transport::DestroyChannel_w(const std::string& name) { 148 ASSERT(worker_thread()->IsCurrent()); 149 150 TransportChannelImpl* impl = NULL; 151 { 152 talk_base::CritScope cs(&crit_); 153 ChannelMap::iterator iter = channels_.find(name); 154 if (iter == channels_.end()) 155 return; 156 impl = iter->second; 157 channels_.erase(iter); 158 } 159 160 if (connect_requested_ && channels_.empty()) { 161 // We're no longer attempting to connect. 162 signaling_thread()->Post(this, MSG_CONNECTING, NULL); 163 } 164 165 if (impl) { 166 // Check in case the deleted channel was the only non-writable channel. 167 OnChannelWritableState(impl); 168 DestroyTransportChannel(impl); 169 } 170} 171 172void Transport::ConnectChannels() { 173 ASSERT(signaling_thread()->IsCurrent()); 174 worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL); 175} 176 177void Transport::ConnectChannels_w() { 178 ASSERT(worker_thread()->IsCurrent()); 179 if (connect_requested_ || channels_.empty()) 180 return; 181 connect_requested_ = true; 182 signaling_thread()->Post( 183 this, MSG_ONCHANNELCANDIDATEREADY, NULL); 184 CallChannels_w(&TransportChannelImpl::Connect); 185 if (!channels_.empty()) { 186 signaling_thread()->Post(this, MSG_CONNECTING, NULL); 187 } 188} 189 190void Transport::OnConnecting_s() { 191 ASSERT(signaling_thread()->IsCurrent()); 192 SignalConnecting(this); 193} 194 195void Transport::DestroyAllChannels() { 196 ASSERT(signaling_thread()->IsCurrent()); 197 worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL); 198 worker_thread()->Clear(this); 199 signaling_thread()->Clear(this); 200 destroyed_ = true; 201} 202 203void Transport::DestroyAllChannels_w() { 204 ASSERT(worker_thread()->IsCurrent()); 205 std::vector<TransportChannelImpl*> impls; 206 { 207 talk_base::CritScope cs(&crit_); 208 for (ChannelMap::iterator iter = channels_.begin(); 209 iter != channels_.end(); 210 ++iter) { 211 impls.push_back(iter->second); 212 } 213 channels_.clear(); 214 } 215 216 for (size_t i = 0; i < impls.size(); ++i) 217 DestroyTransportChannel(impls[i]); 218} 219 220void Transport::ResetChannels() { 221 ASSERT(signaling_thread()->IsCurrent()); 222 worker_thread()->Send(this, MSG_RESETCHANNELS, NULL); 223} 224 225void Transport::ResetChannels_w() { 226 ASSERT(worker_thread()->IsCurrent()); 227 228 // We are no longer attempting to connect 229 connect_requested_ = false; 230 231 // Clear out the old messages, they aren't relevant 232 talk_base::CritScope cs(&crit_); 233 ready_candidates_.clear(); 234 235 // Reset all of the channels 236 CallChannels_w(&TransportChannelImpl::Reset); 237} 238 239void Transport::OnSignalingReady() { 240 ASSERT(signaling_thread()->IsCurrent()); 241 if (destroyed_) return; 242 243 worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL); 244 245 // Notify the subclass. 246 OnTransportSignalingReady(); 247} 248 249void Transport::CallChannels_w(TransportChannelFunc func) { 250 ASSERT(worker_thread()->IsCurrent()); 251 talk_base::CritScope cs(&crit_); 252 for (ChannelMap::iterator iter = channels_.begin(); 253 iter != channels_.end(); 254 ++iter) { 255 ((iter->second)->*func)(); 256 } 257} 258 259bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) { 260 if (cand.address().IsLocalIP() && !allow_local_ips_) 261 return BadParse("candidate has local IP address", error); 262 263 // No address zero. 264 if (cand.address().IsAny()) { 265 return BadParse("candidate has address of zero", error); 266 } 267 268 // Disallow all ports below 1024, except for 80 and 443 on public addresses. 269 int port = cand.address().port(); 270 if (port < 1024) { 271 if ((port != 80) && (port != 443)) 272 return BadParse( 273 "candidate has port below 1024, but not 80 or 443", error); 274 if (cand.address().IsPrivateIP()) { 275 return BadParse( 276 "candidate has port of 80 or 443 with private IP address", error); 277 } 278 } 279 280 return true; 281} 282 283void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) { 284 for (std::vector<Candidate>::const_iterator iter = candidates.begin(); 285 iter != candidates.end(); 286 ++iter) { 287 OnRemoteCandidate(*iter); 288 } 289} 290 291void Transport::OnRemoteCandidate(const Candidate& candidate) { 292 ASSERT(signaling_thread()->IsCurrent()); 293 if (destroyed_) return; 294 if (!HasChannel(candidate.name())) { 295 LOG(LS_WARNING) << "Ignoring candidate for unknown channel " 296 << candidate.name(); 297 return; 298 } 299 300 // new candidate deleted when params is deleted 301 ChannelParams* params = new ChannelParams(new Candidate(candidate)); 302 ChannelMessage* msg = new ChannelMessage(params); 303 worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg); 304} 305 306void Transport::OnRemoteCandidate_w(const Candidate& candidate) { 307 ASSERT(worker_thread()->IsCurrent()); 308 ChannelMap::iterator iter = channels_.find(candidate.name()); 309 // It's ok for a channel to go away while this message is in transit. 310 if (iter != channels_.end()) { 311 iter->second->OnCandidate(candidate); 312 } 313} 314 315void Transport::OnChannelReadableState(TransportChannel* channel) { 316 ASSERT(worker_thread()->IsCurrent()); 317 signaling_thread()->Post(this, MSG_READSTATE, NULL); 318} 319 320void Transport::OnChannelReadableState_s() { 321 ASSERT(signaling_thread()->IsCurrent()); 322 bool readable = GetTransportState_s(true); 323 if (readable_ != readable) { 324 readable_ = readable; 325 SignalReadableState(this); 326 } 327} 328 329void Transport::OnChannelWritableState(TransportChannel* channel) { 330 ASSERT(worker_thread()->IsCurrent()); 331 signaling_thread()->Post(this, MSG_WRITESTATE, NULL); 332} 333 334void Transport::OnChannelWritableState_s() { 335 ASSERT(signaling_thread()->IsCurrent()); 336 bool writable = GetTransportState_s(false); 337 if (writable_ != writable) { 338 writable_ = writable; 339 SignalWritableState(this); 340 } 341} 342 343bool Transport::GetTransportState_s(bool read) { 344 ASSERT(signaling_thread()->IsCurrent()); 345 bool result = false; 346 talk_base::CritScope cs(&crit_); 347 for (ChannelMap::iterator iter = channels_.begin(); 348 iter != channels_.end(); 349 ++iter) { 350 bool b = (read ? iter->second->readable() : iter->second->writable()); 351 result = result || b; 352 } 353 return result; 354} 355 356void Transport::OnChannelRequestSignaling() { 357 ASSERT(worker_thread()->IsCurrent()); 358 signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL); 359} 360 361void Transport::OnChannelRequestSignaling_s() { 362 ASSERT(signaling_thread()->IsCurrent()); 363 SignalRequestSignaling(this); 364} 365 366void Transport::OnChannelCandidateReady(TransportChannelImpl* channel, 367 const Candidate& candidate) { 368 ASSERT(worker_thread()->IsCurrent()); 369 talk_base::CritScope cs(&crit_); 370 ready_candidates_.push_back(candidate); 371 372 // We hold any messages until the client lets us connect. 373 if (connect_requested_) { 374 signaling_thread()->Post( 375 this, MSG_ONCHANNELCANDIDATEREADY, NULL); 376 } 377} 378 379void Transport::OnChannelCandidateReady_s() { 380 ASSERT(signaling_thread()->IsCurrent()); 381 ASSERT(connect_requested_); 382 383 std::vector<Candidate> candidates; 384 { 385 talk_base::CritScope cs(&crit_); 386 candidates.swap(ready_candidates_); 387 } 388 389 // we do the deleting of Candidate* here to keep the new above and 390 // delete below close to each other 391 if (!candidates.empty()) { 392 SignalCandidatesReady(this, candidates); 393 } 394} 395 396void Transport::OnMessage(talk_base::Message* msg) { 397 switch (msg->message_id) { 398 case MSG_CREATECHANNEL: 399 { 400 ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data(); 401 params->channel = CreateChannel_w(params->name, params->content_type); 402 } 403 break; 404 case MSG_DESTROYCHANNEL: 405 { 406 ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data(); 407 DestroyChannel_w(params->name); 408 } 409 break; 410 case MSG_CONNECTCHANNELS: 411 ConnectChannels_w(); 412 break; 413 case MSG_RESETCHANNELS: 414 ResetChannels_w(); 415 break; 416 case MSG_DESTROYALLCHANNELS: 417 DestroyAllChannels_w(); 418 break; 419 case MSG_ONSIGNALINGREADY: 420 CallChannels_w(&TransportChannelImpl::OnSignalingReady); 421 break; 422 case MSG_ONREMOTECANDIDATE: 423 { 424 ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata); 425 ChannelParams* params = channel_msg->data(); 426 OnRemoteCandidate_w(*(params->candidate)); 427 delete params; 428 delete channel_msg; 429 } 430 break; 431 case MSG_CONNECTING: 432 OnConnecting_s(); 433 break; 434 case MSG_READSTATE: 435 OnChannelReadableState_s(); 436 break; 437 case MSG_WRITESTATE: 438 OnChannelWritableState_s(); 439 break; 440 case MSG_REQUESTSIGNALING: 441 OnChannelRequestSignaling_s(); 442 break; 443 case MSG_ONCHANNELCANDIDATEREADY: 444 OnChannelCandidateReady_s(); 445 break; 446 } 447} 448 449bool TransportParser::ParseAddress(const buzz::XmlElement* elem, 450 const buzz::QName& address_name, 451 const buzz::QName& port_name, 452 talk_base::SocketAddress* address, 453 ParseError* error) { 454 if (!elem->HasAttr(address_name)) 455 return BadParse("address does not have " + address_name.LocalPart(), error); 456 if (!elem->HasAttr(port_name)) 457 return BadParse("address does not have " + port_name.LocalPart(), error); 458 459 address->SetIP(elem->Attr(address_name)); 460 std::istringstream ist(elem->Attr(port_name)); 461 int port = 0; 462 ist >> port; 463 address->SetPort(port); 464 465 return true; 466} 467 468} // namespace cricket 469