1/* 2 * libjingle 3 * Copyright 2012, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27#include "talk/app/webrtc/datachannel.h" 28 29#include <string> 30 31#include "talk/app/webrtc/mediastreamprovider.h" 32#include "talk/app/webrtc/sctputils.h" 33#include "talk/base/logging.h" 34#include "talk/base/refcount.h" 35 36namespace webrtc { 37 38static size_t kMaxQueuedReceivedDataPackets = 100; 39static size_t kMaxQueuedSendDataPackets = 100; 40 41enum { 42 MSG_CHANNELREADY, 43}; 44 45talk_base::scoped_refptr<DataChannel> DataChannel::Create( 46 DataChannelProviderInterface* provider, 47 cricket::DataChannelType dct, 48 const std::string& label, 49 const InternalDataChannelInit& config) { 50 talk_base::scoped_refptr<DataChannel> channel( 51 new talk_base::RefCountedObject<DataChannel>(provider, dct, label)); 52 if (!channel->Init(config)) { 53 return NULL; 54 } 55 return channel; 56} 57 58DataChannel::DataChannel( 59 DataChannelProviderInterface* provider, 60 cricket::DataChannelType dct, 61 const std::string& label) 62 : label_(label), 63 observer_(NULL), 64 state_(kConnecting), 65 data_channel_type_(dct), 66 provider_(provider), 67 waiting_for_open_ack_(false), 68 was_ever_writable_(false), 69 connected_to_provider_(false), 70 send_ssrc_set_(false), 71 receive_ssrc_set_(false), 72 send_ssrc_(0), 73 receive_ssrc_(0) { 74} 75 76bool DataChannel::Init(const InternalDataChannelInit& config) { 77 if (data_channel_type_ == cricket::DCT_RTP && 78 (config.reliable || 79 config.id != -1 || 80 config.maxRetransmits != -1 || 81 config.maxRetransmitTime != -1)) { 82 LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to " 83 << "invalid DataChannelInit."; 84 return false; 85 } else if (data_channel_type_ == cricket::DCT_SCTP) { 86 if (config.id < -1 || 87 config.maxRetransmits < -1 || 88 config.maxRetransmitTime < -1) { 89 LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to " 90 << "invalid DataChannelInit."; 91 return false; 92 } 93 if (config.maxRetransmits != -1 && config.maxRetransmitTime != -1) { 94 LOG(LS_ERROR) << 95 "maxRetransmits and maxRetransmitTime should not be both set."; 96 return false; 97 } 98 config_ = config; 99 100 // Try to connect to the transport in case the transport channel already 101 // exists. 102 OnTransportChannelCreated(); 103 104 // Checks if the transport is ready to send because the initial channel 105 // ready signal may have been sent before the DataChannel creation. 106 // This has to be done async because the upper layer objects (e.g. 107 // Chrome glue and WebKit) are not wired up properly until after this 108 // function returns. 109 if (provider_->ReadyToSendData()) { 110 talk_base::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL); 111 } 112 } 113 114 return true; 115} 116 117DataChannel::~DataChannel() { 118 ClearQueuedReceivedData(); 119 ClearQueuedSendData(); 120 ClearQueuedControlData(); 121} 122 123void DataChannel::RegisterObserver(DataChannelObserver* observer) { 124 observer_ = observer; 125 DeliverQueuedReceivedData(); 126} 127 128void DataChannel::UnregisterObserver() { 129 observer_ = NULL; 130} 131 132bool DataChannel::reliable() const { 133 if (data_channel_type_ == cricket::DCT_RTP) { 134 return false; 135 } else { 136 return config_.maxRetransmits == -1 && 137 config_.maxRetransmitTime == -1; 138 } 139} 140 141uint64 DataChannel::buffered_amount() const { 142 uint64 buffered_amount = 0; 143 for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin(); 144 it != queued_send_data_.end(); 145 ++it) { 146 buffered_amount += (*it)->size(); 147 } 148 return buffered_amount; 149} 150 151void DataChannel::Close() { 152 if (state_ == kClosed) 153 return; 154 send_ssrc_ = 0; 155 send_ssrc_set_ = false; 156 SetState(kClosing); 157 UpdateState(); 158} 159 160bool DataChannel::Send(const DataBuffer& buffer) { 161 if (state_ != kOpen) { 162 return false; 163 } 164 // If the queue is non-empty, we're waiting for SignalReadyToSend, 165 // so just add to the end of the queue and keep waiting. 166 if (!queued_send_data_.empty()) { 167 if (!QueueSendData(buffer)) { 168 if (data_channel_type_ == cricket::DCT_RTP) { 169 return false; 170 } 171 Close(); 172 } 173 return true; 174 } 175 176 cricket::SendDataResult send_result; 177 if (!InternalSendWithoutQueueing(buffer, &send_result)) { 178 if (data_channel_type_ == cricket::DCT_RTP) { 179 return false; 180 } 181 if (send_result != cricket::SDR_BLOCK || !QueueSendData(buffer)) { 182 Close(); 183 } 184 } 185 return true; 186} 187 188void DataChannel::QueueControl(const talk_base::Buffer* buffer) { 189 queued_control_data_.push(buffer); 190} 191 192bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) { 193 ASSERT(data_channel_type_ == cricket::DCT_SCTP && 194 was_ever_writable_ && 195 config_.id >= 0 && 196 !config_.negotiated); 197 198 talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer); 199 200 cricket::SendDataParams send_params; 201 send_params.ssrc = config_.id; 202 send_params.ordered = true; 203 send_params.type = cricket::DMT_CONTROL; 204 205 cricket::SendDataResult send_result; 206 bool retval = provider_->SendData(send_params, *buffer, &send_result); 207 if (retval) { 208 LOG(LS_INFO) << "Sent OPEN message on channel " << config_.id; 209 // Send data as ordered before we receive any mesage from the remote peer 210 // to make sure the remote peer will not receive any data before it receives 211 // the OPEN message. 212 waiting_for_open_ack_ = true; 213 } else if (send_result == cricket::SDR_BLOCK) { 214 // Link is congested. Queue for later. 215 QueueControl(buffer.release()); 216 } else { 217 LOG(LS_ERROR) << "Failed to send OPEN message with result " 218 << send_result << " on channel " << config_.id; 219 } 220 return retval; 221} 222 223bool DataChannel::SendOpenAckMessage(const talk_base::Buffer* raw_buffer) { 224 ASSERT(data_channel_type_ == cricket::DCT_SCTP && 225 was_ever_writable_ && 226 config_.id >= 0); 227 228 talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer); 229 230 cricket::SendDataParams send_params; 231 send_params.ssrc = config_.id; 232 send_params.ordered = config_.ordered; 233 send_params.type = cricket::DMT_CONTROL; 234 235 cricket::SendDataResult send_result; 236 bool retval = provider_->SendData(send_params, *buffer, &send_result); 237 if (retval) { 238 LOG(LS_INFO) << "Sent OPEN_ACK message on channel " << config_.id; 239 } else if (send_result == cricket::SDR_BLOCK) { 240 // Link is congested. Queue for later. 241 QueueControl(buffer.release()); 242 } else { 243 LOG(LS_ERROR) << "Failed to send OPEN_ACK message with result " 244 << send_result << " on channel " << config_.id; 245 } 246 return retval; 247} 248 249void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) { 250 ASSERT(data_channel_type_ == cricket::DCT_RTP); 251 252 if (receive_ssrc_set_) { 253 return; 254 } 255 receive_ssrc_ = receive_ssrc; 256 receive_ssrc_set_ = true; 257 UpdateState(); 258} 259 260// The remote peer request that this channel shall be closed. 261void DataChannel::RemotePeerRequestClose() { 262 DoClose(); 263} 264 265void DataChannel::SetSendSsrc(uint32 send_ssrc) { 266 ASSERT(data_channel_type_ == cricket::DCT_RTP); 267 if (send_ssrc_set_) { 268 return; 269 } 270 send_ssrc_ = send_ssrc; 271 send_ssrc_set_ = true; 272 UpdateState(); 273} 274 275void DataChannel::OnMessage(talk_base::Message* msg) { 276 switch (msg->message_id) { 277 case MSG_CHANNELREADY: 278 OnChannelReady(true); 279 break; 280 } 281} 282 283// The underlaying data engine is closing. 284// This function makes sure the DataChannel is disconnected and changes state to 285// kClosed. 286void DataChannel::OnDataEngineClose() { 287 DoClose(); 288} 289 290void DataChannel::OnDataReceived(cricket::DataChannel* channel, 291 const cricket::ReceiveDataParams& params, 292 const talk_base::Buffer& payload) { 293 uint32 expected_ssrc = 294 (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id; 295 if (params.ssrc != expected_ssrc) { 296 return; 297 } 298 299 if (params.type == cricket::DMT_CONTROL) { 300 ASSERT(data_channel_type_ == cricket::DCT_SCTP); 301 if (!waiting_for_open_ack_) { 302 // Ignore it if we are not expecting an ACK message. 303 LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, " 304 << "sid = " << params.ssrc; 305 return; 306 } 307 if (ParseDataChannelOpenAckMessage(payload)) { 308 // We can send unordered as soon as we receive the ACK message. 309 waiting_for_open_ack_ = false; 310 LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = " 311 << params.ssrc; 312 } else { 313 LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = " 314 << params.ssrc; 315 } 316 return; 317 } 318 319 ASSERT(params.type == cricket::DMT_BINARY || 320 params.type == cricket::DMT_TEXT); 321 322 LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " << params.ssrc; 323 // We can send unordered as soon as we receive any DATA message since the 324 // remote side must have received the OPEN (and old clients do not send 325 // OPEN_ACK). 326 waiting_for_open_ack_ = false; 327 328 bool binary = (params.type == cricket::DMT_BINARY); 329 talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary)); 330 if (was_ever_writable_ && observer_) { 331 observer_->OnMessage(*buffer.get()); 332 } else { 333 if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) { 334 LOG(LS_ERROR) 335 << "Queued received data exceeds the max number of packets."; 336 ClearQueuedReceivedData(); 337 } 338 queued_received_data_.push(buffer.release()); 339 } 340} 341 342void DataChannel::OnChannelReady(bool writable) { 343 if (!writable) { 344 return; 345 } 346 // Update the readyState and send the queued control message if the channel 347 // is writable for the first time; otherwise it means the channel was blocked 348 // for sending and now unblocked, so send the queued data now. 349 if (!was_ever_writable_) { 350 was_ever_writable_ = true; 351 352 if (data_channel_type_ == cricket::DCT_SCTP) { 353 if (config_.open_handshake_role == InternalDataChannelInit::kOpener) { 354 talk_base::Buffer* payload = new talk_base::Buffer; 355 WriteDataChannelOpenMessage(label_, config_, payload); 356 SendOpenMessage(payload); 357 } else if (config_.open_handshake_role == 358 InternalDataChannelInit::kAcker) { 359 talk_base::Buffer* payload = new talk_base::Buffer; 360 WriteDataChannelOpenAckMessage(payload); 361 SendOpenAckMessage(payload); 362 } 363 } 364 365 UpdateState(); 366 ASSERT(queued_send_data_.empty()); 367 } else if (state_ == kOpen) { 368 DeliverQueuedSendData(); 369 } 370} 371 372void DataChannel::DoClose() { 373 if (state_ == kClosed) 374 return; 375 376 receive_ssrc_set_ = false; 377 send_ssrc_set_ = false; 378 SetState(kClosing); 379 UpdateState(); 380} 381 382void DataChannel::UpdateState() { 383 switch (state_) { 384 case kConnecting: { 385 if (send_ssrc_set_ == receive_ssrc_set_) { 386 if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) { 387 connected_to_provider_ = provider_->ConnectDataChannel(this); 388 } 389 if (was_ever_writable_) { 390 // TODO(jiayl): Do not transition to kOpen if we failed to send the 391 // OPEN message. 392 DeliverQueuedControlData(); 393 SetState(kOpen); 394 // If we have received buffers before the channel got writable. 395 // Deliver them now. 396 DeliverQueuedReceivedData(); 397 } 398 } 399 break; 400 } 401 case kOpen: { 402 break; 403 } 404 case kClosing: { 405 DisconnectFromTransport(); 406 407 if (!send_ssrc_set_ && !receive_ssrc_set_) { 408 SetState(kClosed); 409 } 410 break; 411 } 412 case kClosed: 413 break; 414 } 415} 416 417void DataChannel::SetState(DataState state) { 418 if (state_ == state) 419 return; 420 421 state_ = state; 422 if (observer_) { 423 observer_->OnStateChange(); 424 } 425} 426 427void DataChannel::DisconnectFromTransport() { 428 if (!connected_to_provider_) 429 return; 430 431 provider_->DisconnectDataChannel(this); 432 connected_to_provider_ = false; 433 434 if (data_channel_type_ == cricket::DCT_SCTP) { 435 provider_->RemoveSctpDataStream(config_.id); 436 } 437} 438 439void DataChannel::DeliverQueuedReceivedData() { 440 if (!was_ever_writable_ || !observer_) { 441 return; 442 } 443 444 while (!queued_received_data_.empty()) { 445 DataBuffer* buffer = queued_received_data_.front(); 446 observer_->OnMessage(*buffer); 447 queued_received_data_.pop(); 448 delete buffer; 449 } 450} 451 452void DataChannel::ClearQueuedReceivedData() { 453 while (!queued_received_data_.empty()) { 454 DataBuffer* buffer = queued_received_data_.front(); 455 queued_received_data_.pop(); 456 delete buffer; 457 } 458} 459 460void DataChannel::DeliverQueuedSendData() { 461 ASSERT(was_ever_writable_ && state_ == kOpen); 462 463 // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition 464 // that the readyState is open. According to the standard, the channel should 465 // not become open before the OPEN message is sent. 466 DeliverQueuedControlData(); 467 468 while (!queued_send_data_.empty()) { 469 DataBuffer* buffer = queued_send_data_.front(); 470 cricket::SendDataResult send_result; 471 if (!InternalSendWithoutQueueing(*buffer, &send_result)) { 472 LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result " 473 << send_result; 474 break; 475 } 476 queued_send_data_.pop_front(); 477 delete buffer; 478 } 479} 480 481void DataChannel::ClearQueuedControlData() { 482 while (!queued_control_data_.empty()) { 483 const talk_base::Buffer *buf = queued_control_data_.front(); 484 queued_control_data_.pop(); 485 delete buf; 486 } 487} 488 489void DataChannel::DeliverQueuedControlData() { 490 ASSERT(was_ever_writable_); 491 while (!queued_control_data_.empty()) { 492 const talk_base::Buffer* buf = queued_control_data_.front(); 493 queued_control_data_.pop(); 494 if (config_.open_handshake_role == InternalDataChannelInit::kOpener) { 495 SendOpenMessage(buf); 496 } else { 497 ASSERT(config_.open_handshake_role == InternalDataChannelInit::kAcker); 498 SendOpenAckMessage(buf); 499 } 500 } 501} 502 503void DataChannel::ClearQueuedSendData() { 504 while (!queued_send_data_.empty()) { 505 DataBuffer* buffer = queued_send_data_.front(); 506 queued_send_data_.pop_front(); 507 delete buffer; 508 } 509} 510 511bool DataChannel::InternalSendWithoutQueueing( 512 const DataBuffer& buffer, cricket::SendDataResult* send_result) { 513 cricket::SendDataParams send_params; 514 515 if (data_channel_type_ == cricket::DCT_SCTP) { 516 send_params.ordered = config_.ordered; 517 // Send as ordered if it is waiting for the OPEN_ACK message. 518 if (waiting_for_open_ack_ && !config_.ordered) { 519 send_params.ordered = true; 520 LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel " 521 << "because the OPEN_ACK message has not been received."; 522 } 523 524 send_params.max_rtx_count = config_.maxRetransmits; 525 send_params.max_rtx_ms = config_.maxRetransmitTime; 526 send_params.ssrc = config_.id; 527 } else { 528 send_params.ssrc = send_ssrc_; 529 } 530 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT; 531 532 return provider_->SendData(send_params, buffer.data, send_result); 533} 534 535bool DataChannel::QueueSendData(const DataBuffer& buffer) { 536 if (queued_send_data_.size() >= kMaxQueuedSendDataPackets) { 537 LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; 538 return false; 539 } 540 queued_send_data_.push_back(new DataBuffer(buffer)); 541 return true; 542} 543 544void DataChannel::SetSctpSid(int sid) { 545 ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP); 546 config_.id = sid; 547 provider_->AddSctpDataStream(sid); 548} 549 550void DataChannel::OnTransportChannelCreated() { 551 ASSERT(data_channel_type_ == cricket::DCT_SCTP); 552 if (!connected_to_provider_) { 553 connected_to_provider_ = provider_->ConnectDataChannel(this); 554 } 555 // The sid may have been unassigned when provider_->ConnectDataChannel was 556 // done. So always add the streams even if connected_to_provider_ is true. 557 if (config_.id >= 0) { 558 provider_->AddSctpDataStream(config_.id); 559 } 560} 561 562} // namespace webrtc 563