1/* 2 * libjingle 3 * Copyright 2004--2006, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28#include <string> 29#include "pseudotcpchannel.h" 30#include "talk/p2p/base/candidate.h" 31#include "talk/p2p/base/transportchannel.h" 32#include "webrtc/base/basictypes.h" 33#include "webrtc/base/common.h" 34#include "webrtc/base/logging.h" 35#include "webrtc/base/scoped_ptr.h" 36#include "webrtc/base/stringutils.h" 37 38using namespace rtc; 39 40namespace cricket { 41 42extern const rtc::ConstantLabel SESSION_STATES[]; 43 44// MSG_WK_* - worker thread messages 45// MSG_ST_* - stream thread messages 46// MSG_SI_* - signal thread messages 47 48enum { 49 MSG_WK_CLOCK = 1, 50 MSG_WK_PURGE, 51 MSG_ST_EVENT, 52 MSG_SI_DESTROYCHANNEL, 53 MSG_SI_DESTROY, 54}; 55 56struct EventData : public MessageData { 57 int event, error; 58 EventData(int ev, int err = 0) : event(ev), error(err) { } 59}; 60 61/////////////////////////////////////////////////////////////////////////////// 62// PseudoTcpChannel::InternalStream 63/////////////////////////////////////////////////////////////////////////////// 64 65class PseudoTcpChannel::InternalStream : public StreamInterface { 66public: 67 InternalStream(PseudoTcpChannel* parent); 68 virtual ~InternalStream(); 69 70 virtual StreamState GetState() const; 71 virtual StreamResult Read(void* buffer, size_t buffer_len, 72 size_t* read, int* error); 73 virtual StreamResult Write(const void* data, size_t data_len, 74 size_t* written, int* error); 75 virtual void Close(); 76 77private: 78 // parent_ is accessed and modified exclusively on the event thread, to 79 // avoid thread contention. This means that the PseudoTcpChannel cannot go 80 // away until after it receives a Close() from TunnelStream. 81 PseudoTcpChannel* parent_; 82}; 83 84/////////////////////////////////////////////////////////////////////////////// 85// PseudoTcpChannel 86// Member object lifetime summaries: 87// session_ - passed in constructor, cleared when channel_ goes away. 88// channel_ - created in Connect, destroyed when session_ or tcp_ goes away. 89// tcp_ - created in Connect, destroyed when channel_ goes away, or connection 90// closes. 91// worker_thread_ - created when channel_ is created, purged when channel_ is 92// destroyed. 93// stream_ - created in GetStream, destroyed by owner at arbitrary time. 94// this - created in constructor, destroyed when worker_thread_ and stream_ 95// are both gone. 96/////////////////////////////////////////////////////////////////////////////// 97 98// 99// Signal thread methods 100// 101 102PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session) 103 : signal_thread_(session->session_manager()->signaling_thread()), 104 worker_thread_(NULL), 105 stream_thread_(stream_thread), 106 session_(session), channel_(NULL), tcp_(NULL), stream_(NULL), 107 stream_readable_(false), pending_read_event_(false), 108 ready_to_connect_(false) { 109 ASSERT(signal_thread_->IsCurrent()); 110 ASSERT(NULL != session_); 111} 112 113PseudoTcpChannel::~PseudoTcpChannel() { 114 ASSERT(signal_thread_->IsCurrent()); 115 ASSERT(worker_thread_ == NULL); 116 ASSERT(session_ == NULL); 117 ASSERT(channel_ == NULL); 118 ASSERT(stream_ == NULL); 119 ASSERT(tcp_ == NULL); 120} 121 122bool PseudoTcpChannel::Connect(const std::string& content_name, 123 const std::string& channel_name, 124 int component) { 125 ASSERT(signal_thread_->IsCurrent()); 126 CritScope lock(&cs_); 127 128 if (channel_) 129 return false; 130 131 ASSERT(session_ != NULL); 132 worker_thread_ = session_->session_manager()->worker_thread(); 133 content_name_ = content_name; 134 channel_ = session_->CreateChannel( 135 content_name, channel_name, component); 136 channel_name_ = channel_name; 137 channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1); 138 139 channel_->SignalDestroyed.connect(this, 140 &PseudoTcpChannel::OnChannelDestroyed); 141 channel_->SignalWritableState.connect(this, 142 &PseudoTcpChannel::OnChannelWritableState); 143 channel_->SignalReadPacket.connect(this, 144 &PseudoTcpChannel::OnChannelRead); 145 channel_->SignalRouteChange.connect(this, 146 &PseudoTcpChannel::OnChannelConnectionChanged); 147 148 ASSERT(tcp_ == NULL); 149 tcp_ = new PseudoTcp(this, 0); 150 if (session_->initiator()) { 151 // Since we may try several protocols and network adapters that won't work, 152 // waiting until we get our first writable notification before initiating 153 // TCP negotiation. 154 ready_to_connect_ = true; 155 } 156 157 return true; 158} 159 160StreamInterface* PseudoTcpChannel::GetStream() { 161 ASSERT(signal_thread_->IsCurrent()); 162 CritScope lock(&cs_); 163 ASSERT(NULL != session_); 164 if (!stream_) 165 stream_ = new PseudoTcpChannel::InternalStream(this); 166 //TODO("should we disallow creation of new stream at some point?"); 167 return stream_; 168} 169 170void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) { 171 LOG_F(LS_INFO) << "(" << channel->component() << ")"; 172 ASSERT(signal_thread_->IsCurrent()); 173 CritScope lock(&cs_); 174 ASSERT(channel == channel_); 175 signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL); 176 // When MSG_WK_PURGE is received, we know there will be no more messages from 177 // the worker thread. 178 worker_thread_->Clear(this, MSG_WK_CLOCK); 179 worker_thread_->Post(this, MSG_WK_PURGE); 180 session_ = NULL; 181 channel_ = NULL; 182 if ((stream_ != NULL) 183 && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED))) 184 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0)); 185 if (tcp_) { 186 tcp_->Close(true); 187 AdjustClock(); 188 } 189 SignalChannelClosed(this); 190} 191 192void PseudoTcpChannel::OnSessionTerminate(Session* session) { 193 // When the session terminates before we even connected 194 CritScope lock(&cs_); 195 if (session_ != NULL && channel_ == NULL) { 196 ASSERT(session == session_); 197 ASSERT(worker_thread_ == NULL); 198 ASSERT(tcp_ == NULL); 199 LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel"; 200 session_ = NULL; 201 if (stream_ != NULL) 202 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1)); 203 } 204 205 // Even though session_ is being destroyed, we mustn't clear the pointer, 206 // since we'll need it to tear down channel_. 207 // 208 // TODO: Is it always the case that if channel_ != NULL then we'll get 209 // a channel-destroyed notification? 210} 211 212void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) { 213 ASSERT(signal_thread_->IsCurrent()); 214 CritScope lock(&cs_); 215 ASSERT(tcp_ != NULL); 216 tcp_->GetOption(opt, value); 217} 218 219void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) { 220 ASSERT(signal_thread_->IsCurrent()); 221 CritScope lock(&cs_); 222 ASSERT(tcp_ != NULL); 223 tcp_->SetOption(opt, value); 224} 225 226// 227// Stream thread methods 228// 229 230StreamState PseudoTcpChannel::GetState() const { 231 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 232 CritScope lock(&cs_); 233 if (!session_) 234 return SS_CLOSED; 235 if (!tcp_) 236 return SS_OPENING; 237 switch (tcp_->State()) { 238 case PseudoTcp::TCP_LISTEN: 239 case PseudoTcp::TCP_SYN_SENT: 240 case PseudoTcp::TCP_SYN_RECEIVED: 241 return SS_OPENING; 242 case PseudoTcp::TCP_ESTABLISHED: 243 return SS_OPEN; 244 case PseudoTcp::TCP_CLOSED: 245 default: 246 return SS_CLOSED; 247 } 248} 249 250StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len, 251 size_t* read, int* error) { 252 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 253 CritScope lock(&cs_); 254 if (!tcp_) 255 return SR_BLOCK; 256 257 stream_readable_ = false; 258 int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len); 259 //LOG_F(LS_VERBOSE) << "Recv returned: " << result; 260 if (result > 0) { 261 if (read) 262 *read = result; 263 // PseudoTcp doesn't currently support repeated Readable signals. Simulate 264 // them here. 265 stream_readable_ = true; 266 if (!pending_read_event_) { 267 pending_read_event_ = true; 268 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true); 269 } 270 return SR_SUCCESS; 271 } else if (IsBlockingError(tcp_->GetError())) { 272 return SR_BLOCK; 273 } else { 274 if (error) 275 *error = tcp_->GetError(); 276 return SR_ERROR; 277 } 278 // This spot is never reached. 279} 280 281StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len, 282 size_t* written, int* error) { 283 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 284 CritScope lock(&cs_); 285 if (!tcp_) 286 return SR_BLOCK; 287 int result = tcp_->Send(static_cast<const char*>(data), data_len); 288 //LOG_F(LS_VERBOSE) << "Send returned: " << result; 289 if (result > 0) { 290 if (written) 291 *written = result; 292 return SR_SUCCESS; 293 } else if (IsBlockingError(tcp_->GetError())) { 294 return SR_BLOCK; 295 } else { 296 if (error) 297 *error = tcp_->GetError(); 298 return SR_ERROR; 299 } 300 // This spot is never reached. 301} 302 303void PseudoTcpChannel::Close() { 304 ASSERT(stream_ != NULL && stream_thread_->IsCurrent()); 305 CritScope lock(&cs_); 306 stream_ = NULL; 307 // Clear out any pending event notifications 308 stream_thread_->Clear(this, MSG_ST_EVENT); 309 if (tcp_) { 310 tcp_->Close(false); 311 AdjustClock(); 312 } else { 313 CheckDestroy(); 314 } 315} 316 317// 318// Worker thread methods 319// 320 321void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) { 322 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 323 ASSERT(worker_thread_->IsCurrent()); 324 CritScope lock(&cs_); 325 if (!channel_) { 326 LOG_F(LS_WARNING) << "NULL channel"; 327 return; 328 } 329 ASSERT(channel == channel_); 330 if (!tcp_) { 331 LOG_F(LS_WARNING) << "NULL tcp"; 332 return; 333 } 334 if (!ready_to_connect_ || !channel->writable()) 335 return; 336 337 ready_to_connect_ = false; 338 tcp_->Connect(); 339 AdjustClock(); 340} 341 342void PseudoTcpChannel::OnChannelRead(TransportChannel* channel, 343 const char* data, size_t size, 344 const rtc::PacketTime& packet_time, 345 int flags) { 346 //LOG_F(LS_VERBOSE) << "(" << size << ")"; 347 ASSERT(worker_thread_->IsCurrent()); 348 CritScope lock(&cs_); 349 if (!channel_) { 350 LOG_F(LS_WARNING) << "NULL channel"; 351 return; 352 } 353 ASSERT(channel == channel_); 354 if (!tcp_) { 355 LOG_F(LS_WARNING) << "NULL tcp"; 356 return; 357 } 358 tcp_->NotifyPacket(data, size); 359 AdjustClock(); 360} 361 362void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel, 363 const Candidate& candidate) { 364 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 365 ASSERT(worker_thread_->IsCurrent()); 366 CritScope lock(&cs_); 367 if (!channel_) { 368 LOG_F(LS_WARNING) << "NULL channel"; 369 return; 370 } 371 ASSERT(channel == channel_); 372 if (!tcp_) { 373 LOG_F(LS_WARNING) << "NULL tcp"; 374 return; 375 } 376 377 uint16 mtu = 1280; // safe default 378 int family = candidate.address().family(); 379 Socket* socket = 380 worker_thread_->socketserver()->CreateAsyncSocket(family, SOCK_DGRAM); 381 rtc::scoped_ptr<Socket> mtu_socket(socket); 382 if (socket == NULL) { 383 LOG_F(LS_WARNING) << "Couldn't create socket while estimating MTU."; 384 } else { 385 if (mtu_socket->Connect(candidate.address()) < 0 || 386 mtu_socket->EstimateMTU(&mtu) < 0) { 387 LOG_F(LS_WARNING) << "Failed to estimate MTU, error=" 388 << mtu_socket->GetError(); 389 } 390 } 391 392 LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes"; 393 tcp_->NotifyMTU(mtu); 394 AdjustClock(); 395} 396 397void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) { 398 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 399 ASSERT(cs_.CurrentThreadIsOwner()); 400 ASSERT(worker_thread_->IsCurrent()); 401 ASSERT(tcp == tcp_); 402 if (stream_) { 403 stream_readable_ = true; 404 pending_read_event_ = true; 405 stream_thread_->Post(this, MSG_ST_EVENT, 406 new EventData(SE_OPEN | SE_READ | SE_WRITE)); 407 } 408} 409 410void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) { 411 //LOG_F(LS_VERBOSE); 412 ASSERT(cs_.CurrentThreadIsOwner()); 413 ASSERT(worker_thread_->IsCurrent()); 414 ASSERT(tcp == tcp_); 415 if (stream_) { 416 stream_readable_ = true; 417 if (!pending_read_event_) { 418 pending_read_event_ = true; 419 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ)); 420 } 421 } 422} 423 424void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) { 425 //LOG_F(LS_VERBOSE); 426 ASSERT(cs_.CurrentThreadIsOwner()); 427 ASSERT(worker_thread_->IsCurrent()); 428 ASSERT(tcp == tcp_); 429 if (stream_) 430 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE)); 431} 432 433void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) { 434 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]"; 435 ASSERT(cs_.CurrentThreadIsOwner()); 436 ASSERT(worker_thread_->IsCurrent()); 437 ASSERT(tcp == tcp_); 438 if (stream_) 439 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError)); 440} 441 442// 443// Multi-thread methods 444// 445 446void PseudoTcpChannel::OnMessage(Message* pmsg) { 447 if (pmsg->message_id == MSG_WK_CLOCK) { 448 449 ASSERT(worker_thread_->IsCurrent()); 450 //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)"; 451 CritScope lock(&cs_); 452 if (tcp_) { 453 tcp_->NotifyClock(PseudoTcp::Now()); 454 AdjustClock(false); 455 } 456 457 } else if (pmsg->message_id == MSG_WK_PURGE) { 458 459 ASSERT(worker_thread_->IsCurrent()); 460 LOG_F(LS_INFO) << "(MSG_WK_PURGE)"; 461 // At this point, we know there are no additional worker thread messages. 462 CritScope lock(&cs_); 463 ASSERT(NULL == session_); 464 ASSERT(NULL == channel_); 465 worker_thread_ = NULL; 466 CheckDestroy(); 467 468 } else if (pmsg->message_id == MSG_ST_EVENT) { 469 470 ASSERT(stream_thread_->IsCurrent()); 471 //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, " 472 // << data->event << ", " << data->error << ")"; 473 ASSERT(stream_ != NULL); 474 EventData* data = static_cast<EventData*>(pmsg->pdata); 475 if (data->event & SE_READ) { 476 CritScope lock(&cs_); 477 pending_read_event_ = false; 478 } 479 stream_->SignalEvent(stream_, data->event, data->error); 480 delete data; 481 482 } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) { 483 484 ASSERT(signal_thread_->IsCurrent()); 485 LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)"; 486 ASSERT(session_ != NULL); 487 ASSERT(channel_ != NULL); 488 session_->DestroyChannel(content_name_, channel_->component()); 489 490 } else if (pmsg->message_id == MSG_SI_DESTROY) { 491 492 ASSERT(signal_thread_->IsCurrent()); 493 LOG_F(LS_INFO) << "(MSG_SI_DESTROY)"; 494 // The message queue is empty, so it is safe to destroy ourselves. 495 delete this; 496 497 } else { 498 ASSERT(false); 499 } 500} 501 502IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket( 503 PseudoTcp* tcp, const char* buffer, size_t len) { 504 ASSERT(cs_.CurrentThreadIsOwner()); 505 ASSERT(tcp == tcp_); 506 ASSERT(NULL != channel_); 507 rtc::PacketOptions packet_options; 508 int sent = channel_->SendPacket(buffer, len, packet_options); 509 if (sent > 0) { 510 //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent"; 511 return IPseudoTcpNotify::WR_SUCCESS; 512 } else if (IsBlockingError(channel_->GetError())) { 513 LOG_F(LS_VERBOSE) << "Blocking"; 514 return IPseudoTcpNotify::WR_SUCCESS; 515 } else if (channel_->GetError() == EMSGSIZE) { 516 LOG_F(LS_ERROR) << "EMSGSIZE"; 517 return IPseudoTcpNotify::WR_TOO_LARGE; 518 } else { 519 PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket"; 520 ASSERT(false); 521 return IPseudoTcpNotify::WR_FAIL; 522 } 523} 524 525void PseudoTcpChannel::AdjustClock(bool clear) { 526 ASSERT(cs_.CurrentThreadIsOwner()); 527 ASSERT(NULL != tcp_); 528 529 long timeout = 0; 530 if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) { 531 ASSERT(NULL != channel_); 532 // Reset the next clock, by clearing the old and setting a new one. 533 if (clear) 534 worker_thread_->Clear(this, MSG_WK_CLOCK); 535 worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK); 536 return; 537 } 538 539 delete tcp_; 540 tcp_ = NULL; 541 ready_to_connect_ = false; 542 543 if (channel_) { 544 // If TCP has failed, no need for channel_ anymore 545 signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL); 546 } 547} 548 549void PseudoTcpChannel::CheckDestroy() { 550 ASSERT(cs_.CurrentThreadIsOwner()); 551 if ((worker_thread_ != NULL) || (stream_ != NULL)) 552 return; 553 signal_thread_->Post(this, MSG_SI_DESTROY); 554} 555 556/////////////////////////////////////////////////////////////////////////////// 557// PseudoTcpChannel::InternalStream 558/////////////////////////////////////////////////////////////////////////////// 559 560PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent) 561 : parent_(parent) { 562} 563 564PseudoTcpChannel::InternalStream::~InternalStream() { 565 Close(); 566} 567 568StreamState PseudoTcpChannel::InternalStream::GetState() const { 569 if (!parent_) 570 return SS_CLOSED; 571 return parent_->GetState(); 572} 573 574StreamResult PseudoTcpChannel::InternalStream::Read( 575 void* buffer, size_t buffer_len, size_t* read, int* error) { 576 if (!parent_) { 577 if (error) 578 *error = ENOTCONN; 579 return SR_ERROR; 580 } 581 return parent_->Read(buffer, buffer_len, read, error); 582} 583 584StreamResult PseudoTcpChannel::InternalStream::Write( 585 const void* data, size_t data_len, size_t* written, int* error) { 586 if (!parent_) { 587 if (error) 588 *error = ENOTCONN; 589 return SR_ERROR; 590 } 591 return parent_->Write(data, data_len, written, error); 592} 593 594void PseudoTcpChannel::InternalStream::Close() { 595 if (!parent_) 596 return; 597 parent_->Close(); 598 parent_ = NULL; 599} 600 601/////////////////////////////////////////////////////////////////////////////// 602 603} // namespace cricket 604