1/*
2 * libjingle
3 * Copyright 2012, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27#include "talk/app/webrtc/datachannel.h"
28
29#include <string>
30
31#include "talk/app/webrtc/mediastreamprovider.h"
32#include "talk/app/webrtc/sctputils.h"
33#include "talk/base/logging.h"
34#include "talk/base/refcount.h"
35
36namespace webrtc {
37
// Upper bounds on the number of packets held in the receive queue and the
// send queue respectively. Declared const so the limits cannot be modified
// by accident at runtime.
static const size_t kMaxQueuedReceivedDataPackets = 100;
static const size_t kMaxQueuedSendDataPackets = 100;
40
// Ids of the messages this file posts to itself; dispatched in
// DataChannel::OnMessage.
enum {
  MSG_CHANNELREADY = 0,
};
44
45talk_base::scoped_refptr<DataChannel> DataChannel::Create(
46    DataChannelProviderInterface* provider,
47    cricket::DataChannelType dct,
48    const std::string& label,
49    const InternalDataChannelInit& config) {
50  talk_base::scoped_refptr<DataChannel> channel(
51      new talk_base::RefCountedObject<DataChannel>(provider, dct, label));
52  if (!channel->Init(config)) {
53    return NULL;
54  }
55  return channel;
56}
57
58DataChannel::DataChannel(
59    DataChannelProviderInterface* provider,
60    cricket::DataChannelType dct,
61    const std::string& label)
62    : label_(label),
63      observer_(NULL),
64      state_(kConnecting),
65      data_channel_type_(dct),
66      provider_(provider),
67      waiting_for_open_ack_(false),
68      was_ever_writable_(false),
69      connected_to_provider_(false),
70      send_ssrc_set_(false),
71      receive_ssrc_set_(false),
72      send_ssrc_(0),
73      receive_ssrc_(0) {
74}
75
76bool DataChannel::Init(const InternalDataChannelInit& config) {
77  if (data_channel_type_ == cricket::DCT_RTP &&
78      (config.reliable ||
79       config.id != -1 ||
80       config.maxRetransmits != -1 ||
81       config.maxRetransmitTime != -1)) {
82    LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
83                  << "invalid DataChannelInit.";
84    return false;
85  } else if (data_channel_type_ == cricket::DCT_SCTP) {
86    if (config.id < -1 ||
87        config.maxRetransmits < -1 ||
88        config.maxRetransmitTime < -1) {
89      LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
90                    << "invalid DataChannelInit.";
91      return false;
92    }
93    if (config.maxRetransmits != -1 && config.maxRetransmitTime != -1) {
94      LOG(LS_ERROR) <<
95          "maxRetransmits and maxRetransmitTime should not be both set.";
96      return false;
97    }
98    config_ = config;
99
100    // Try to connect to the transport in case the transport channel already
101    // exists.
102    OnTransportChannelCreated();
103
104    // Checks if the transport is ready to send because the initial channel
105    // ready signal may have been sent before the DataChannel creation.
106    // This has to be done async because the upper layer objects (e.g.
107    // Chrome glue and WebKit) are not wired up properly until after this
108    // function returns.
109    if (provider_->ReadyToSendData()) {
110      talk_base::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL);
111    }
112  }
113
114  return true;
115}
116
117DataChannel::~DataChannel() {
118  ClearQueuedReceivedData();
119  ClearQueuedSendData();
120  ClearQueuedControlData();
121}
122
123void DataChannel::RegisterObserver(DataChannelObserver* observer) {
124  observer_ = observer;
125  DeliverQueuedReceivedData();
126}
127
128void DataChannel::UnregisterObserver() {
129  observer_ = NULL;
130}
131
132bool DataChannel::reliable() const {
133  if (data_channel_type_ == cricket::DCT_RTP) {
134    return false;
135  } else {
136    return config_.maxRetransmits == -1 &&
137           config_.maxRetransmitTime == -1;
138  }
139}
140
141uint64 DataChannel::buffered_amount() const {
142  uint64 buffered_amount = 0;
143  for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
144      it != queued_send_data_.end();
145      ++it) {
146    buffered_amount += (*it)->size();
147  }
148  return buffered_amount;
149}
150
151void DataChannel::Close() {
152  if (state_ == kClosed)
153    return;
154  send_ssrc_ = 0;
155  send_ssrc_set_ = false;
156  SetState(kClosing);
157  UpdateState();
158}
159
160bool DataChannel::Send(const DataBuffer& buffer) {
161  if (state_ != kOpen) {
162    return false;
163  }
164  // If the queue is non-empty, we're waiting for SignalReadyToSend,
165  // so just add to the end of the queue and keep waiting.
166  if (!queued_send_data_.empty()) {
167    if (!QueueSendData(buffer)) {
168      if (data_channel_type_ == cricket::DCT_RTP) {
169        return false;
170      }
171      Close();
172    }
173    return true;
174  }
175
176  cricket::SendDataResult send_result;
177  if (!InternalSendWithoutQueueing(buffer, &send_result)) {
178    if (data_channel_type_ == cricket::DCT_RTP) {
179      return false;
180    }
181    if (send_result != cricket::SDR_BLOCK || !QueueSendData(buffer)) {
182      Close();
183    }
184  }
185  return true;
186}
187
188void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
189  queued_control_data_.push(buffer);
190}
191
192bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) {
193  ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
194         was_ever_writable_ &&
195         config_.id >= 0 &&
196         !config_.negotiated);
197
198  talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
199
200  cricket::SendDataParams send_params;
201  send_params.ssrc = config_.id;
202  send_params.ordered = true;
203  send_params.type = cricket::DMT_CONTROL;
204
205  cricket::SendDataResult send_result;
206  bool retval = provider_->SendData(send_params, *buffer, &send_result);
207  if (retval) {
208    LOG(LS_INFO) << "Sent OPEN message on channel " << config_.id;
209    // Send data as ordered before we receive any mesage from the remote peer
210    // to make sure the remote peer will not receive any data before it receives
211    // the OPEN message.
212    waiting_for_open_ack_ = true;
213  } else if (send_result == cricket::SDR_BLOCK) {
214    // Link is congested.  Queue for later.
215    QueueControl(buffer.release());
216  } else {
217    LOG(LS_ERROR) << "Failed to send OPEN message with result "
218                  << send_result << " on channel " << config_.id;
219  }
220  return retval;
221}
222
223bool DataChannel::SendOpenAckMessage(const talk_base::Buffer* raw_buffer) {
224  ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
225         was_ever_writable_ &&
226         config_.id >= 0);
227
228  talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
229
230  cricket::SendDataParams send_params;
231  send_params.ssrc = config_.id;
232  send_params.ordered = config_.ordered;
233  send_params.type = cricket::DMT_CONTROL;
234
235  cricket::SendDataResult send_result;
236  bool retval = provider_->SendData(send_params, *buffer, &send_result);
237  if (retval) {
238    LOG(LS_INFO) << "Sent OPEN_ACK message on channel " << config_.id;
239  } else if (send_result == cricket::SDR_BLOCK) {
240    // Link is congested.  Queue for later.
241    QueueControl(buffer.release());
242  } else {
243    LOG(LS_ERROR) << "Failed to send OPEN_ACK message with result "
244                  << send_result << " on channel " << config_.id;
245  }
246  return retval;
247}
248
249void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
250  ASSERT(data_channel_type_ == cricket::DCT_RTP);
251
252  if (receive_ssrc_set_) {
253    return;
254  }
255  receive_ssrc_ = receive_ssrc;
256  receive_ssrc_set_ = true;
257  UpdateState();
258}
259
260// The remote peer request that this channel shall be closed.
261void DataChannel::RemotePeerRequestClose() {
262  DoClose();
263}
264
265void DataChannel::SetSendSsrc(uint32 send_ssrc) {
266  ASSERT(data_channel_type_ == cricket::DCT_RTP);
267  if (send_ssrc_set_) {
268    return;
269  }
270  send_ssrc_ = send_ssrc;
271  send_ssrc_set_ = true;
272  UpdateState();
273}
274
275void DataChannel::OnMessage(talk_base::Message* msg) {
276  switch (msg->message_id) {
277    case MSG_CHANNELREADY:
278      OnChannelReady(true);
279      break;
280  }
281}
282
283// The underlaying data engine is closing.
284// This function makes sure the DataChannel is disconnected and changes state to
285// kClosed.
286void DataChannel::OnDataEngineClose() {
287  DoClose();
288}
289
290void DataChannel::OnDataReceived(cricket::DataChannel* channel,
291                                 const cricket::ReceiveDataParams& params,
292                                 const talk_base::Buffer& payload) {
293  uint32 expected_ssrc =
294      (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id;
295  if (params.ssrc != expected_ssrc) {
296    return;
297  }
298
299  if (params.type == cricket::DMT_CONTROL) {
300    ASSERT(data_channel_type_ == cricket::DCT_SCTP);
301    if (!waiting_for_open_ack_) {
302      // Ignore it if we are not expecting an ACK message.
303      LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, "
304                      << "sid = " << params.ssrc;
305      return;
306    }
307    if (ParseDataChannelOpenAckMessage(payload)) {
308      // We can send unordered as soon as we receive the ACK message.
309      waiting_for_open_ack_ = false;
310      LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
311                   << params.ssrc;
312    } else {
313      LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = "
314                      << params.ssrc;
315    }
316    return;
317  }
318
319  ASSERT(params.type == cricket::DMT_BINARY ||
320         params.type == cricket::DMT_TEXT);
321
322  LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " << params.ssrc;
323  // We can send unordered as soon as we receive any DATA message since the
324  // remote side must have received the OPEN (and old clients do not send
325  // OPEN_ACK).
326  waiting_for_open_ack_ = false;
327
328  bool binary = (params.type == cricket::DMT_BINARY);
329  talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
330  if (was_ever_writable_ && observer_) {
331    observer_->OnMessage(*buffer.get());
332  } else {
333    if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
334      LOG(LS_ERROR)
335          << "Queued received data exceeds the max number of packets.";
336      ClearQueuedReceivedData();
337    }
338    queued_received_data_.push(buffer.release());
339  }
340}
341
342void DataChannel::OnChannelReady(bool writable) {
343  if (!writable) {
344    return;
345  }
346  // Update the readyState and send the queued control message if the channel
347  // is writable for the first time; otherwise it means the channel was blocked
348  // for sending and now unblocked, so send the queued data now.
349  if (!was_ever_writable_) {
350    was_ever_writable_ = true;
351
352    if (data_channel_type_ == cricket::DCT_SCTP) {
353      if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
354        talk_base::Buffer* payload = new talk_base::Buffer;
355        WriteDataChannelOpenMessage(label_, config_, payload);
356        SendOpenMessage(payload);
357      } else if (config_.open_handshake_role ==
358                 InternalDataChannelInit::kAcker) {
359        talk_base::Buffer* payload = new talk_base::Buffer;
360        WriteDataChannelOpenAckMessage(payload);
361        SendOpenAckMessage(payload);
362      }
363    }
364
365    UpdateState();
366    ASSERT(queued_send_data_.empty());
367  } else if (state_ == kOpen) {
368    DeliverQueuedSendData();
369  }
370}
371
372void DataChannel::DoClose() {
373  if (state_ == kClosed)
374    return;
375
376  receive_ssrc_set_ = false;
377  send_ssrc_set_ = false;
378  SetState(kClosing);
379  UpdateState();
380}
381
382void DataChannel::UpdateState() {
383  switch (state_) {
384    case kConnecting: {
385      if (send_ssrc_set_ == receive_ssrc_set_) {
386        if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
387          connected_to_provider_ = provider_->ConnectDataChannel(this);
388        }
389        if (was_ever_writable_) {
390          // TODO(jiayl): Do not transition to kOpen if we failed to send the
391          // OPEN message.
392          DeliverQueuedControlData();
393          SetState(kOpen);
394          // If we have received buffers before the channel got writable.
395          // Deliver them now.
396          DeliverQueuedReceivedData();
397        }
398      }
399      break;
400    }
401    case kOpen: {
402      break;
403    }
404    case kClosing: {
405      DisconnectFromTransport();
406
407      if (!send_ssrc_set_ && !receive_ssrc_set_) {
408        SetState(kClosed);
409      }
410      break;
411    }
412    case kClosed:
413      break;
414  }
415}
416
417void DataChannel::SetState(DataState state) {
418  if (state_ == state)
419    return;
420
421  state_ = state;
422  if (observer_) {
423    observer_->OnStateChange();
424  }
425}
426
427void DataChannel::DisconnectFromTransport() {
428  if (!connected_to_provider_)
429    return;
430
431  provider_->DisconnectDataChannel(this);
432  connected_to_provider_ = false;
433
434  if (data_channel_type_ == cricket::DCT_SCTP) {
435    provider_->RemoveSctpDataStream(config_.id);
436  }
437}
438
439void DataChannel::DeliverQueuedReceivedData() {
440  if (!was_ever_writable_ || !observer_) {
441    return;
442  }
443
444  while (!queued_received_data_.empty()) {
445    DataBuffer* buffer = queued_received_data_.front();
446    observer_->OnMessage(*buffer);
447    queued_received_data_.pop();
448    delete buffer;
449  }
450}
451
452void DataChannel::ClearQueuedReceivedData() {
453  while (!queued_received_data_.empty()) {
454    DataBuffer* buffer = queued_received_data_.front();
455    queued_received_data_.pop();
456    delete buffer;
457  }
458}
459
460void DataChannel::DeliverQueuedSendData() {
461  ASSERT(was_ever_writable_ && state_ == kOpen);
462
463  // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
464  // that the readyState is open. According to the standard, the channel should
465  // not become open before the OPEN message is sent.
466  DeliverQueuedControlData();
467
468  while (!queued_send_data_.empty()) {
469    DataBuffer* buffer = queued_send_data_.front();
470    cricket::SendDataResult send_result;
471    if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
472      LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result "
473                      << send_result;
474      break;
475    }
476    queued_send_data_.pop_front();
477    delete buffer;
478  }
479}
480
481void DataChannel::ClearQueuedControlData() {
482  while (!queued_control_data_.empty()) {
483    const talk_base::Buffer *buf = queued_control_data_.front();
484    queued_control_data_.pop();
485    delete buf;
486  }
487}
488
489void DataChannel::DeliverQueuedControlData() {
490  ASSERT(was_ever_writable_);
491  while (!queued_control_data_.empty()) {
492    const talk_base::Buffer* buf = queued_control_data_.front();
493    queued_control_data_.pop();
494    if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
495      SendOpenMessage(buf);
496    } else {
497      ASSERT(config_.open_handshake_role == InternalDataChannelInit::kAcker);
498      SendOpenAckMessage(buf);
499    }
500  }
501}
502
503void DataChannel::ClearQueuedSendData() {
504  while (!queued_send_data_.empty()) {
505    DataBuffer* buffer = queued_send_data_.front();
506    queued_send_data_.pop_front();
507    delete buffer;
508  }
509}
510
511bool DataChannel::InternalSendWithoutQueueing(
512    const DataBuffer& buffer, cricket::SendDataResult* send_result) {
513  cricket::SendDataParams send_params;
514
515  if (data_channel_type_ == cricket::DCT_SCTP) {
516    send_params.ordered = config_.ordered;
517    // Send as ordered if it is waiting for the OPEN_ACK message.
518    if (waiting_for_open_ack_ && !config_.ordered) {
519      send_params.ordered = true;
520      LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel "
521                      << "because the OPEN_ACK message has not been received.";
522    }
523
524    send_params.max_rtx_count = config_.maxRetransmits;
525    send_params.max_rtx_ms = config_.maxRetransmitTime;
526    send_params.ssrc = config_.id;
527  } else {
528    send_params.ssrc = send_ssrc_;
529  }
530  send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
531
532  return provider_->SendData(send_params, buffer.data, send_result);
533}
534
535bool DataChannel::QueueSendData(const DataBuffer& buffer) {
536  if (queued_send_data_.size() >= kMaxQueuedSendDataPackets) {
537    LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
538    return false;
539  }
540  queued_send_data_.push_back(new DataBuffer(buffer));
541  return true;
542}
543
544void DataChannel::SetSctpSid(int sid) {
545  ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
546  config_.id = sid;
547  provider_->AddSctpDataStream(sid);
548}
549
550void DataChannel::OnTransportChannelCreated() {
551  ASSERT(data_channel_type_ == cricket::DCT_SCTP);
552  if (!connected_to_provider_) {
553    connected_to_provider_ = provider_->ConnectDataChannel(this);
554  }
555  // The sid may have been unassigned when provider_->ConnectDataChannel was
556  // done. So always add the streams even if connected_to_provider_ is true.
557  if (config_.id >= 0) {
558    provider_->AddSctpDataStream(config_.id);
559  }
560}
561
562}  // namespace webrtc
563