1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "media/cast/net/udp_transport.h"
6
7#include <algorithm>
8#include <string>
9
10#include "base/bind.h"
11#include "base/logging.h"
12#include "base/memory/ref_counted.h"
13#include "base/memory/scoped_ptr.h"
14#include "base/message_loop/message_loop.h"
15#include "base/rand_util.h"
16#include "net/base/io_buffer.h"
17#include "net/base/net_errors.h"
18#include "net/base/rand_callback.h"
19
20namespace media {
21namespace cast {
22
23namespace {
// Initial size given to each Packet allocated for receiving, and the buffer
// length passed to UDPSocket::RecvFrom() in ReceiveNextPacket().
const int kMaxPacketSize = 1500;
25
26bool IsEmpty(const net::IPEndPoint& addr) {
27  net::IPAddressNumber empty_addr(addr.address().size());
28  return std::equal(
29             empty_addr.begin(), empty_addr.end(), addr.address().begin()) &&
30         !addr.port();
31}
32
33bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
34  return addr1.port() == addr2.port() && std::equal(addr1.address().begin(),
35                                                    addr1.address().end(),
36                                                    addr2.address().begin());
37}
38}  // namespace
39
// Stores the task runner, both end points and the status callback, and
// constructs the UDP socket (DEFAULT_BIND, no random-int callback, logging to
// |net_log|).  The socket is not bound or connected here; that happens in
// StartReceiving().
//
// At least one of |local_end_point| / |remote_end_point| must be non-empty
// (see IsEmpty()); this is only enforced by a DCHECK.
UdpTransport::UdpTransport(
    net::NetLog* net_log,
    const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
    const net::IPEndPoint& local_end_point,
    const net::IPEndPoint& remote_end_point,
    const CastTransportStatusCallback& status_callback)
    : io_thread_proxy_(io_thread_proxy),
      local_addr_(local_end_point),
      remote_addr_(remote_end_point),
      udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
                                     net::RandIntCallback(),
                                     net_log,
                                     net::NetLog::Source())),
      send_pending_(false),
      receive_pending_(false),
      client_connected_(false),
      // No DSCP change is requested until SetDscp() is called.
      next_dscp_value_(net::DSCP_NO_CHANGE),
      status_callback_(status_callback),
      bytes_sent_(0),
      weak_factory_(this) {
  DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
}
62
// Nothing is released explicitly here; cleanup is left to the members' own
// destructors.
UdpTransport::~UdpTransport() {}
64
65void UdpTransport::StartReceiving(
66    const PacketReceiverCallback& packet_receiver) {
67  DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
68
69  packet_receiver_ = packet_receiver;
70  udp_socket_->AllowAddressReuse();
71  udp_socket_->SetMulticastLoopbackMode(true);
72  if (!IsEmpty(local_addr_)) {
73    if (udp_socket_->Bind(local_addr_) < 0) {
74      status_callback_.Run(TRANSPORT_SOCKET_ERROR);
75      LOG(ERROR) << "Failed to bind local address.";
76      return;
77    }
78  } else if (!IsEmpty(remote_addr_)) {
79    if (udp_socket_->Connect(remote_addr_) < 0) {
80      status_callback_.Run(TRANSPORT_SOCKET_ERROR);
81      LOG(ERROR) << "Failed to connect to remote address.";
82      return;
83    }
84    client_connected_ = true;
85  } else {
86    NOTREACHED() << "Either local or remote address has to be defined.";
87  }
88
89  ScheduleReceiveNextPacket();
90}
91
// Records |dscp| so that the next SendPacket() call applies it to the socket.
// Must be called on the thread served by |io_thread_proxy_| (DCHECKed).
void UdpTransport::SetDscp(net::DiffServCodePoint dscp) {
  DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
  next_dscp_value_ = dscp;
}
96
97void UdpTransport::ScheduleReceiveNextPacket() {
98  DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
99  if (!packet_receiver_.is_null() && !receive_pending_) {
100    receive_pending_ = true;
101    io_thread_proxy_->PostTask(FROM_HERE,
102                               base::Bind(&UdpTransport::ReceiveNextPacket,
103                                          weak_factory_.GetWeakPtr(),
104                                          net::ERR_IO_PENDING));
105  }
106}
107
108void UdpTransport::ReceiveNextPacket(int length_or_status) {
109  DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
110
111  // Loop while UdpSocket is delivering data synchronously.  When it responds
112  // with a "pending" status, break and expect this method to be called back in
113  // the future when a packet is ready.
114  while (true) {
115    if (length_or_status == net::ERR_IO_PENDING) {
116      next_packet_.reset(new Packet(kMaxPacketSize));
117      recv_buf_ = new net::WrappedIOBuffer(
118          reinterpret_cast<char*>(&next_packet_->front()));
119      length_or_status =
120          udp_socket_->RecvFrom(recv_buf_.get(),
121                                kMaxPacketSize,
122                                &recv_addr_,
123                                base::Bind(&UdpTransport::ReceiveNextPacket,
124                                           weak_factory_.GetWeakPtr()));
125      if (length_or_status == net::ERR_IO_PENDING) {
126        receive_pending_ = true;
127        return;
128      }
129    }
130
131    // Note: At this point, either a packet is ready or an error has occurred.
132    if (length_or_status < 0) {
133      VLOG(1) << "Failed to receive packet: Status code is "
134              << length_or_status;
135      receive_pending_ = false;
136      return;
137    }
138
139    // Confirm the packet has come from the expected remote address; otherwise,
140    // ignore it.  If this is the first packet being received and no remote
141    // address has been set, set the remote address and expect all future
142    // packets to come from the same one.
143    // TODO(hubbe): We should only do this if the caller used a valid ssrc.
144    if (IsEmpty(remote_addr_)) {
145      remote_addr_ = recv_addr_;
146      VLOG(1) << "Setting remote address from first received packet: "
147              << remote_addr_.ToString();
148    } else if (!IsEqual(remote_addr_, recv_addr_)) {
149      VLOG(1) << "Ignoring packet received from an unrecognized address: "
150              << recv_addr_.ToString() << ".";
151      length_or_status = net::ERR_IO_PENDING;
152      continue;
153    }
154
155    next_packet_->resize(length_or_status);
156    packet_receiver_.Run(next_packet_.Pass());
157    length_or_status = net::ERR_IO_PENDING;
158  }
159}
160
161bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
162  DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
163
164  // Increase byte count no matter the packet was sent or dropped.
165  bytes_sent_ += packet->data.size();
166
167  DCHECK(!send_pending_);
168  if (send_pending_) {
169    VLOG(1) << "Cannot send because of pending IO.";
170    return true;
171  }
172
173  if (next_dscp_value_ != net::DSCP_NO_CHANGE) {
174    int result = udp_socket_->SetDiffServCodePoint(next_dscp_value_);
175    if (result != net::OK) {
176      VLOG(1) << "Unable to set DSCP: " << next_dscp_value_
177              << " to socket; Error: " << result;
178    }
179
180    if (result != net::ERR_SOCKET_NOT_CONNECTED) {
181      // Don't change DSCP in next send.
182      next_dscp_value_ = net::DSCP_NO_CHANGE;
183    }
184  }
185
186  scoped_refptr<net::IOBuffer> buf =
187      new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
188
189  int result;
190  base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
191                                                  weak_factory_.GetWeakPtr(),
192                                                  buf,
193                                                  packet,
194                                                  cb);
195  if (client_connected_) {
196    // If we called Connect() before we must call Write() instead of
197    // SendTo(). Otherwise on some platforms we might get
198    // ERR_SOCKET_IS_CONNECTED.
199    result = udp_socket_->Write(
200        buf.get(), static_cast<int>(packet->data.size()), callback);
201  } else if (!IsEmpty(remote_addr_)) {
202    result = udp_socket_->SendTo(buf.get(),
203                                 static_cast<int>(packet->data.size()),
204                                 remote_addr_,
205                                 callback);
206  } else {
207    VLOG(1) << "Failed to send packet; socket is neither bound nor "
208            << "connected.";
209    return true;
210  }
211
212  if (result == net::ERR_IO_PENDING) {
213    send_pending_ = true;
214    return false;
215  }
216  OnSent(buf, packet, base::Closure(), result);
217  return true;
218}
219
// Returns the running total of payload bytes passed to SendPacket().  The
// count includes packets that were dropped rather than sent.
int64 UdpTransport::GetBytesSent() {
  return bytes_sent_;
}
223
224void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
225                          PacketRef packet,
226                          const base::Closure& cb,
227                          int result) {
228  DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
229
230  send_pending_ = false;
231  if (result < 0) {
232    VLOG(1) << "Failed to send packet: " << result << ".";
233  }
234  ScheduleReceiveNextPacket();
235
236  if (!cb.is_null()) {
237    cb.Run();
238  }
239}
240
241}  // namespace cast
242}  // namespace media
243