1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/udp/udp_socket_libevent.h"
6
7#include <errno.h>
8#include <fcntl.h>
9#include <netdb.h>
10#include <sys/socket.h>
11#include <netinet/in.h>
12
13#include "base/callback.h"
14#include "base/logging.h"
15#include "base/message_loop/message_loop.h"
16#include "base/metrics/stats_counters.h"
17#include "base/posix/eintr_wrapper.h"
18#include "base/rand_util.h"
19#include "net/base/io_buffer.h"
20#include "net/base/ip_endpoint.h"
21#include "net/base/net_errors.h"
22#include "net/base/net_log.h"
23#include "net/base/net_util.h"
24#include "net/udp/udp_net_log_parameters.h"
25
namespace {

// Number of random-port bind attempts RandomBind() makes before it falls
// back to binding port 0.
const int kBindRetries = 10;

// Upper and lower bounds handed to the random-int callback when RandomBind()
// picks a candidate port.
const int kPortEnd = 65535;
const int kPortStart = 1024;

}  // namespace
33
34namespace net {
35
36UDPSocketLibevent::UDPSocketLibevent(
37    DatagramSocket::BindType bind_type,
38    const RandIntCallback& rand_int_cb,
39    net::NetLog* net_log,
40    const net::NetLog::Source& source)
41        : socket_(kInvalidSocket),
42          addr_family_(0),
43          socket_options_(SOCKET_OPTION_MULTICAST_LOOP),
44          multicast_time_to_live_(1),
45          bind_type_(bind_type),
46          rand_int_cb_(rand_int_cb),
47          read_watcher_(this),
48          write_watcher_(this),
49          read_buf_len_(0),
50          recv_from_address_(NULL),
51          write_buf_len_(0),
52          net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)) {
53  net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE,
54                      source.ToEventParametersCallback());
55  if (bind_type == DatagramSocket::RANDOM_BIND)
56    DCHECK(!rand_int_cb.is_null());
57}
58
// Closes the socket (a no-op if it is not open) and then ends the
// TYPE_SOCKET_ALIVE net-log event that the constructor began.
UDPSocketLibevent::~UDPSocketLibevent() {
  Close();
  net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE);
}
63
64void UDPSocketLibevent::Close() {
65  DCHECK(CalledOnValidThread());
66
67  if (!is_connected())
68    return;
69
70  // Zero out any pending read/write callback state.
71  read_buf_ = NULL;
72  read_buf_len_ = 0;
73  read_callback_.Reset();
74  recv_from_address_ = NULL;
75  write_buf_ = NULL;
76  write_buf_len_ = 0;
77  write_callback_.Reset();
78  send_to_address_.reset();
79
80  bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
81  DCHECK(ok);
82  ok = write_socket_watcher_.StopWatchingFileDescriptor();
83  DCHECK(ok);
84
85  if (HANDLE_EINTR(close(socket_)) < 0)
86    PLOG(ERROR) << "close";
87
88  socket_ = kInvalidSocket;
89  addr_family_ = 0;
90}
91
92int UDPSocketLibevent::GetPeerAddress(IPEndPoint* address) const {
93  DCHECK(CalledOnValidThread());
94  DCHECK(address);
95  if (!is_connected())
96    return ERR_SOCKET_NOT_CONNECTED;
97
98  if (!remote_address_.get()) {
99    SockaddrStorage storage;
100    if (getpeername(socket_, storage.addr, &storage.addr_len))
101      return MapSystemError(errno);
102    scoped_ptr<IPEndPoint> address(new IPEndPoint());
103    if (!address->FromSockAddr(storage.addr, storage.addr_len))
104      return ERR_FAILED;
105    remote_address_.reset(address.release());
106  }
107
108  *address = *remote_address_;
109  return OK;
110}
111
112int UDPSocketLibevent::GetLocalAddress(IPEndPoint* address) const {
113  DCHECK(CalledOnValidThread());
114  DCHECK(address);
115  if (!is_connected())
116    return ERR_SOCKET_NOT_CONNECTED;
117
118  if (!local_address_.get()) {
119    SockaddrStorage storage;
120    if (getsockname(socket_, storage.addr, &storage.addr_len))
121      return MapSystemError(errno);
122    scoped_ptr<IPEndPoint> address(new IPEndPoint());
123    if (!address->FromSockAddr(storage.addr, storage.addr_len))
124      return ERR_FAILED;
125    local_address_.reset(address.release());
126    net_log_.AddEvent(NetLog::TYPE_UDP_LOCAL_ADDRESS,
127                      CreateNetLogUDPConnectCallback(local_address_.get()));
128  }
129
130  *address = *local_address_;
131  return OK;
132}
133
134int UDPSocketLibevent::Read(IOBuffer* buf,
135                            int buf_len,
136                            const CompletionCallback& callback) {
137  return RecvFrom(buf, buf_len, NULL, callback);
138}
139
140int UDPSocketLibevent::RecvFrom(IOBuffer* buf,
141                                int buf_len,
142                                IPEndPoint* address,
143                                const CompletionCallback& callback) {
144  DCHECK(CalledOnValidThread());
145  DCHECK_NE(kInvalidSocket, socket_);
146  DCHECK(read_callback_.is_null());
147  DCHECK(!recv_from_address_);
148  DCHECK(!callback.is_null());  // Synchronous operation not supported
149  DCHECK_GT(buf_len, 0);
150
151  int nread = InternalRecvFrom(buf, buf_len, address);
152  if (nread != ERR_IO_PENDING)
153    return nread;
154
155  if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
156          socket_, true, base::MessageLoopForIO::WATCH_READ,
157          &read_socket_watcher_, &read_watcher_)) {
158    PLOG(ERROR) << "WatchFileDescriptor failed on read";
159    int result = MapSystemError(errno);
160    LogRead(result, NULL, 0, NULL);
161    return result;
162  }
163
164  read_buf_ = buf;
165  read_buf_len_ = buf_len;
166  recv_from_address_ = address;
167  read_callback_ = callback;
168  return ERR_IO_PENDING;
169}
170
171int UDPSocketLibevent::Write(IOBuffer* buf,
172                             int buf_len,
173                             const CompletionCallback& callback) {
174  return SendToOrWrite(buf, buf_len, NULL, callback);
175}
176
177int UDPSocketLibevent::SendTo(IOBuffer* buf,
178                              int buf_len,
179                              const IPEndPoint& address,
180                              const CompletionCallback& callback) {
181  return SendToOrWrite(buf, buf_len, &address, callback);
182}
183
184int UDPSocketLibevent::SendToOrWrite(IOBuffer* buf,
185                                     int buf_len,
186                                     const IPEndPoint* address,
187                                     const CompletionCallback& callback) {
188  DCHECK(CalledOnValidThread());
189  DCHECK_NE(kInvalidSocket, socket_);
190  DCHECK(write_callback_.is_null());
191  DCHECK(!callback.is_null());  // Synchronous operation not supported
192  DCHECK_GT(buf_len, 0);
193
194  int result = InternalSendTo(buf, buf_len, address);
195  if (result != ERR_IO_PENDING)
196    return result;
197
198  if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
199          socket_, true, base::MessageLoopForIO::WATCH_WRITE,
200          &write_socket_watcher_, &write_watcher_)) {
201    DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno;
202    int result = MapSystemError(errno);
203    LogWrite(result, NULL, NULL);
204    return result;
205  }
206
207  write_buf_ = buf;
208  write_buf_len_ = buf_len;
209  DCHECK(!send_to_address_.get());
210  if (address) {
211    send_to_address_.reset(new IPEndPoint(*address));
212  }
213  write_callback_ = callback;
214  return ERR_IO_PENDING;
215}
216
217int UDPSocketLibevent::Connect(const IPEndPoint& address) {
218  net_log_.BeginEvent(NetLog::TYPE_UDP_CONNECT,
219                      CreateNetLogUDPConnectCallback(&address));
220  int rv = InternalConnect(address);
221  if (rv != OK)
222    Close();
223  net_log_.EndEventWithNetErrorCode(NetLog::TYPE_UDP_CONNECT, rv);
224  return rv;
225}
226
227int UDPSocketLibevent::InternalConnect(const IPEndPoint& address) {
228  DCHECK(CalledOnValidThread());
229  DCHECK(!is_connected());
230  DCHECK(!remote_address_.get());
231  int rv = CreateSocket(address);
232  if (rv < 0)
233    return rv;
234
235  if (bind_type_ == DatagramSocket::RANDOM_BIND)
236    rv = RandomBind(address);
237  // else connect() does the DatagramSocket::DEFAULT_BIND
238
239  if (rv < 0) {
240    Close();
241    return rv;
242  }
243
244  SockaddrStorage storage;
245  if (!address.ToSockAddr(storage.addr, &storage.addr_len)) {
246    Close();
247    return ERR_ADDRESS_INVALID;
248  }
249
250  rv = HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len));
251  if (rv < 0) {
252    // Close() may change the current errno. Map errno beforehand.
253    int result = MapSystemError(errno);
254    Close();
255    return result;
256  }
257
258  remote_address_.reset(new IPEndPoint(address));
259  return rv;
260}
261
262int UDPSocketLibevent::Bind(const IPEndPoint& address) {
263  DCHECK(CalledOnValidThread());
264  DCHECK(!is_connected());
265  int rv = CreateSocket(address);
266  if (rv < 0)
267    return rv;
268
269  rv = SetSocketOptions();
270  if (rv < 0) {
271    Close();
272    return rv;
273  }
274  rv = DoBind(address);
275  if (rv < 0) {
276    Close();
277    return rv;
278  }
279  local_address_.reset();
280  return rv;
281}
282
283bool UDPSocketLibevent::SetReceiveBufferSize(int32 size) {
284  DCHECK(CalledOnValidThread());
285  int rv = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF,
286                      reinterpret_cast<const char*>(&size), sizeof(size));
287  DCHECK(!rv) << "Could not set socket receive buffer size: " << errno;
288  return rv == 0;
289}
290
291bool UDPSocketLibevent::SetSendBufferSize(int32 size) {
292  DCHECK(CalledOnValidThread());
293  int rv = setsockopt(socket_, SOL_SOCKET, SO_SNDBUF,
294                      reinterpret_cast<const char*>(&size), sizeof(size));
295  DCHECK(!rv) << "Could not set socket send buffer size: " << errno;
296  return rv == 0;
297}
298
299void UDPSocketLibevent::AllowAddressReuse() {
300  DCHECK(CalledOnValidThread());
301  DCHECK(!is_connected());
302
303  socket_options_ |= SOCKET_OPTION_REUSE_ADDRESS;
304}
305
306void UDPSocketLibevent::AllowBroadcast() {
307  DCHECK(CalledOnValidThread());
308  DCHECK(!is_connected());
309
310  socket_options_ |= SOCKET_OPTION_BROADCAST;
311}
312
313void UDPSocketLibevent::ReadWatcher::OnFileCanReadWithoutBlocking(int) {
314  if (!socket_->read_callback_.is_null())
315    socket_->DidCompleteRead();
316}
317
318void UDPSocketLibevent::WriteWatcher::OnFileCanWriteWithoutBlocking(int) {
319  if (!socket_->write_callback_.is_null())
320    socket_->DidCompleteWrite();
321}
322
323void UDPSocketLibevent::DoReadCallback(int rv) {
324  DCHECK_NE(rv, ERR_IO_PENDING);
325  DCHECK(!read_callback_.is_null());
326
327  // since Run may result in Read being called, clear read_callback_ up front.
328  CompletionCallback c = read_callback_;
329  read_callback_.Reset();
330  c.Run(rv);
331}
332
333void UDPSocketLibevent::DoWriteCallback(int rv) {
334  DCHECK_NE(rv, ERR_IO_PENDING);
335  DCHECK(!write_callback_.is_null());
336
337  // since Run may result in Write being called, clear write_callback_ up front.
338  CompletionCallback c = write_callback_;
339  write_callback_.Reset();
340  c.Run(rv);
341}
342
343void UDPSocketLibevent::DidCompleteRead() {
344  int result =
345      InternalRecvFrom(read_buf_.get(), read_buf_len_, recv_from_address_);
346  if (result != ERR_IO_PENDING) {
347    read_buf_ = NULL;
348    read_buf_len_ = 0;
349    recv_from_address_ = NULL;
350    bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
351    DCHECK(ok);
352    DoReadCallback(result);
353  }
354}
355
356void UDPSocketLibevent::LogRead(int result,
357                                const char* bytes,
358                                socklen_t addr_len,
359                                const sockaddr* addr) const {
360  if (result < 0) {
361    net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result);
362    return;
363  }
364
365  if (net_log_.IsLoggingAllEvents()) {
366    DCHECK(addr_len > 0);
367    DCHECK(addr);
368
369    IPEndPoint address;
370    bool is_address_valid = address.FromSockAddr(addr, addr_len);
371    net_log_.AddEvent(
372        NetLog::TYPE_UDP_BYTES_RECEIVED,
373        CreateNetLogUDPDataTranferCallback(
374            result, bytes,
375            is_address_valid ? &address : NULL));
376  }
377
378  base::StatsCounter read_bytes("udp.read_bytes");
379  read_bytes.Add(result);
380}
381
382int UDPSocketLibevent::CreateSocket(const IPEndPoint& address) {
383  addr_family_ = address.GetSockAddrFamily();
384  socket_ = socket(addr_family_, SOCK_DGRAM, 0);
385  if (socket_ == kInvalidSocket)
386    return MapSystemError(errno);
387  if (SetNonBlocking(socket_)) {
388    const int err = MapSystemError(errno);
389    Close();
390    return err;
391  }
392  return OK;
393}
394
395void UDPSocketLibevent::DidCompleteWrite() {
396  int result =
397      InternalSendTo(write_buf_.get(), write_buf_len_, send_to_address_.get());
398
399  if (result != ERR_IO_PENDING) {
400    write_buf_ = NULL;
401    write_buf_len_ = 0;
402    send_to_address_.reset();
403    write_socket_watcher_.StopWatchingFileDescriptor();
404    DoWriteCallback(result);
405  }
406}
407
408void UDPSocketLibevent::LogWrite(int result,
409                                 const char* bytes,
410                                 const IPEndPoint* address) const {
411  if (result < 0) {
412    net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_SEND_ERROR, result);
413    return;
414  }
415
416  if (net_log_.IsLoggingAllEvents()) {
417    net_log_.AddEvent(
418        NetLog::TYPE_UDP_BYTES_SENT,
419        CreateNetLogUDPDataTranferCallback(result, bytes, address));
420  }
421
422  base::StatsCounter write_bytes("udp.write_bytes");
423  write_bytes.Add(result);
424}
425
426int UDPSocketLibevent::InternalRecvFrom(IOBuffer* buf, int buf_len,
427                                        IPEndPoint* address) {
428  int bytes_transferred;
429  int flags = 0;
430
431  SockaddrStorage storage;
432
433  bytes_transferred =
434      HANDLE_EINTR(recvfrom(socket_,
435                            buf->data(),
436                            buf_len,
437                            flags,
438                            storage.addr,
439                            &storage.addr_len));
440  int result;
441  if (bytes_transferred >= 0) {
442    result = bytes_transferred;
443    if (address && !address->FromSockAddr(storage.addr, storage.addr_len))
444      result = ERR_FAILED;
445  } else {
446    result = MapSystemError(errno);
447  }
448  if (result != ERR_IO_PENDING)
449    LogRead(result, buf->data(), storage.addr_len, storage.addr);
450  return result;
451}
452
453int UDPSocketLibevent::InternalSendTo(IOBuffer* buf, int buf_len,
454                                      const IPEndPoint* address) {
455  SockaddrStorage storage;
456  struct sockaddr* addr = storage.addr;
457  if (!address) {
458    addr = NULL;
459    storage.addr_len = 0;
460  } else {
461    if (!address->ToSockAddr(storage.addr, &storage.addr_len)) {
462      int result = ERR_FAILED;
463      LogWrite(result, NULL, NULL);
464      return result;
465    }
466  }
467
468  int result = HANDLE_EINTR(sendto(socket_,
469                            buf->data(),
470                            buf_len,
471                            0,
472                            addr,
473                            storage.addr_len));
474  if (result < 0)
475    result = MapSystemError(errno);
476  if (result != ERR_IO_PENDING)
477    LogWrite(result, buf->data(), address);
478  return result;
479}
480
481int UDPSocketLibevent::SetSocketOptions() {
482  int true_value = 1;
483  if (socket_options_ & SOCKET_OPTION_REUSE_ADDRESS) {
484    int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &true_value,
485                        sizeof(true_value));
486    if (rv < 0)
487      return MapSystemError(errno);
488  }
489  if (socket_options_ & SOCKET_OPTION_BROADCAST) {
490    int rv;
491#if defined(OS_MACOSX)
492    // SO_REUSEPORT on OSX permits multiple processes to each receive
493    // UDP multicast or broadcast datagrams destined for the bound
494    // port.
495    rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &true_value,
496                    sizeof(true_value));
497#else
498    rv = setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &true_value,
499                    sizeof(true_value));
500#endif  // defined(OS_MACOSX)
501    if (rv < 0)
502      return MapSystemError(errno);
503  }
504
505  if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
506    int rv;
507    if (addr_family_ == AF_INET) {
508      u_char loop = 0;
509      rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_LOOP,
510                      &loop, sizeof(loop));
511    } else {
512      u_int loop = 0;
513      rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
514                      &loop, sizeof(loop));
515    }
516    if (rv < 0)
517      return MapSystemError(errno);
518  }
519  if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
520    int rv;
521    if (addr_family_ == AF_INET) {
522      u_char ttl = multicast_time_to_live_;
523      rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_TTL,
524                      &ttl, sizeof(ttl));
525    } else {
526      // Signed interger. -1 to use route default.
527      int ttl = multicast_time_to_live_;
528      rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
529                      &ttl, sizeof(ttl));
530    }
531    if (rv < 0)
532      return MapSystemError(errno);
533  }
534  return OK;
535}
536
537int UDPSocketLibevent::DoBind(const IPEndPoint& address) {
538  SockaddrStorage storage;
539  if (!address.ToSockAddr(storage.addr, &storage.addr_len))
540    return ERR_ADDRESS_INVALID;
541  int rv = bind(socket_, storage.addr, storage.addr_len);
542  return rv < 0 ? MapSystemError(errno) : rv;
543}
544
545int UDPSocketLibevent::RandomBind(const IPEndPoint& address) {
546  DCHECK(bind_type_ == DatagramSocket::RANDOM_BIND && !rand_int_cb_.is_null());
547
548  // Construct IPAddressNumber of appropriate size (IPv4 or IPv6) of 0s.
549  IPAddressNumber ip(address.address().size());
550
551  for (int i = 0; i < kBindRetries; ++i) {
552    int rv = DoBind(IPEndPoint(ip, rand_int_cb_.Run(kPortStart, kPortEnd)));
553    if (rv == OK || rv != ERR_ADDRESS_IN_USE)
554      return rv;
555  }
556  return DoBind(IPEndPoint(ip, 0));
557}
558
559int UDPSocketLibevent::JoinGroup(const IPAddressNumber& group_address) const {
560  DCHECK(CalledOnValidThread());
561  if (!is_connected())
562    return ERR_SOCKET_NOT_CONNECTED;
563
564  switch (group_address.size()) {
565    case kIPv4AddressSize: {
566      if (addr_family_ != AF_INET)
567        return ERR_ADDRESS_INVALID;
568      ip_mreq mreq;
569      mreq.imr_interface.s_addr = INADDR_ANY;
570      memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize);
571      int rv = setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
572                          &mreq, sizeof(mreq));
573      if (rv < 0)
574        return MapSystemError(errno);
575      return OK;
576    }
577    case kIPv6AddressSize: {
578      if (addr_family_ != AF_INET6)
579        return ERR_ADDRESS_INVALID;
580      ipv6_mreq mreq;
581      mreq.ipv6mr_interface = 0;  // 0 indicates default multicast interface.
582      memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize);
583      int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_JOIN_GROUP,
584                          &mreq, sizeof(mreq));
585      if (rv < 0)
586        return MapSystemError(errno);
587      return OK;
588    }
589    default:
590      NOTREACHED() << "Invalid address family";
591      return ERR_ADDRESS_INVALID;
592  }
593}
594
595int UDPSocketLibevent::LeaveGroup(const IPAddressNumber& group_address) const {
596  DCHECK(CalledOnValidThread());
597
598  if (!is_connected())
599    return ERR_SOCKET_NOT_CONNECTED;
600
601  switch (group_address.size()) {
602    case kIPv4AddressSize: {
603      if (addr_family_ != AF_INET)
604        return ERR_ADDRESS_INVALID;
605      ip_mreq mreq;
606      mreq.imr_interface.s_addr = INADDR_ANY;
607      memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize);
608      int rv = setsockopt(socket_, IPPROTO_IP, IP_DROP_MEMBERSHIP,
609                          &mreq, sizeof(mreq));
610      if (rv < 0)
611        return MapSystemError(errno);
612      return OK;
613    }
614    case kIPv6AddressSize: {
615      if (addr_family_ != AF_INET6)
616        return ERR_ADDRESS_INVALID;
617      ipv6_mreq mreq;
618      mreq.ipv6mr_interface = 0;  // 0 indicates default multicast interface.
619      memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize);
620      int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
621                          &mreq, sizeof(mreq));
622      if (rv < 0)
623        return MapSystemError(errno);
624      return OK;
625    }
626    default:
627      NOTREACHED() << "Invalid address family";
628      return ERR_ADDRESS_INVALID;
629  }
630}
631
632int UDPSocketLibevent::SetMulticastTimeToLive(int time_to_live) {
633  DCHECK(CalledOnValidThread());
634  if (is_connected())
635    return ERR_SOCKET_IS_CONNECTED;
636
637  if (time_to_live < 0 || time_to_live > 255)
638    return ERR_INVALID_ARGUMENT;
639  multicast_time_to_live_ = time_to_live;
640  return OK;
641}
642
643int UDPSocketLibevent::SetMulticastLoopbackMode(bool loopback) {
644  DCHECK(CalledOnValidThread());
645  if (is_connected())
646    return ERR_SOCKET_IS_CONNECTED;
647
648  if (loopback)
649    socket_options_ |= SOCKET_OPTION_MULTICAST_LOOP;
650  else
651    socket_options_ &= ~SOCKET_OPTION_MULTICAST_LOOP;
652  return OK;
653}
654}  // namespace net
655