1#!/usr/bin/python 2# 3# Copyright 2014 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import errno 18import random 19from socket import * # pylint: disable=wildcard-import 20import time 21import unittest 22 23from scapy import all as scapy 24 25import csocket 26import iproute 27import multinetwork_base 28import packets 29import net_test 30 31# Setsockopt values. 32IPV6_ADDR_PREFERENCES = 72 33IPV6_PREFER_SRC_PUBLIC = 0x0002 34 35 36class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest): 37 """Test for IPv6 source address selection. 38 39 Relevant kernel commits: 40 upstream net-next: 41 7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates 42 c58da4c net: ipv6: allow explicitly choosing optimistic addresses 43 9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface. 44 c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr(). 45 c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr(). 46 3985e8a ipv6: sysctl to restrict candidate source addresses 47 48 android-3.10: 49 2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates 50 0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic 51 0633924 ipv6: sysctl to restrict candidate source addresses 52 """ 53 54 def SetIPv6Sysctl(self, ifname, sysctl, value): 55 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value) 56 57 def SetDAD(self, ifname, value): 58 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value) 59 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value) 60 61 def SetOptimisticDAD(self, ifname, value): 62 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value) 63 64 def SetUseTempaddrs(self, ifname, value): 65 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value) 66 67 def SetUseOptimistic(self, ifname, value): 68 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value) 69 70 def GetSourceIP(self, netid, mode="mark"): 71 s = self.BuildSocket(6, net_test.UDPSocket, netid, mode) 72 # Because why not...testing for temporary addresses is a separate thing. 73 s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC) 74 75 s.connect((net_test.IPV6_ADDR, 123)) 76 src_addr = s.getsockname()[0] 77 self.assertTrue(src_addr) 78 return src_addr 79 80 def assertAddressNotPresent(self, address): 81 self.assertRaises(IOError, self.iproute.GetAddress, address) 82 83 def assertAddressHasExpectedAttributes( 84 self, address, expected_ifindex, expected_flags): 85 ifa_msg = self.iproute.GetAddress(address)[0] 86 self.assertEquals(AF_INET6 if ":" in address else AF_INET, ifa_msg.family) 87 self.assertEquals(64, ifa_msg.prefixlen) 88 self.assertEquals(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope) 89 self.assertEquals(expected_ifindex, ifa_msg.index) 90 self.assertEquals(expected_flags, ifa_msg.flags & expected_flags) 91 92 def AddressIsTentative(self, address): 93 ifa_msg = self.iproute.GetAddress(address)[0] 94 return ifa_msg.flags & iproute.IFA_F_TENTATIVE 95 96 def BindToAddress(self, address): 97 s = net_test.UDPSocket(AF_INET6) 98 s.bind((address, 0, 0, 0)) 99 100 def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR): 101 pktinfo = multinetwork_base.MakePktInfo(6, address, 0) 102 cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)] 103 s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark") 104 return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0) 105 106 def assertAddressUsable(self, address, netid): 107 self.BindToAddress(address) 108 self.SendWithSourceAddress(address, netid) 109 # No exceptions? Good. 110 111 def assertAddressNotUsable(self, address, netid): 112 self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address) 113 self.assertRaisesErrno(errno.EINVAL, 114 self.SendWithSourceAddress, address, netid) 115 116 def assertAddressSelected(self, address, netid): 117 self.assertEquals(address, self.GetSourceIP(netid)) 118 119 def assertAddressNotSelected(self, address, netid): 120 self.assertNotEquals(address, self.GetSourceIP(netid)) 121 122 def WaitForDad(self, address): 123 for _ in xrange(20): 124 if not self.AddressIsTentative(address): 125 return 126 time.sleep(0.1) 127 raise AssertionError("%s did not complete DAD after 2 seconds") 128 129 130class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest): 131 132 def setUp(self): 133 # [0] Make sure DAD, optimistic DAD, and the use_optimistic option 134 # are all consistently disabled at the outset. 135 for netid in self.tuns: 136 ifname = self.GetInterfaceName(netid) 137 self.SetDAD(ifname, 0) 138 self.SetOptimisticDAD(ifname, 0) 139 self.SetUseTempaddrs(ifname, 0) 140 self.SetUseOptimistic(ifname, 0) 141 self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0) 142 143 # [1] Pick an interface on which to test. 144 self.test_netid = random.choice(self.tuns.keys()) 145 self.test_ip = self.MyAddress(6, self.test_netid) 146 self.test_ifindex = self.ifindices[self.test_netid] 147 self.test_ifname = self.GetInterfaceName(self.test_netid) 148 self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True) 149 150 # [2] Delete the test interface's IPv6 address. 151 self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex) 152 self.assertAddressNotPresent(self.test_ip) 153 154 self.assertAddressNotUsable(self.test_ip, self.test_netid) 155 # Verify that the link-local address is not tentative. 156 self.assertFalse(self.AddressIsTentative(self.test_lladdr)) 157 158 159class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest): 160 161 def testRfc6724Behaviour(self): 162 # [3] Get an IPv6 address back, in DAD start-up. 163 self.SetDAD(self.test_ifname, 1) # Enable DAD 164 # Send a RA to start SLAAC and subsequent DAD. 165 self.SendRA(self.test_netid, 0) 166 # Get flags and prove tentative-ness. 167 self.assertAddressHasExpectedAttributes( 168 self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE) 169 170 # Even though the interface has an IPv6 address, its tentative nature 171 # prevents it from being selected. 172 self.assertAddressNotUsable(self.test_ip, self.test_netid) 173 self.assertAddressNotSelected(self.test_ip, self.test_netid) 174 175 # Busy wait for DAD to complete (should be less than 1 second). 176 self.WaitForDad(self.test_ip) 177 178 # The test_ip should have completed DAD by now, and should be the 179 # chosen source address, eligible to bind to, etc. 180 self.assertAddressUsable(self.test_ip, self.test_netid) 181 self.assertAddressSelected(self.test_ip, self.test_netid) 182 183 184class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest): 185 186 def testRfc6724Behaviour(self): 187 # [3] Get an IPv6 address back, in optimistic DAD start-up. 188 self.SetDAD(self.test_ifname, 1) # Enable DAD 189 self.SetOptimisticDAD(self.test_ifname, 1) 190 # Send a RA to start SLAAC and subsequent DAD. 191 self.SendRA(self.test_netid, 0) 192 # Get flags and prove optimism. 193 self.assertAddressHasExpectedAttributes( 194 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 195 196 # Optimistic addresses are usable but are not selected. 197 if net_test.LinuxVersion() >= (3, 18, 0): 198 # The version checked in to android kernels <= 3.10 requires the 199 # use_optimistic sysctl to be turned on. 200 self.assertAddressUsable(self.test_ip, self.test_netid) 201 self.assertAddressNotSelected(self.test_ip, self.test_netid) 202 203 # Busy wait for DAD to complete (should be less than 1 second). 204 self.WaitForDad(self.test_ip) 205 206 # The test_ip should have completed DAD by now, and should be the 207 # chosen source address. 208 self.assertAddressUsable(self.test_ip, self.test_netid) 209 self.assertAddressSelected(self.test_ip, self.test_netid) 210 211 212class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest): 213 214 def testModifiedRfc6724Behaviour(self): 215 # [3] Get an IPv6 address back, in optimistic DAD start-up. 216 self.SetDAD(self.test_ifname, 1) # Enable DAD 217 self.SetOptimisticDAD(self.test_ifname, 1) 218 self.SetUseOptimistic(self.test_ifname, 1) 219 # Send a RA to start SLAAC and subsequent DAD. 220 self.SendRA(self.test_netid, 0) 221 # Get flags and prove optimistism. 222 self.assertAddressHasExpectedAttributes( 223 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 224 225 # The interface has an IPv6 address and, despite its optimistic nature, 226 # the use_optimistic option allows it to be selected. 227 self.assertAddressUsable(self.test_ip, self.test_netid) 228 self.assertAddressSelected(self.test_ip, self.test_netid) 229 230 231class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest): 232 233 def testModifiedRfc6724Behaviour(self): 234 # [3] Add a valid IPv6 address to this interface and verify it is 235 # selected as the source address. 236 preferred_ip = self.IPv6Prefix(self.test_netid) + "cafe" 237 self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex) 238 self.assertAddressHasExpectedAttributes( 239 preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT) 240 self.assertEquals(preferred_ip, self.GetSourceIP(self.test_netid)) 241 242 # [4] Get another IPv6 address, in optimistic DAD start-up. 243 self.SetDAD(self.test_ifname, 1) # Enable DAD 244 self.SetOptimisticDAD(self.test_ifname, 1) 245 self.SetUseOptimistic(self.test_ifname, 1) 246 # Send a RA to start SLAAC and subsequent DAD. 247 self.SendRA(self.test_netid, 0) 248 # Get flags and prove optimism. 249 self.assertAddressHasExpectedAttributes( 250 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 251 252 # Since the interface has another IPv6 address, the optimistic address 253 # is not selected--the other, valid address is chosen. 254 self.assertAddressUsable(self.test_ip, self.test_netid) 255 self.assertAddressNotSelected(self.test_ip, self.test_netid) 256 self.assertAddressSelected(preferred_ip, self.test_netid) 257 258 259class DadFailureTest(MultiInterfaceSourceAddressSelectionTest): 260 261 def testDadFailure(self): 262 # [3] Get an IPv6 address back, in optimistic DAD start-up. 263 self.SetDAD(self.test_ifname, 1) # Enable DAD 264 self.SetOptimisticDAD(self.test_ifname, 1) 265 self.SetUseOptimistic(self.test_ifname, 1) 266 # Send a RA to start SLAAC and subsequent DAD. 267 self.SendRA(self.test_netid, 0) 268 # Prove optimism and usability. 269 self.assertAddressHasExpectedAttributes( 270 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 271 self.assertAddressUsable(self.test_ip, self.test_netid) 272 self.assertAddressSelected(self.test_ip, self.test_netid) 273 274 # Send a NA for the optimistic address, indicating address conflict 275 # ("DAD defense"). 276 conflict_macaddr = "02:00:0b:ad:d0:0d" 277 dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") / 278 scapy.IPv6(src=self.test_ip, dst="ff02::1") / 279 scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) / 280 scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)) 281 self.ReceiveEtherPacketOn(self.test_netid, dad_defense) 282 283 # The address should have failed DAD, and therefore no longer be usable. 284 self.assertAddressNotUsable(self.test_ip, self.test_netid) 285 self.assertAddressNotSelected(self.test_ip, self.test_netid) 286 287 # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address. 288 289 290class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest): 291 292 def testSendToOnlinkDestination(self): 293 # [3] Get an IPv6 address back, in optimistic DAD start-up. 294 self.SetDAD(self.test_ifname, 1) # Enable DAD 295 self.SetOptimisticDAD(self.test_ifname, 1) 296 self.SetUseOptimistic(self.test_ifname, 1) 297 # Send a RA to start SLAAC and subsequent DAD. 298 self.SendRA(self.test_netid, 0) 299 # Prove optimism and usability. 300 self.assertAddressHasExpectedAttributes( 301 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 302 self.assertAddressUsable(self.test_ip, self.test_netid) 303 self.assertAddressSelected(self.test_ip, self.test_netid) 304 305 # [4] Send to an on-link destination and observe a Neighbor Solicitation 306 # packet with a source address that is NOT the optimistic address. 307 # In this setup, the only usable address is the link-local address. 308 onlink_dest = self.GetRandomDestination(self.IPv6Prefix(self.test_netid)) 309 self.SendWithSourceAddress(self.test_ip, self.test_netid, onlink_dest) 310 311 if net_test.LinuxVersion() >= (3, 18, 0): 312 # Older versions will actually choose the optimistic address to 313 # originate Neighbor Solications (RFC violation). 314 expected_ns = packets.NS( 315 self.test_lladdr, 316 onlink_dest, 317 self.MyMacAddress(self.test_netid))[1] 318 self.ExpectPacketOn(self.test_netid, "link-local NS", expected_ns) 319 320 321# TODO(ek): add tests listening for netlink events. 322 323 324class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest): 325 326 def testChoosesNonInterfaceSourceAddress(self): 327 self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0) 328 src_ip = self.GetSourceIP(self.test_netid) 329 self.assertFalse(src_ip in [self.test_ip, self.test_lladdr]) 330 self.assertTrue(src_ip in 331 [self.MyAddress(6, netid) 332 for netid in self.tuns if netid != self.test_netid]) 333 334 335class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest): 336 337 def testChoosesOnlyInterfaceSourceAddress(self): 338 self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 1) 339 # self.test_ifname does not have a global IPv6 address, so the only 340 # candidate is the existing link-local address. 341 self.assertAddressSelected(self.test_lladdr, self.test_netid) 342 343 344if __name__ == "__main__": 345 unittest.main() 346