1/*
2 * libjingle
3 * Copyright 2012, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27#include "talk/app/webrtc/datachannel.h"
28
29#include <string>
30
31#include "talk/app/webrtc/mediastreamprovider.h"
32#include "talk/app/webrtc/sctputils.h"
33#include "webrtc/base/logging.h"
34#include "webrtc/base/refcount.h"
35
36namespace webrtc {
37
38static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
39static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024;
40
41enum {
42  MSG_CHANNELREADY,
43};
44
45DataChannel::PacketQueue::PacketQueue() : byte_count_(0) {}
46
47DataChannel::PacketQueue::~PacketQueue() {
48  Clear();
49}
50
51bool DataChannel::PacketQueue::Empty() const {
52  return packets_.empty();
53}
54
55DataBuffer* DataChannel::PacketQueue::Front() {
56  return packets_.front();
57}
58
59void DataChannel::PacketQueue::Pop() {
60  if (packets_.empty()) {
61    return;
62  }
63
64  byte_count_ -= packets_.front()->size();
65  packets_.pop_front();
66}
67
68void DataChannel::PacketQueue::Push(DataBuffer* packet) {
69  byte_count_ += packet->size();
70  packets_.push_back(packet);
71}
72
73void DataChannel::PacketQueue::Clear() {
74  while (!packets_.empty()) {
75    delete packets_.front();
76    packets_.pop_front();
77  }
78  byte_count_ = 0;
79}
80
81void DataChannel::PacketQueue::Swap(PacketQueue* other) {
82  size_t other_byte_count = other->byte_count_;
83  other->byte_count_ = byte_count_;
84  byte_count_ = other_byte_count;
85
86  other->packets_.swap(packets_);
87}
88
89rtc::scoped_refptr<DataChannel> DataChannel::Create(
90    DataChannelProviderInterface* provider,
91    cricket::DataChannelType dct,
92    const std::string& label,
93    const InternalDataChannelInit& config) {
94  rtc::scoped_refptr<DataChannel> channel(
95      new rtc::RefCountedObject<DataChannel>(provider, dct, label));
96  if (!channel->Init(config)) {
97    return NULL;
98  }
99  return channel;
100}
101
102DataChannel::DataChannel(
103    DataChannelProviderInterface* provider,
104    cricket::DataChannelType dct,
105    const std::string& label)
106    : label_(label),
107      observer_(NULL),
108      state_(kConnecting),
109      data_channel_type_(dct),
110      provider_(provider),
111      waiting_for_open_ack_(false),
112      was_ever_writable_(false),
113      connected_to_provider_(false),
114      send_ssrc_set_(false),
115      receive_ssrc_set_(false),
116      send_ssrc_(0),
117      receive_ssrc_(0) {
118}
119
120bool DataChannel::Init(const InternalDataChannelInit& config) {
121  if (data_channel_type_ == cricket::DCT_RTP &&
122      (config.reliable ||
123       config.id != -1 ||
124       config.maxRetransmits != -1 ||
125       config.maxRetransmitTime != -1)) {
126    LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
127                  << "invalid DataChannelInit.";
128    return false;
129  } else if (data_channel_type_ == cricket::DCT_SCTP) {
130    if (config.id < -1 ||
131        config.maxRetransmits < -1 ||
132        config.maxRetransmitTime < -1) {
133      LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
134                    << "invalid DataChannelInit.";
135      return false;
136    }
137    if (config.maxRetransmits != -1 && config.maxRetransmitTime != -1) {
138      LOG(LS_ERROR) <<
139          "maxRetransmits and maxRetransmitTime should not be both set.";
140      return false;
141    }
142    config_ = config;
143
144    // Try to connect to the transport in case the transport channel already
145    // exists.
146    OnTransportChannelCreated();
147
148    // Checks if the transport is ready to send because the initial channel
149    // ready signal may have been sent before the DataChannel creation.
150    // This has to be done async because the upper layer objects (e.g.
151    // Chrome glue and WebKit) are not wired up properly until after this
152    // function returns.
153    if (provider_->ReadyToSendData()) {
154      rtc::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL);
155    }
156  }
157
158  return true;
159}
160
161DataChannel::~DataChannel() {}
162
163void DataChannel::RegisterObserver(DataChannelObserver* observer) {
164  observer_ = observer;
165  DeliverQueuedReceivedData();
166}
167
168void DataChannel::UnregisterObserver() {
169  observer_ = NULL;
170}
171
172bool DataChannel::reliable() const {
173  if (data_channel_type_ == cricket::DCT_RTP) {
174    return false;
175  } else {
176    return config_.maxRetransmits == -1 &&
177           config_.maxRetransmitTime == -1;
178  }
179}
180
181uint64 DataChannel::buffered_amount() const {
182  return queued_send_data_.byte_count();
183}
184
185void DataChannel::Close() {
186  if (state_ == kClosed)
187    return;
188  send_ssrc_ = 0;
189  send_ssrc_set_ = false;
190  SetState(kClosing);
191  UpdateState();
192}
193
194bool DataChannel::Send(const DataBuffer& buffer) {
195  if (state_ != kOpen) {
196    return false;
197  }
198
199  // TODO(jiayl): the spec is unclear about if the remote side should get the
200  // onmessage event. We need to figure out the expected behavior and change the
201  // code accordingly.
202  if (buffer.size() == 0) {
203    return true;
204  }
205
206  // If the queue is non-empty, we're waiting for SignalReadyToSend,
207  // so just add to the end of the queue and keep waiting.
208  if (!queued_send_data_.Empty()) {
209    // Only SCTP DataChannel queues the outgoing data when the transport is
210    // blocked.
211    ASSERT(data_channel_type_ == cricket::DCT_SCTP);
212    if (!QueueSendDataMessage(buffer)) {
213      Close();
214    }
215    return true;
216  }
217
218  bool success = SendDataMessage(buffer);
219  if (data_channel_type_ == cricket::DCT_RTP) {
220    return success;
221  }
222
223  // Always return true for SCTP DataChannel per the spec.
224  return true;
225}
226
227void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
228  ASSERT(data_channel_type_ == cricket::DCT_RTP);
229
230  if (receive_ssrc_set_) {
231    return;
232  }
233  receive_ssrc_ = receive_ssrc;
234  receive_ssrc_set_ = true;
235  UpdateState();
236}
237
238// The remote peer request that this channel shall be closed.
239void DataChannel::RemotePeerRequestClose() {
240  DoClose();
241}
242
243void DataChannel::SetSctpSid(int sid) {
244  ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
245  if (config_.id == sid)
246    return;
247
248  config_.id = sid;
249  provider_->AddSctpDataStream(sid);
250}
251
252void DataChannel::OnTransportChannelCreated() {
253  ASSERT(data_channel_type_ == cricket::DCT_SCTP);
254  if (!connected_to_provider_) {
255    connected_to_provider_ = provider_->ConnectDataChannel(this);
256  }
257  // The sid may have been unassigned when provider_->ConnectDataChannel was
258  // done. So always add the streams even if connected_to_provider_ is true.
259  if (config_.id >= 0) {
260    provider_->AddSctpDataStream(config_.id);
261  }
262}
263
264void DataChannel::SetSendSsrc(uint32 send_ssrc) {
265  ASSERT(data_channel_type_ == cricket::DCT_RTP);
266  if (send_ssrc_set_) {
267    return;
268  }
269  send_ssrc_ = send_ssrc;
270  send_ssrc_set_ = true;
271  UpdateState();
272}
273
274void DataChannel::OnMessage(rtc::Message* msg) {
275  switch (msg->message_id) {
276    case MSG_CHANNELREADY:
277      OnChannelReady(true);
278      break;
279  }
280}
281
282// The underlaying data engine is closing.
283// This function makes sure the DataChannel is disconnected and changes state to
284// kClosed.
285void DataChannel::OnDataEngineClose() {
286  DoClose();
287}
288
289void DataChannel::OnDataReceived(cricket::DataChannel* channel,
290                                 const cricket::ReceiveDataParams& params,
291                                 const rtc::Buffer& payload) {
292  uint32 expected_ssrc =
293      (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id;
294  if (params.ssrc != expected_ssrc) {
295    return;
296  }
297
298  if (params.type == cricket::DMT_CONTROL) {
299    ASSERT(data_channel_type_ == cricket::DCT_SCTP);
300    if (!waiting_for_open_ack_) {
301      // Ignore it if we are not expecting an ACK message.
302      LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, "
303                      << "sid = " << params.ssrc;
304      return;
305    }
306    if (ParseDataChannelOpenAckMessage(payload)) {
307      // We can send unordered as soon as we receive the ACK message.
308      waiting_for_open_ack_ = false;
309      LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
310                   << params.ssrc;
311    } else {
312      LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = "
313                      << params.ssrc;
314    }
315    return;
316  }
317
318  ASSERT(params.type == cricket::DMT_BINARY ||
319         params.type == cricket::DMT_TEXT);
320
321  LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " << params.ssrc;
322  // We can send unordered as soon as we receive any DATA message since the
323  // remote side must have received the OPEN (and old clients do not send
324  // OPEN_ACK).
325  waiting_for_open_ack_ = false;
326
327  bool binary = (params.type == cricket::DMT_BINARY);
328  rtc::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
329  if (was_ever_writable_ && observer_) {
330    observer_->OnMessage(*buffer.get());
331  } else {
332    if (queued_received_data_.byte_count() + payload.length() >
333            kMaxQueuedReceivedDataBytes) {
334      LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
335
336      queued_received_data_.Clear();
337      if (data_channel_type_ != cricket::DCT_RTP) {
338        Close();
339      }
340
341      return;
342    }
343    queued_received_data_.Push(buffer.release());
344  }
345}
346
347void DataChannel::OnChannelReady(bool writable) {
348  if (!writable) {
349    return;
350  }
351  // Update the readyState and send the queued control message if the channel
352  // is writable for the first time; otherwise it means the channel was blocked
353  // for sending and now unblocked, so send the queued data now.
354  if (!was_ever_writable_) {
355    was_ever_writable_ = true;
356
357    if (data_channel_type_ == cricket::DCT_SCTP) {
358      rtc::Buffer payload;
359
360      if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
361        WriteDataChannelOpenMessage(label_, config_, &payload);
362        SendControlMessage(payload);
363      } else if (config_.open_handshake_role ==
364                     InternalDataChannelInit::kAcker) {
365        WriteDataChannelOpenAckMessage(&payload);
366        SendControlMessage(payload);
367      }
368    }
369
370    UpdateState();
371    ASSERT(queued_send_data_.Empty());
372  } else if (state_ == kOpen) {
373    // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
374    // that the readyState is open. According to the standard, the channel
375    // should not become open before the OPEN message is sent.
376    SendQueuedControlMessages();
377
378    SendQueuedDataMessages();
379  }
380}
381
382void DataChannel::DoClose() {
383  if (state_ == kClosed)
384    return;
385
386  receive_ssrc_set_ = false;
387  send_ssrc_set_ = false;
388  SetState(kClosing);
389  UpdateState();
390}
391
392void DataChannel::UpdateState() {
393  switch (state_) {
394    case kConnecting: {
395      if (send_ssrc_set_ == receive_ssrc_set_) {
396        if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
397          connected_to_provider_ = provider_->ConnectDataChannel(this);
398        }
399        if (was_ever_writable_) {
400          // TODO(jiayl): Do not transition to kOpen if we failed to send the
401          // OPEN message.
402          SendQueuedControlMessages();
403          SetState(kOpen);
404          // If we have received buffers before the channel got writable.
405          // Deliver them now.
406          DeliverQueuedReceivedData();
407        }
408      }
409      break;
410    }
411    case kOpen: {
412      break;
413    }
414    case kClosing: {
415      DisconnectFromTransport();
416
417      if (!send_ssrc_set_ && !receive_ssrc_set_) {
418        SetState(kClosed);
419      }
420      break;
421    }
422    case kClosed:
423      break;
424  }
425}
426
427void DataChannel::SetState(DataState state) {
428  if (state_ == state)
429    return;
430
431  state_ = state;
432  if (observer_) {
433    observer_->OnStateChange();
434  }
435}
436
437void DataChannel::DisconnectFromTransport() {
438  if (!connected_to_provider_)
439    return;
440
441  provider_->DisconnectDataChannel(this);
442  connected_to_provider_ = false;
443
444  if (data_channel_type_ == cricket::DCT_SCTP) {
445    provider_->RemoveSctpDataStream(config_.id);
446  }
447}
448
449void DataChannel::DeliverQueuedReceivedData() {
450  if (!was_ever_writable_ || !observer_) {
451    return;
452  }
453
454  while (!queued_received_data_.Empty()) {
455    rtc::scoped_ptr<DataBuffer> buffer(queued_received_data_.Front());
456    observer_->OnMessage(*buffer);
457    queued_received_data_.Pop();
458  }
459}
460
461void DataChannel::SendQueuedDataMessages() {
462  ASSERT(was_ever_writable_ && state_ == kOpen);
463
464  PacketQueue packet_buffer;
465  packet_buffer.Swap(&queued_send_data_);
466
467  while (!packet_buffer.Empty()) {
468    rtc::scoped_ptr<DataBuffer> buffer(packet_buffer.Front());
469    SendDataMessage(*buffer);
470    packet_buffer.Pop();
471  }
472}
473
474bool DataChannel::SendDataMessage(const DataBuffer& buffer) {
475  cricket::SendDataParams send_params;
476
477  if (data_channel_type_ == cricket::DCT_SCTP) {
478    send_params.ordered = config_.ordered;
479    // Send as ordered if it is waiting for the OPEN_ACK message.
480    if (waiting_for_open_ack_ && !config_.ordered) {
481      send_params.ordered = true;
482      LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel "
483                      << "because the OPEN_ACK message has not been received.";
484    }
485
486    send_params.max_rtx_count = config_.maxRetransmits;
487    send_params.max_rtx_ms = config_.maxRetransmitTime;
488    send_params.ssrc = config_.id;
489  } else {
490    send_params.ssrc = send_ssrc_;
491  }
492  send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
493
494  cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
495  bool success = provider_->SendData(send_params, buffer.data, &send_result);
496
497  if (!success && data_channel_type_ == cricket::DCT_SCTP) {
498    if (send_result != cricket::SDR_BLOCK || !QueueSendDataMessage(buffer)) {
499      LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
500                    << "send_result = " << send_result;
501      Close();
502    }
503  }
504  return success;
505}
506
507bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
508  if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) {
509    LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
510    return false;
511  }
512  queued_send_data_.Push(new DataBuffer(buffer));
513  return true;
514}
515
516void DataChannel::SendQueuedControlMessages() {
517  ASSERT(was_ever_writable_);
518
519  PacketQueue control_packets;
520  control_packets.Swap(&queued_control_data_);
521
522  while (!control_packets.Empty()) {
523    rtc::scoped_ptr<DataBuffer> buf(control_packets.Front());
524    SendControlMessage(buf->data);
525    control_packets.Pop();
526  }
527}
528
529void DataChannel::QueueControlMessage(const rtc::Buffer& buffer) {
530  queued_control_data_.Push(new DataBuffer(buffer, true));
531}
532
533bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) {
534  bool is_open_message =
535      (config_.open_handshake_role == InternalDataChannelInit::kOpener);
536
537  ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
538         was_ever_writable_ &&
539         config_.id >= 0 &&
540         (!is_open_message || !config_.negotiated));
541
542  cricket::SendDataParams send_params;
543  send_params.ssrc = config_.id;
544  send_params.ordered = config_.ordered || is_open_message;
545  send_params.type = cricket::DMT_CONTROL;
546
547  cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
548  bool retval = provider_->SendData(send_params, buffer, &send_result);
549  if (retval) {
550    LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id;
551
552    if (is_open_message) {
553      // Send data as ordered before we receive any message from the remote peer
554      // to make sure the remote peer will not receive any data before it
555      // receives the OPEN message.
556      waiting_for_open_ack_ = true;
557    }
558  } else if (send_result == cricket::SDR_BLOCK) {
559    QueueControlMessage(buffer);
560  } else {
561    LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
562                  << " the CONTROL message, send_result = " << send_result;
563    Close();
564  }
565  return retval;
566}
567
568}  // namespace webrtc
569