1#!/usr/bin/python
2#
3# Copyright 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import errno
18import random
19from socket import *  # pylint: disable=wildcard-import
20import time
21import unittest
22
23from scapy import all as scapy
24
25import csocket
26import iproute
27import multinetwork_base
28import packets
29import net_test
30
# Setsockopt values.
# NOTE(review): presumably these mirror the IPV6_ADDR_PREFERENCES socket option
# and the IPV6_PREFER_SRC_PUBLIC flag from the Linux UAPI headers (RFC 5014);
# confirm against <linux/in6.h>.
IPV6_ADDR_PREFERENCES = 72
IPV6_PREFER_SRC_PUBLIC = 0x0002
34
35
class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
  """Test for IPv6 source address selection.

  This class only provides helpers (sysctl setters, address assertions and
  socket probes); the actual test cases live in its subclasses.

  Relevant kernel commits:
    upstream net-next:
      7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      c58da4c net: ipv6: allow explicitly choosing optimistic addresses
      9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface.
      c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr().
      c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr().
      3985e8a ipv6: sysctl to restrict candidate source addresses

    android-3.10:
      2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic
      0633924 ipv6: sysctl to restrict candidate source addresses
  """

  def SetIPv6Sysctl(self, ifname, sysctl, value):
    """Writes value to /proc/sys/net/ipv6/conf/<ifname>/<sysctl>."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value)

  def SetDAD(self, ifname, value):
    """Sets both accept_dad and dad_transmits on ifname to value."""
    self.SetIPv6Sysctl(ifname, "accept_dad", value)
    self.SetIPv6Sysctl(ifname, "dad_transmits", value)

  def SetOptimisticDAD(self, ifname, value):
    """Sets the optimistic_dad sysctl on ifname to value."""
    self.SetIPv6Sysctl(ifname, "optimistic_dad", value)

  def SetUseTempaddrs(self, ifname, value):
    """Sets the use_tempaddr sysctl on ifname to value."""
    self.SetIPv6Sysctl(ifname, "use_tempaddr", value)

  def SetUseOptimistic(self, ifname, value):
    """Sets the use_optimistic sysctl on ifname to value."""
    self.SetIPv6Sysctl(ifname, "use_optimistic", value)

  def GetSourceIP(self, netid, mode="mark"):
    """Returns the local address chosen when connecting a UDP socket.

    Args:
      netid: Network ID handed to self.BuildSocket.
      mode: Network selection mode handed to self.BuildSocket.

    Returns:
      The address string reported by getsockname() after connecting to
      net_test.IPV6_ADDR; asserted to be non-empty.
    """
    s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
    try:
      # Ask for public source addresses; temporary address behaviour is
      # tested separately.
      s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)
      s.connect((net_test.IPV6_ADDR, 123))
      src_addr = s.getsockname()[0]
    finally:
      # Do not leak the probe socket, even if connect() fails.
      s.close()
    self.assertTrue(src_addr)
    return src_addr

  def assertAddressNotPresent(self, address):
    """Asserts that looking up address via self.iproute raises IOError."""
    self.assertRaises(IOError, self.iproute.GetAddress, address)

  def assertAddressHasExpectedAttributes(
      self, address, expected_ifindex, expected_flags):
    """Asserts family, /64 prefix, universe scope, ifindex and flags.

    Args:
      address: Address string to look up via self.iproute.GetAddress.
      expected_ifindex: Interface index the address must be configured on.
      expected_flags: Flag bits that must all be set; other bits are ignored.
    """
    ifa_msg = self.iproute.GetAddress(address)[0]
    self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
    self.assertEqual(64, ifa_msg.prefixlen)
    self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
    self.assertEqual(expected_ifindex, ifa_msg.index)
    self.assertEqual(expected_flags, ifa_msg.flags & expected_flags)

  def AddressIsTentative(self, address):
    """Returns the IFA_F_TENTATIVE bit of address's flags (non-zero if set)."""
    ifa_msg = self.iproute.GetAddress(address)[0]
    return ifa_msg.flags & iproute.IFA_F_TENTATIVE

  def BindToAddress(self, address):
    """Binds a throwaway IPv6 UDP socket to address; raises if bind fails."""
    s = net_test.UDPSocket(AF_INET6)
    try:
      s.bind((address, 0, 0, 0))
    finally:
      # Callers routinely expect bind() to fail; close the socket either way.
      s.close()

  def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
    """Sends "Hello" to dest port 53 using address as the IPV6_PKTINFO source.

    Args:
      address: Source address placed in the IPV6_PKTINFO control message.
      netid: Network ID handed to self.BuildSocket (mode "mark").
      dest: Destination address; defaults to net_test.IPV6_ADDR.

    Returns:
      Whatever csocket.Sendmsg returns.
    """
    pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
    cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
    s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
    try:
      return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
    finally:
      # Callers routinely expect the send to fail; close the socket either way.
      s.close()

  def assertAddressUsable(self, address, netid):
    """Asserts that address can be bound to and used as a pktinfo source."""
    self.BindToAddress(address)
    self.SendWithSourceAddress(address, netid)
    # No exceptions? Good.

  def assertAddressNotUsable(self, address, netid):
    """Asserts bind fails with EADDRNOTAVAIL and sending fails with EINVAL."""
    self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
    self.assertRaisesErrno(errno.EINVAL,
                           self.SendWithSourceAddress, address, netid)

  def assertAddressSelected(self, address, netid):
    """Asserts that GetSourceIP(netid) returns address."""
    self.assertEqual(address, self.GetSourceIP(netid))

  def assertAddressNotSelected(self, address, netid):
    """Asserts that GetSourceIP(netid) returns something other than address."""
    self.assertNotEqual(address, self.GetSourceIP(netid))

  def WaitForDad(self, address):
    """Polls until address is no longer tentative.

    Checks up to 20 times, sleeping 0.1 seconds after each unsuccessful check.

    Raises:
      AssertionError: address is still tentative after the last check.
    """
    for _ in range(20):
      if not self.AddressIsTentative(address):
        return
      time.sleep(0.1)
    raise AssertionError("%s did not complete DAD after 2 seconds" % address)
128
129
class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
  """Fixture that strips the global IPv6 address from one random interface.

  After setUp, subclasses can rely on self.test_netid, self.test_ip,
  self.test_ifindex, self.test_ifname and self.test_lladdr describing the
  interface under test, with self.test_ip no longer configured on it.
  """

  def setUp(self):
    # [0]  Make sure DAD, optimistic DAD, temporary addresses, the
    # use_optimistic option and use_oif_addrs_only are all consistently
    # disabled at the outset.
    for netid in self.tuns:
      ifname = self.GetInterfaceName(netid)
      self.SetDAD(ifname, 0)
      self.SetOptimisticDAD(ifname, 0)
      self.SetUseTempaddrs(ifname, 0)
      self.SetUseOptimistic(ifname, 0)
      self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0)

    # [1]  Pick an interface on which to test.
    # list() keeps this working where dict.keys() is a non-indexable view.
    self.test_netid = random.choice(list(self.tuns))
    self.test_ip = self.MyAddress(6, self.test_netid)
    self.test_ifindex = self.ifindices[self.test_netid]
    self.test_ifname = self.GetInterfaceName(self.test_netid)
    self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True)

    # [2]  Delete the test interface's IPv6 address.
    self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
    self.assertAddressNotPresent(self.test_ip)

    self.assertAddressNotUsable(self.test_ip, self.test_netid)
    # Verify that the link-local address is not tentative.
    self.assertFalse(self.AddressIsTentative(self.test_lladdr))
157
158
class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """A tentative address must stay unusable until DAD has completed."""

  def testRfc6724Behaviour(self):
    """Tentative address: unusable and unselected, then both after DAD."""
    netid = self.test_netid
    addr = self.test_ip

    # [3]  Re-acquire the IPv6 address while DAD is enabled: the RA kicks off
    # SLAAC, which in turn starts DAD for the new address.
    self.SetDAD(self.test_ifname, 1)
    self.SendRA(netid, 0)
    # The freshly configured address must carry the tentative flag.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_TENTATIVE)

    # While tentative, the address can neither be used explicitly nor be
    # picked by source address selection.
    self.assertAddressNotUsable(addr, netid)
    self.assertAddressNotSelected(addr, netid)

    # Poll until DAD finishes (expected to take under a second).
    self.WaitForDad(addr)

    # With DAD done the address is bindable, sendable and the selected source.
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)
182
183
class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """An optimistic address is usable but not selected while DAD is running."""

  def testRfc6724Behaviour(self):
    """Optimistic address: not selected until DAD completes."""
    netid = self.test_netid
    addr = self.test_ip
    ifname = self.test_ifname

    # [3]  Re-acquire the IPv6 address with DAD and optimistic DAD enabled:
    # the RA kicks off SLAAC, which in turn starts DAD for the new address.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SendRA(netid, 0)
    # The freshly configured address must carry the optimistic flag.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # An optimistic address may be used explicitly, but must not be selected.
    if net_test.LinuxVersion() >= (3, 18, 0):
      # Skipped on older kernels: the version checked in to android
      # kernels <= 3.10 requires the use_optimistic sysctl to be turned on.
      self.assertAddressUsable(addr, netid)
    self.assertAddressNotSelected(addr, netid)

    # Poll until DAD finishes (expected to take under a second).
    self.WaitForDad(addr)

    # With DAD done the address is usable and is the selected source.
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)
210
211
class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
  """With use_optimistic set, an optimistic address may be selected."""

  def testModifiedRfc6724Behaviour(self):
    """Optimistic address is usable and selected when use_optimistic=1."""
    netid = self.test_netid
    addr = self.test_ip
    ifname = self.test_ifname

    # [3]  Re-acquire the IPv6 address with DAD, optimistic DAD and
    # use_optimistic all enabled: the RA kicks off SLAAC, which in turn
    # starts DAD for the new address.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)
    # The freshly configured address must carry the optimistic flag.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # Although the address is still optimistic, use_optimistic makes it
    # eligible for source address selection.
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)
229
230
class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """A valid address wins over an optimistic one on the same interface."""

  def testModifiedRfc6724Behaviour(self):
    """Optimistic address is usable, but the valid address is selected."""
    netid = self.test_netid
    ifname = self.test_ifname
    ifindex = self.test_ifindex
    optimistic_ip = self.test_ip

    # [3]  Configure a valid address on the interface by hand and check that
    # source address selection picks it.
    preferred_ip = self.IPv6Prefix(netid) + "cafe"
    self.iproute.AddAddress(preferred_ip, 64, ifindex)
    self.assertAddressHasExpectedAttributes(
        preferred_ip, ifindex, iproute.IFA_F_PERMANENT)
    self.assertAddressSelected(preferred_ip, netid)

    # [4]  Acquire a second address in optimistic DAD start-up: the RA kicks
    # off SLAAC, which in turn starts DAD for the new address.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)
    # The freshly configured address must carry the optimistic flag.
    self.assertAddressHasExpectedAttributes(
        optimistic_ip, ifindex, iproute.IFA_F_OPTIMISTIC)

    # The optimistic address works when used explicitly, but selection still
    # goes to the other, valid address.
    self.assertAddressUsable(optimistic_ip, netid)
    self.assertAddressNotSelected(optimistic_ip, netid)
    self.assertAddressSelected(preferred_ip, netid)
257
258
class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
  """An optimistic address must become unusable once DAD fails."""

  def testDadFailure(self):
    """Optimistic address stops being usable after a conflicting NA."""
    netid = self.test_netid
    addr = self.test_ip
    ifname = self.test_ifname

    # [3]  Re-acquire the IPv6 address in optimistic DAD start-up: the RA
    # kicks off SLAAC, which in turn starts DAD for the new address.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)
    # The address must be optimistic, usable and selected.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)

    # Inject a Neighbor Advertisement for the optimistic address from another
    # MAC address, signalling an address conflict ("DAD defense").
    defender_mac = "02:00:0b:ad:d0:0d"
    ether = scapy.Ether(src=defender_mac, dst="33:33:33:00:00:01")
    ip6 = scapy.IPv6(src=addr, dst="ff02::1")
    advert = scapy.ICMPv6ND_NA(tgt=addr, R=0, S=0, O=1)
    lladdr_opt = scapy.ICMPv6NDOptDstLLAddr(lladdr=defender_mac)
    self.ReceiveEtherPacketOn(netid, ether / ip6 / advert / lladdr_opt)

    # DAD should now have failed, so the address is neither usable nor
    # selected any more.
    self.assertAddressNotUsable(addr, netid)
    self.assertAddressNotSelected(addr, netid)

    # TODO(ek): verify that an RTM_DELADDR is issued for the DAD-failed
    # address.
288
289
class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Neighbor Solicitations must not be sourced from an optimistic address."""

  def testSendToOnlinkDestination(self):
    """Sending on-link from an optimistic address yields a link-local NS."""
    netid = self.test_netid
    addr = self.test_ip
    ifname = self.test_ifname

    # [3]  Re-acquire the IPv6 address in optimistic DAD start-up: the RA
    # kicks off SLAAC, which in turn starts DAD for the new address.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)
    # The address must be optimistic, usable and selected.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)

    # [4]  Send to an on-link destination from the optimistic address. The
    # resulting Neighbor Solicitation must NOT use the optimistic address as
    # its source; in this setup the only other usable address is the
    # link-local one.
    onlink_dest = self.GetRandomDestination(self.IPv6Prefix(netid))
    self.SendWithSourceAddress(addr, netid, onlink_dest)

    if net_test.LinuxVersion() >= (3, 18, 0):
      # Only checked on newer kernels: older versions will actually choose
      # the optimistic address to originate Neighbor Solicitations (RFC
      # violation).
      my_mac = self.MyMacAddress(netid)
      expected_ns = packets.NS(self.test_lladdr, onlink_dest, my_mac)[1]
      self.ExpectPacketOn(netid, "link-local NS", expected_ns)
319
320
321# TODO(ek): add tests listening for netlink events.
322
323
class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """With use_oif_addrs_only=0, another interface's address may be chosen."""

  def testChoosesNonInterfaceSourceAddress(self):
    """Selected source is a global address of some other test interface."""
    self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0)
    src_ip = self.GetSourceIP(self.test_netid)

    # The test interface's own addresses must not be chosen...
    self.assertNotIn(src_ip, [self.test_ip, self.test_lladdr])

    # ...so the source has to come from one of the other interfaces.
    # assertIn/assertNotIn report the offending values on failure.
    other_addrs = [self.MyAddress(6, netid)
                   for netid in self.tuns if netid != self.test_netid]
    self.assertIn(src_ip, other_addrs)
333
334
class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """With use_oif_addrs_only=1, only the interface's own addresses count."""

  def testChoosesOnlyInterfaceSourceAddress(self):
    """Selected source is the test interface's link-local address."""
    ifname = self.test_ifname
    self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 1)
    # The test interface has no global IPv6 address left, which leaves its
    # existing link-local address as the only candidate.
    expected_src = self.test_lladdr
    self.assertAddressSelected(expected_src, self.test_netid)
342
343
# Run every test case in this module when executed as a script.
if __name__ == "__main__":
  unittest.main()
346