1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "remoting/jingle_glue/chromium_socket_factory.h"
6
7#include "base/bind.h"
8#include "base/logging.h"
9#include "base/memory/scoped_ptr.h"
10#include "jingle/glue/utils.h"
11#include "net/base/io_buffer.h"
12#include "net/base/ip_endpoint.h"
13#include "net/base/net_errors.h"
14#include "net/udp/udp_server_socket.h"
15#include "remoting/jingle_glue/socket_util.h"
16#include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
17#include "third_party/libjingle/source/talk/base/nethelpers.h"
18
19namespace remoting {
20
21namespace {
22
23// Size of the buffer to allocate for RecvFrom().
24const int kReceiveBufferSize = 65536;
25
26// Maximum amount of data in the send buffers. This is necessary to
27// prevent out-of-memory crashes if the caller sends data faster than
28// Pepper's UDP API can handle it. This maximum should never be
29// reached under normal conditions.
30const int kMaxSendBufferSize = 256 * 1024;
31
32class UdpPacketSocket : public talk_base::AsyncPacketSocket {
33 public:
34  UdpPacketSocket();
35  virtual ~UdpPacketSocket();
36
37  bool Init(const talk_base::SocketAddress& local_address,
38            int min_port, int max_port);
39
40  // talk_base::AsyncPacketSocket interface.
41  virtual talk_base::SocketAddress GetLocalAddress() const OVERRIDE;
42  virtual talk_base::SocketAddress GetRemoteAddress() const OVERRIDE;
43  virtual int Send(const void* data, size_t data_size,
44                   const talk_base::PacketOptions& options) OVERRIDE;
45  virtual int SendTo(const void* data, size_t data_size,
46                     const talk_base::SocketAddress& address,
47                     const talk_base::PacketOptions& options) OVERRIDE;
48  virtual int Close() OVERRIDE;
49  virtual State GetState() const OVERRIDE;
50  virtual int GetOption(talk_base::Socket::Option option, int* value) OVERRIDE;
51  virtual int SetOption(talk_base::Socket::Option option, int value) OVERRIDE;
52  virtual int GetError() const OVERRIDE;
53  virtual void SetError(int error) OVERRIDE;
54
55 private:
56  struct PendingPacket {
57    PendingPacket(const void* buffer,
58                  int buffer_size,
59                  const net::IPEndPoint& address);
60
61    scoped_refptr<net::IOBufferWithSize> data;
62    net::IPEndPoint address;
63    bool retried;
64  };
65
66  void OnBindCompleted(int error);
67
68  void DoSend();
69  void OnSendCompleted(int result);
70
71  void DoRead();
72  void OnReadCompleted(int result);
73  void HandleReadResult(int result);
74
75  scoped_ptr<net::UDPServerSocket> socket_;
76
77  State state_;
78  int error_;
79
80  talk_base::SocketAddress local_address_;
81
82  // Receive buffer and address are populated by asynchronous reads.
83  scoped_refptr<net::IOBuffer> receive_buffer_;
84  net::IPEndPoint receive_address_;
85
86  bool send_pending_;
87  std::list<PendingPacket> send_queue_;
88  int send_queue_size_;
89
90  DISALLOW_COPY_AND_ASSIGN(UdpPacketSocket);
91};
92
93UdpPacketSocket::PendingPacket::PendingPacket(
94    const void* buffer,
95    int buffer_size,
96    const net::IPEndPoint& address)
97    : data(new net::IOBufferWithSize(buffer_size)),
98      address(address),
99      retried(false) {
100  memcpy(data->data(), buffer, buffer_size);
101}
102
103UdpPacketSocket::UdpPacketSocket()
104    : state_(STATE_CLOSED),
105      error_(0),
106      send_pending_(false),
107      send_queue_size_(0) {
108}
109
110UdpPacketSocket::~UdpPacketSocket() {
111  Close();
112}
113
114bool UdpPacketSocket::Init(const talk_base::SocketAddress& local_address,
115                           int min_port, int max_port) {
116  net::IPEndPoint local_endpoint;
117  if (!jingle_glue::SocketAddressToIPEndPoint(
118          local_address, &local_endpoint)) {
119    return false;
120  }
121
122  for (int port = min_port; port <= max_port; ++port) {
123    socket_.reset(new net::UDPServerSocket(NULL, net::NetLog::Source()));
124    int result = socket_->Listen(
125        net::IPEndPoint(local_endpoint.address(), port));
126    if (result == net::OK) {
127      break;
128    } else {
129      socket_.reset();
130    }
131  }
132
133  if (!socket_.get()) {
134    // Failed to bind the socket.
135    return false;
136  }
137
138  if (socket_->GetLocalAddress(&local_endpoint) != net::OK ||
139      !jingle_glue::IPEndPointToSocketAddress(local_endpoint,
140                                              &local_address_)) {
141    return false;
142  }
143
144  state_ = STATE_BOUND;
145  DoRead();
146
147  return true;
148}
149
150talk_base::SocketAddress UdpPacketSocket::GetLocalAddress() const {
151  DCHECK_EQ(state_, STATE_BOUND);
152  return local_address_;
153}
154
155talk_base::SocketAddress UdpPacketSocket::GetRemoteAddress() const {
156  // UDP sockets are not connected - this method should never be called.
157  NOTREACHED();
158  return talk_base::SocketAddress();
159}
160
161int UdpPacketSocket::Send(const void* data, size_t data_size,
162                          const talk_base::PacketOptions& options) {
163  // UDP sockets are not connected - this method should never be called.
164  NOTREACHED();
165  return EWOULDBLOCK;
166}
167
168int UdpPacketSocket::SendTo(const void* data, size_t data_size,
169                            const talk_base::SocketAddress& address,
170                            const talk_base::PacketOptions& options) {
171  if (state_ != STATE_BOUND) {
172    NOTREACHED();
173    return EINVAL;
174  }
175
176  if (error_ != 0) {
177    return error_;
178  }
179
180  net::IPEndPoint endpoint;
181  if (!jingle_glue::SocketAddressToIPEndPoint(address, &endpoint)) {
182    return EINVAL;
183  }
184
185  if (send_queue_size_ >= kMaxSendBufferSize) {
186    return EWOULDBLOCK;
187  }
188
189  send_queue_.push_back(PendingPacket(data, data_size, endpoint));
190  send_queue_size_ += data_size;
191
192  DoSend();
193  return data_size;
194}
195
196int UdpPacketSocket::Close() {
197  state_ = STATE_CLOSED;
198  socket_.reset();
199  return 0;
200}
201
202talk_base::AsyncPacketSocket::State UdpPacketSocket::GetState() const {
203  return state_;
204}
205
206int UdpPacketSocket::GetOption(talk_base::Socket::Option option, int* value) {
207  // This method is never called by libjingle.
208  NOTIMPLEMENTED();
209  return -1;
210}
211
212int UdpPacketSocket::SetOption(talk_base::Socket::Option option, int value) {
213  if (state_ != STATE_BOUND) {
214    NOTREACHED();
215    return EINVAL;
216  }
217
218  switch (option) {
219    case talk_base::Socket::OPT_DONTFRAGMENT:
220      NOTIMPLEMENTED();
221      return -1;
222
223    case talk_base::Socket::OPT_RCVBUF: {
224      int net_error = socket_->SetReceiveBufferSize(value);
225      return (net_error == net::OK) ? 0 : -1;
226    }
227
228    case talk_base::Socket::OPT_SNDBUF: {
229      int net_error = socket_->SetSendBufferSize(value);
230      return (net_error == net::OK) ? 0 : -1;
231    }
232
233    case talk_base::Socket::OPT_NODELAY:
234      // OPT_NODELAY is only for TCP sockets.
235      NOTREACHED();
236      return -1;
237
238    case talk_base::Socket::OPT_IPV6_V6ONLY:
239      NOTIMPLEMENTED();
240      return -1;
241
242    case talk_base::Socket::OPT_DSCP:
243      NOTIMPLEMENTED();
244      return -1;
245
246    case talk_base::Socket::OPT_RTP_SENDTIME_EXTN_ID:
247      NOTIMPLEMENTED();
248      return -1;
249  }
250
251  NOTREACHED();
252  return -1;
253}
254
255int UdpPacketSocket::GetError() const {
256  return error_;
257}
258
259void UdpPacketSocket::SetError(int error) {
260  error_ = error;
261}
262
263void UdpPacketSocket::DoSend() {
264  if (send_pending_ || send_queue_.empty())
265    return;
266
267  PendingPacket& packet = send_queue_.front();
268  int result = socket_->SendTo(
269      packet.data.get(),
270      packet.data->size(),
271      packet.address,
272      base::Bind(&UdpPacketSocket::OnSendCompleted, base::Unretained(this)));
273  if (result == net::ERR_IO_PENDING) {
274    send_pending_ = true;
275  } else {
276    OnSendCompleted(result);
277  }
278}
279
280void UdpPacketSocket::OnSendCompleted(int result) {
281  send_pending_ = false;
282
283  if (result < 0) {
284    SocketErrorAction action = GetSocketErrorAction(result);
285    switch (action) {
286      case SOCKET_ERROR_ACTION_FAIL:
287        LOG(ERROR) << "Send failed on a UDP socket: " << result;
288        error_ = EINVAL;
289        return;
290
291      case SOCKET_ERROR_ACTION_RETRY:
292        // Retry resending only once.
293        if (!send_queue_.front().retried) {
294          send_queue_.front().retried = true;
295          DoSend();
296          return;
297        }
298        break;
299
300      case SOCKET_ERROR_ACTION_IGNORE:
301        break;
302    }
303  }
304
305  // Don't need to worry about partial sends because this is a datagram
306  // socket.
307  send_queue_size_ -= send_queue_.front().data->size();
308  send_queue_.pop_front();
309  DoSend();
310}
311
312void UdpPacketSocket::DoRead() {
313  int result = 0;
314  while (result >= 0) {
315    receive_buffer_ = new net::IOBuffer(kReceiveBufferSize);
316    result = socket_->RecvFrom(
317        receive_buffer_.get(),
318        kReceiveBufferSize,
319        &receive_address_,
320        base::Bind(&UdpPacketSocket::OnReadCompleted, base::Unretained(this)));
321    HandleReadResult(result);
322  }
323}
324
325void UdpPacketSocket::OnReadCompleted(int result) {
326  HandleReadResult(result);
327  if (result >= 0) {
328    DoRead();
329  }
330}
331
332void UdpPacketSocket::HandleReadResult(int result) {
333  if (result == net::ERR_IO_PENDING) {
334    return;
335  }
336
337  if (result > 0) {
338    talk_base::SocketAddress address;
339    if (!jingle_glue::IPEndPointToSocketAddress(receive_address_, &address)) {
340      NOTREACHED();
341      LOG(ERROR) << "Failed to convert address received from RecvFrom().";
342      return;
343    }
344    SignalReadPacket(this, receive_buffer_->data(), result, address,
345                     talk_base::CreatePacketTime(0));
346  } else {
347    LOG(ERROR) << "Received error when reading from UDP socket: " << result;
348  }
349}
350
351}  // namespace
352
353ChromiumPacketSocketFactory::ChromiumPacketSocketFactory() {
354}
355
356ChromiumPacketSocketFactory::~ChromiumPacketSocketFactory() {
357}
358
359talk_base::AsyncPacketSocket* ChromiumPacketSocketFactory::CreateUdpSocket(
360      const talk_base::SocketAddress& local_address,
361      int min_port, int max_port) {
362  scoped_ptr<UdpPacketSocket> result(new UdpPacketSocket());
363  if (!result->Init(local_address, min_port, max_port))
364    return NULL;
365  return result.release();
366}
367
368talk_base::AsyncPacketSocket*
369ChromiumPacketSocketFactory::CreateServerTcpSocket(
370    const talk_base::SocketAddress& local_address,
371    int min_port, int max_port,
372    int opts) {
373  // We don't use TCP sockets for remoting connections.
374  NOTREACHED();
375  return NULL;
376}
377
378talk_base::AsyncPacketSocket*
379ChromiumPacketSocketFactory::CreateClientTcpSocket(
380      const talk_base::SocketAddress& local_address,
381      const talk_base::SocketAddress& remote_address,
382      const talk_base::ProxyInfo& proxy_info,
383      const std::string& user_agent,
384      int opts) {
385  // We don't use TCP sockets for remoting connections.
386  NOTREACHED();
387  return NULL;
388}
389
390talk_base::AsyncResolverInterface*
391ChromiumPacketSocketFactory::CreateAsyncResolver() {
392  return new talk_base::AsyncResolver();
393}
394
395}  // namespace remoting
396