1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/udp/udp_socket_libevent.h"
6
7#include <errno.h>
8#include <fcntl.h>
9#include <netdb.h>
10#include <net/if.h>
11#include <netinet/in.h>
12#include <sys/ioctl.h>
13#include <sys/socket.h>
14
15#include "base/callback.h"
16#include "base/logging.h"
17#include "base/message_loop/message_loop.h"
18#include "base/metrics/sparse_histogram.h"
19#include "base/metrics/stats_counters.h"
20#include "base/posix/eintr_wrapper.h"
21#include "base/rand_util.h"
22#include "net/base/io_buffer.h"
23#include "net/base/ip_endpoint.h"
24#include "net/base/net_errors.h"
25#include "net/base/net_log.h"
26#include "net/base/net_util.h"
27#include "net/socket/socket_descriptor.h"
28#include "net/udp/udp_net_log_parameters.h"
29
30
31namespace net {
32
33namespace {
34
// Number of randomly chosen ports RandomBind() tries before it falls back to
// binding port 0.
const int kBindRetries = 10;
// Lower and upper bounds handed to the random-int callback when RandomBind()
// picks a port.
const int kPortStart = 1024;
const int kPortEnd = 65535;
38
39#if defined(OS_MACOSX)
40
41// Returns IPv4 address in network order.
42int GetIPv4AddressFromIndex(int socket, uint32 index, uint32* address){
43  if (!index) {
44    *address = htonl(INADDR_ANY);
45    return OK;
46  }
47  ifreq ifr;
48  ifr.ifr_addr.sa_family = AF_INET;
49  if (!if_indextoname(index, ifr.ifr_name))
50    return MapSystemError(errno);
51  int rv = ioctl(socket, SIOCGIFADDR, &ifr);
52  if (rv == -1)
53    return MapSystemError(errno);
54  *address = reinterpret_cast<sockaddr_in*>(&ifr.ifr_addr)->sin_addr.s_addr;
55  return OK;
56}
57
58#endif  // OS_MACOSX
59
60}  // namespace
61
62UDPSocketLibevent::UDPSocketLibevent(
63    DatagramSocket::BindType bind_type,
64    const RandIntCallback& rand_int_cb,
65    net::NetLog* net_log,
66    const net::NetLog::Source& source)
67        : socket_(kInvalidSocket),
68          addr_family_(0),
69          socket_options_(SOCKET_OPTION_MULTICAST_LOOP),
70          multicast_interface_(0),
71          multicast_time_to_live_(1),
72          bind_type_(bind_type),
73          rand_int_cb_(rand_int_cb),
74          read_watcher_(this),
75          write_watcher_(this),
76          read_buf_len_(0),
77          recv_from_address_(NULL),
78          write_buf_len_(0),
79          net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)) {
80  net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE,
81                      source.ToEventParametersCallback());
82  if (bind_type == DatagramSocket::RANDOM_BIND)
83    DCHECK(!rand_int_cb.is_null());
84}
85
// Closes the socket (a no-op if it is not open) and ends the SOCKET_ALIVE
// net-log event that the constructor began.
UDPSocketLibevent::~UDPSocketLibevent() {
  Close();
  net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE);
}
90
91void UDPSocketLibevent::Close() {
92  DCHECK(CalledOnValidThread());
93
94  if (!is_connected())
95    return;
96
97  // Zero out any pending read/write callback state.
98  read_buf_ = NULL;
99  read_buf_len_ = 0;
100  read_callback_.Reset();
101  recv_from_address_ = NULL;
102  write_buf_ = NULL;
103  write_buf_len_ = 0;
104  write_callback_.Reset();
105  send_to_address_.reset();
106
107  bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
108  DCHECK(ok);
109  ok = write_socket_watcher_.StopWatchingFileDescriptor();
110  DCHECK(ok);
111
112  if (IGNORE_EINTR(close(socket_)) < 0)
113    PLOG(ERROR) << "close";
114
115  socket_ = kInvalidSocket;
116  addr_family_ = 0;
117}
118
119int UDPSocketLibevent::GetPeerAddress(IPEndPoint* address) const {
120  DCHECK(CalledOnValidThread());
121  DCHECK(address);
122  if (!is_connected())
123    return ERR_SOCKET_NOT_CONNECTED;
124
125  if (!remote_address_.get()) {
126    SockaddrStorage storage;
127    if (getpeername(socket_, storage.addr, &storage.addr_len))
128      return MapSystemError(errno);
129    scoped_ptr<IPEndPoint> address(new IPEndPoint());
130    if (!address->FromSockAddr(storage.addr, storage.addr_len))
131      return ERR_ADDRESS_INVALID;
132    remote_address_.reset(address.release());
133  }
134
135  *address = *remote_address_;
136  return OK;
137}
138
139int UDPSocketLibevent::GetLocalAddress(IPEndPoint* address) const {
140  DCHECK(CalledOnValidThread());
141  DCHECK(address);
142  if (!is_connected())
143    return ERR_SOCKET_NOT_CONNECTED;
144
145  if (!local_address_.get()) {
146    SockaddrStorage storage;
147    if (getsockname(socket_, storage.addr, &storage.addr_len))
148      return MapSystemError(errno);
149    scoped_ptr<IPEndPoint> address(new IPEndPoint());
150    if (!address->FromSockAddr(storage.addr, storage.addr_len))
151      return ERR_ADDRESS_INVALID;
152    local_address_.reset(address.release());
153    net_log_.AddEvent(NetLog::TYPE_UDP_LOCAL_ADDRESS,
154                      CreateNetLogUDPConnectCallback(local_address_.get()));
155  }
156
157  *address = *local_address_;
158  return OK;
159}
160
// Reads one datagram into |buf| without reporting the sender: forwards to
// RecvFrom() with a NULL address. The return value is whatever RecvFrom()
// returns.
int UDPSocketLibevent::Read(IOBuffer* buf,
                            int buf_len,
                            const CompletionCallback& callback) {
  return RecvFrom(buf, buf_len, NULL, callback);
}
166
167int UDPSocketLibevent::RecvFrom(IOBuffer* buf,
168                                int buf_len,
169                                IPEndPoint* address,
170                                const CompletionCallback& callback) {
171  DCHECK(CalledOnValidThread());
172  DCHECK_NE(kInvalidSocket, socket_);
173  DCHECK(read_callback_.is_null());
174  DCHECK(!recv_from_address_);
175  DCHECK(!callback.is_null());  // Synchronous operation not supported
176  DCHECK_GT(buf_len, 0);
177
178  int nread = InternalRecvFrom(buf, buf_len, address);
179  if (nread != ERR_IO_PENDING)
180    return nread;
181
182  if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
183          socket_, true, base::MessageLoopForIO::WATCH_READ,
184          &read_socket_watcher_, &read_watcher_)) {
185    PLOG(ERROR) << "WatchFileDescriptor failed on read";
186    int result = MapSystemError(errno);
187    LogRead(result, NULL, 0, NULL);
188    return result;
189  }
190
191  read_buf_ = buf;
192  read_buf_len_ = buf_len;
193  recv_from_address_ = address;
194  read_callback_ = callback;
195  return ERR_IO_PENDING;
196}
197
// Sends |buf| without an explicit destination: forwards to SendToOrWrite()
// with a NULL address. The return value is whatever SendToOrWrite() returns.
int UDPSocketLibevent::Write(IOBuffer* buf,
                             int buf_len,
                             const CompletionCallback& callback) {
  return SendToOrWrite(buf, buf_len, NULL, callback);
}
203
// Sends |buf| to the explicit destination |address|: forwards to
// SendToOrWrite() with a pointer to |address|. The return value is whatever
// SendToOrWrite() returns.
int UDPSocketLibevent::SendTo(IOBuffer* buf,
                              int buf_len,
                              const IPEndPoint& address,
                              const CompletionCallback& callback) {
  return SendToOrWrite(buf, buf_len, &address, callback);
}
210
211int UDPSocketLibevent::SendToOrWrite(IOBuffer* buf,
212                                     int buf_len,
213                                     const IPEndPoint* address,
214                                     const CompletionCallback& callback) {
215  DCHECK(CalledOnValidThread());
216  DCHECK_NE(kInvalidSocket, socket_);
217  DCHECK(write_callback_.is_null());
218  DCHECK(!callback.is_null());  // Synchronous operation not supported
219  DCHECK_GT(buf_len, 0);
220
221  int result = InternalSendTo(buf, buf_len, address);
222  if (result != ERR_IO_PENDING)
223    return result;
224
225  if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
226          socket_, true, base::MessageLoopForIO::WATCH_WRITE,
227          &write_socket_watcher_, &write_watcher_)) {
228    DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno;
229    int result = MapSystemError(errno);
230    LogWrite(result, NULL, NULL);
231    return result;
232  }
233
234  write_buf_ = buf;
235  write_buf_len_ = buf_len;
236  DCHECK(!send_to_address_.get());
237  if (address) {
238    send_to_address_.reset(new IPEndPoint(*address));
239  }
240  write_callback_ = callback;
241  return ERR_IO_PENDING;
242}
243
// Connects the socket to |address|, bracketing the attempt with a UDP_CONNECT
// net-log begin/end pair. The real work is done by InternalConnect(); on any
// result other than OK the socket is closed. Returns InternalConnect()'s
// result.
int UDPSocketLibevent::Connect(const IPEndPoint& address) {
  net_log_.BeginEvent(NetLog::TYPE_UDP_CONNECT,
                      CreateNetLogUDPConnectCallback(&address));
  int rv = InternalConnect(address);
  // Close() returns early when no socket is open, so this is safe even if
  // InternalConnect() already cleaned up.
  if (rv != OK)
    Close();
  net_log_.EndEventWithNetErrorCode(NetLog::TYPE_UDP_CONNECT, rv);
  return rv;
}
253
254int UDPSocketLibevent::InternalConnect(const IPEndPoint& address) {
255  DCHECK(CalledOnValidThread());
256  DCHECK(!is_connected());
257  DCHECK(!remote_address_.get());
258  int addr_family = address.GetSockAddrFamily();
259  int rv = CreateSocket(addr_family);
260  if (rv < 0)
261    return rv;
262
263  if (bind_type_ == DatagramSocket::RANDOM_BIND) {
264    // Construct IPAddressNumber of appropriate size (IPv4 or IPv6) of 0s,
265    // representing INADDR_ANY or in6addr_any.
266    size_t addr_size =
267        addr_family == AF_INET ? kIPv4AddressSize : kIPv6AddressSize;
268    IPAddressNumber addr_any(addr_size);
269    rv = RandomBind(addr_any);
270  }
271  // else connect() does the DatagramSocket::DEFAULT_BIND
272
273  if (rv < 0) {
274    UMA_HISTOGRAM_SPARSE_SLOWLY("Net.UdpSocketRandomBindErrorCode", -rv);
275    Close();
276    return rv;
277  }
278
279  SockaddrStorage storage;
280  if (!address.ToSockAddr(storage.addr, &storage.addr_len)) {
281    Close();
282    return ERR_ADDRESS_INVALID;
283  }
284
285  rv = HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len));
286  if (rv < 0) {
287    // Close() may change the current errno. Map errno beforehand.
288    int result = MapSystemError(errno);
289    Close();
290    return result;
291  }
292
293  remote_address_.reset(new IPEndPoint(address));
294  return rv;
295}
296
297int UDPSocketLibevent::Bind(const IPEndPoint& address) {
298  DCHECK(CalledOnValidThread());
299  DCHECK(!is_connected());
300  int rv = CreateSocket(address.GetSockAddrFamily());
301  if (rv < 0)
302    return rv;
303
304  rv = SetSocketOptions();
305  if (rv < 0) {
306    Close();
307    return rv;
308  }
309  rv = DoBind(address);
310  if (rv < 0) {
311    Close();
312    return rv;
313  }
314  local_address_.reset();
315  return rv;
316}
317
318int UDPSocketLibevent::SetReceiveBufferSize(int32 size) {
319  DCHECK(CalledOnValidThread());
320  int rv = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF,
321                      reinterpret_cast<const char*>(&size), sizeof(size));
322  int last_error = errno;
323  DCHECK(!rv) << "Could not set socket receive buffer size: " << last_error;
324  return rv == 0 ? OK : MapSystemError(last_error);
325}
326
327int UDPSocketLibevent::SetSendBufferSize(int32 size) {
328  DCHECK(CalledOnValidThread());
329  int rv = setsockopt(socket_, SOL_SOCKET, SO_SNDBUF,
330                      reinterpret_cast<const char*>(&size), sizeof(size));
331  int last_error = errno;
332  DCHECK(!rv) << "Could not set socket send buffer size: " << last_error;
333  return rv == 0 ? OK : MapSystemError(last_error);
334}
335
// Records that address reuse is wanted. This only sets a flag in
// |socket_options_|; SetSocketOptions() later turns it into SO_REUSEADDR.
// Must be called before the socket is opened.
void UDPSocketLibevent::AllowAddressReuse() {
  DCHECK(CalledOnValidThread());
  DCHECK(!is_connected());

  socket_options_ |= SOCKET_OPTION_REUSE_ADDRESS;
}
342
// Records that broadcast is wanted. This only sets a flag in
// |socket_options_|; SetSocketOptions() later applies the actual socket
// option. Must be called before the socket is opened.
void UDPSocketLibevent::AllowBroadcast() {
  DCHECK(CalledOnValidThread());
  DCHECK(!is_connected());

  socket_options_ |= SOCKET_OPTION_BROADCAST;
}
349
350void UDPSocketLibevent::ReadWatcher::OnFileCanReadWithoutBlocking(int) {
351  if (!socket_->read_callback_.is_null())
352    socket_->DidCompleteRead();
353}
354
355void UDPSocketLibevent::WriteWatcher::OnFileCanWriteWithoutBlocking(int) {
356  if (!socket_->write_callback_.is_null())
357    socket_->DidCompleteWrite();
358}
359
360void UDPSocketLibevent::DoReadCallback(int rv) {
361  DCHECK_NE(rv, ERR_IO_PENDING);
362  DCHECK(!read_callback_.is_null());
363
364  // since Run may result in Read being called, clear read_callback_ up front.
365  CompletionCallback c = read_callback_;
366  read_callback_.Reset();
367  c.Run(rv);
368}
369
370void UDPSocketLibevent::DoWriteCallback(int rv) {
371  DCHECK_NE(rv, ERR_IO_PENDING);
372  DCHECK(!write_callback_.is_null());
373
374  // since Run may result in Write being called, clear write_callback_ up front.
375  CompletionCallback c = write_callback_;
376  write_callback_.Reset();
377  c.Run(rv);
378}
379
380void UDPSocketLibevent::DidCompleteRead() {
381  int result =
382      InternalRecvFrom(read_buf_.get(), read_buf_len_, recv_from_address_);
383  if (result != ERR_IO_PENDING) {
384    read_buf_ = NULL;
385    read_buf_len_ = 0;
386    recv_from_address_ = NULL;
387    bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
388    DCHECK(ok);
389    DoReadCallback(result);
390  }
391}
392
393void UDPSocketLibevent::LogRead(int result,
394                                const char* bytes,
395                                socklen_t addr_len,
396                                const sockaddr* addr) const {
397  if (result < 0) {
398    net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result);
399    return;
400  }
401
402  if (net_log_.IsLogging()) {
403    DCHECK(addr_len > 0);
404    DCHECK(addr);
405
406    IPEndPoint address;
407    bool is_address_valid = address.FromSockAddr(addr, addr_len);
408    net_log_.AddEvent(
409        NetLog::TYPE_UDP_BYTES_RECEIVED,
410        CreateNetLogUDPDataTranferCallback(
411            result, bytes,
412            is_address_valid ? &address : NULL));
413  }
414
415  base::StatsCounter read_bytes("udp.read_bytes");
416  read_bytes.Add(result);
417}
418
419int UDPSocketLibevent::CreateSocket(int addr_family) {
420  addr_family_ = addr_family;
421  socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, 0);
422  if (socket_ == kInvalidSocket)
423    return MapSystemError(errno);
424  if (SetNonBlocking(socket_)) {
425    const int err = MapSystemError(errno);
426    Close();
427    return err;
428  }
429  return OK;
430}
431
432void UDPSocketLibevent::DidCompleteWrite() {
433  int result =
434      InternalSendTo(write_buf_.get(), write_buf_len_, send_to_address_.get());
435
436  if (result != ERR_IO_PENDING) {
437    write_buf_ = NULL;
438    write_buf_len_ = 0;
439    send_to_address_.reset();
440    write_socket_watcher_.StopWatchingFileDescriptor();
441    DoWriteCallback(result);
442  }
443}
444
445void UDPSocketLibevent::LogWrite(int result,
446                                 const char* bytes,
447                                 const IPEndPoint* address) const {
448  if (result < 0) {
449    net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_SEND_ERROR, result);
450    return;
451  }
452
453  if (net_log_.IsLogging()) {
454    net_log_.AddEvent(
455        NetLog::TYPE_UDP_BYTES_SENT,
456        CreateNetLogUDPDataTranferCallback(result, bytes, address));
457  }
458
459  base::StatsCounter write_bytes("udp.write_bytes");
460  write_bytes.Add(result);
461}
462
463int UDPSocketLibevent::InternalRecvFrom(IOBuffer* buf, int buf_len,
464                                        IPEndPoint* address) {
465  int bytes_transferred;
466  int flags = 0;
467
468  SockaddrStorage storage;
469
470  bytes_transferred =
471      HANDLE_EINTR(recvfrom(socket_,
472                            buf->data(),
473                            buf_len,
474                            flags,
475                            storage.addr,
476                            &storage.addr_len));
477  int result;
478  if (bytes_transferred >= 0) {
479    result = bytes_transferred;
480    if (address && !address->FromSockAddr(storage.addr, storage.addr_len))
481      result = ERR_ADDRESS_INVALID;
482  } else {
483    result = MapSystemError(errno);
484  }
485  if (result != ERR_IO_PENDING)
486    LogRead(result, buf->data(), storage.addr_len, storage.addr);
487  return result;
488}
489
490int UDPSocketLibevent::InternalSendTo(IOBuffer* buf, int buf_len,
491                                      const IPEndPoint* address) {
492  SockaddrStorage storage;
493  struct sockaddr* addr = storage.addr;
494  if (!address) {
495    addr = NULL;
496    storage.addr_len = 0;
497  } else {
498    if (!address->ToSockAddr(storage.addr, &storage.addr_len)) {
499      int result = ERR_ADDRESS_INVALID;
500      LogWrite(result, NULL, NULL);
501      return result;
502    }
503  }
504
505  int result = HANDLE_EINTR(sendto(socket_,
506                            buf->data(),
507                            buf_len,
508                            0,
509                            addr,
510                            storage.addr_len));
511  if (result < 0)
512    result = MapSystemError(errno);
513  if (result != ERR_IO_PENDING)
514    LogWrite(result, buf->data(), address);
515  return result;
516}
517
518int UDPSocketLibevent::SetSocketOptions() {
519  int true_value = 1;
520  if (socket_options_ & SOCKET_OPTION_REUSE_ADDRESS) {
521    int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &true_value,
522                        sizeof(true_value));
523    if (rv < 0)
524      return MapSystemError(errno);
525  }
526  if (socket_options_ & SOCKET_OPTION_BROADCAST) {
527    int rv;
528#if defined(OS_MACOSX)
529    // SO_REUSEPORT on OSX permits multiple processes to each receive
530    // UDP multicast or broadcast datagrams destined for the bound
531    // port.
532    rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &true_value,
533                    sizeof(true_value));
534#else
535    rv = setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &true_value,
536                    sizeof(true_value));
537#endif  // defined(OS_MACOSX)
538    if (rv < 0)
539      return MapSystemError(errno);
540  }
541
542  if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
543    int rv;
544    if (addr_family_ == AF_INET) {
545      u_char loop = 0;
546      rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_LOOP,
547                      &loop, sizeof(loop));
548    } else {
549      u_int loop = 0;
550      rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
551                      &loop, sizeof(loop));
552    }
553    if (rv < 0)
554      return MapSystemError(errno);
555  }
556  if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
557    int rv;
558    if (addr_family_ == AF_INET) {
559      u_char ttl = multicast_time_to_live_;
560      rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_TTL,
561                      &ttl, sizeof(ttl));
562    } else {
563      // Signed integer. -1 to use route default.
564      int ttl = multicast_time_to_live_;
565      rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
566                      &ttl, sizeof(ttl));
567    }
568    if (rv < 0)
569      return MapSystemError(errno);
570  }
571  if (multicast_interface_ != 0) {
572    switch (addr_family_) {
573      case AF_INET: {
574#if !defined(OS_MACOSX)
575        ip_mreqn mreq;
576        mreq.imr_ifindex = multicast_interface_;
577        mreq.imr_address.s_addr = htonl(INADDR_ANY);
578#else
579        ip_mreq mreq;
580        int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
581                                            &mreq.imr_interface.s_addr);
582        if (error != OK)
583          return error;
584#endif
585        int rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_IF,
586                            reinterpret_cast<const char*>(&mreq), sizeof(mreq));
587        if (rv)
588          return MapSystemError(errno);
589        break;
590      }
591      case AF_INET6: {
592        uint32 interface_index = multicast_interface_;
593        int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
594                            reinterpret_cast<const char*>(&interface_index),
595                            sizeof(interface_index));
596        if (rv)
597          return MapSystemError(errno);
598        break;
599      }
600      default:
601        NOTREACHED() << "Invalid address family";
602        return ERR_ADDRESS_INVALID;
603    }
604  }
605  return OK;
606}
607
608int UDPSocketLibevent::DoBind(const IPEndPoint& address) {
609  SockaddrStorage storage;
610  if (!address.ToSockAddr(storage.addr, &storage.addr_len))
611    return ERR_ADDRESS_INVALID;
612  int rv = bind(socket_, storage.addr, storage.addr_len);
613  if (rv == 0)
614    return OK;
615  int last_error = errno;
616  UMA_HISTOGRAM_SPARSE_SLOWLY("Net.UdpSocketBindErrorFromPosix", last_error);
617#if defined(OS_CHROMEOS)
618  if (last_error == EINVAL)
619    return ERR_ADDRESS_IN_USE;
620#elif defined(OS_MACOSX)
621  if (last_error == EADDRNOTAVAIL)
622    return ERR_ADDRESS_IN_USE;
623#endif
624  return MapSystemError(last_error);
625}
626
627int UDPSocketLibevent::RandomBind(const IPAddressNumber& address) {
628  DCHECK(bind_type_ == DatagramSocket::RANDOM_BIND && !rand_int_cb_.is_null());
629
630  for (int i = 0; i < kBindRetries; ++i) {
631    int rv = DoBind(IPEndPoint(address,
632                               rand_int_cb_.Run(kPortStart, kPortEnd)));
633    if (rv == OK || rv != ERR_ADDRESS_IN_USE)
634      return rv;
635  }
636  return DoBind(IPEndPoint(address, 0));
637}
638
639int UDPSocketLibevent::JoinGroup(const IPAddressNumber& group_address) const {
640  DCHECK(CalledOnValidThread());
641  if (!is_connected())
642    return ERR_SOCKET_NOT_CONNECTED;
643
644  switch (group_address.size()) {
645    case kIPv4AddressSize: {
646      if (addr_family_ != AF_INET)
647        return ERR_ADDRESS_INVALID;
648
649#if !defined(OS_MACOSX)
650      ip_mreqn mreq;
651      mreq.imr_ifindex = multicast_interface_;
652      mreq.imr_address.s_addr = htonl(INADDR_ANY);
653#else
654      ip_mreq mreq;
655      int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
656                                            &mreq.imr_interface.s_addr);
657      if (error != OK)
658        return error;
659#endif
660      memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize);
661      int rv = setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
662                          &mreq, sizeof(mreq));
663      if (rv < 0)
664        return MapSystemError(errno);
665      return OK;
666    }
667    case kIPv6AddressSize: {
668      if (addr_family_ != AF_INET6)
669        return ERR_ADDRESS_INVALID;
670      ipv6_mreq mreq;
671      mreq.ipv6mr_interface = multicast_interface_;
672      memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize);
673      int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_JOIN_GROUP,
674                          &mreq, sizeof(mreq));
675      if (rv < 0)
676        return MapSystemError(errno);
677      return OK;
678    }
679    default:
680      NOTREACHED() << "Invalid address family";
681      return ERR_ADDRESS_INVALID;
682  }
683}
684
685int UDPSocketLibevent::LeaveGroup(const IPAddressNumber& group_address) const {
686  DCHECK(CalledOnValidThread());
687
688  if (!is_connected())
689    return ERR_SOCKET_NOT_CONNECTED;
690
691  switch (group_address.size()) {
692    case kIPv4AddressSize: {
693      if (addr_family_ != AF_INET)
694        return ERR_ADDRESS_INVALID;
695      ip_mreq mreq;
696      mreq.imr_interface.s_addr = INADDR_ANY;
697      memcpy(&mreq.imr_multiaddr, &group_address[0], kIPv4AddressSize);
698      int rv = setsockopt(socket_, IPPROTO_IP, IP_DROP_MEMBERSHIP,
699                          &mreq, sizeof(mreq));
700      if (rv < 0)
701        return MapSystemError(errno);
702      return OK;
703    }
704    case kIPv6AddressSize: {
705      if (addr_family_ != AF_INET6)
706        return ERR_ADDRESS_INVALID;
707      ipv6_mreq mreq;
708      mreq.ipv6mr_interface = 0;  // 0 indicates default multicast interface.
709      memcpy(&mreq.ipv6mr_multiaddr, &group_address[0], kIPv6AddressSize);
710      int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
711                          &mreq, sizeof(mreq));
712      if (rv < 0)
713        return MapSystemError(errno);
714      return OK;
715    }
716    default:
717      NOTREACHED() << "Invalid address family";
718      return ERR_ADDRESS_INVALID;
719  }
720}
721
// Stores the interface index to use for multicast. Only the member is
// updated here; SetSocketOptions() and JoinGroup() read it later. Returns
// ERR_SOCKET_IS_CONNECTED if the socket is already open, otherwise OK.
int UDPSocketLibevent::SetMulticastInterface(uint32 interface_index) {
  DCHECK(CalledOnValidThread());
  if (is_connected())
    return ERR_SOCKET_IS_CONNECTED;
  multicast_interface_ = interface_index;
  return OK;
}
729
730int UDPSocketLibevent::SetMulticastTimeToLive(int time_to_live) {
731  DCHECK(CalledOnValidThread());
732  if (is_connected())
733    return ERR_SOCKET_IS_CONNECTED;
734
735  if (time_to_live < 0 || time_to_live > 255)
736    return ERR_INVALID_ARGUMENT;
737  multicast_time_to_live_ = time_to_live;
738  return OK;
739}
740
741int UDPSocketLibevent::SetMulticastLoopbackMode(bool loopback) {
742  DCHECK(CalledOnValidThread());
743  if (is_connected())
744    return ERR_SOCKET_IS_CONNECTED;
745
746  if (loopback)
747    socket_options_ |= SOCKET_OPTION_MULTICAST_LOOP;
748  else
749    socket_options_ &= ~SOCKET_OPTION_MULTICAST_LOOP;
750  return OK;
751}
752
753int UDPSocketLibevent::SetDiffServCodePoint(DiffServCodePoint dscp) {
754  if (dscp == DSCP_NO_CHANGE) {
755    return OK;
756  }
757  int rv;
758  int dscp_and_ecn = dscp << 2;
759  if (addr_family_ == AF_INET) {
760    rv = setsockopt(socket_, IPPROTO_IP, IP_TOS,
761                    &dscp_and_ecn, sizeof(dscp_and_ecn));
762  } else {
763    rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_TCLASS,
764                    &dscp_and_ecn, sizeof(dscp_and_ecn));
765  }
766  if (rv < 0)
767    return MapSystemError(errno);
768
769  return OK;
770}
771
// Forwards to base::NonThreadSafe::DetachFromThread().
void UDPSocketLibevent::DetachFromThread() {
  base::NonThreadSafe::DetachFromThread();
}
775
776}  // namespace net
777