1#!/usr/bin/python 2 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7import logging 8import socket 9import sys 10import time 11 12import common 13 14from autotest_lib.client.cros import dhcp_handling_rule 15from autotest_lib.client.cros import dhcp_packet 16from autotest_lib.client.cros import dhcp_test_server 17 18TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/" 19 20TEST_CLASSLESS_STATIC_ROUTE_DATA = \ 21 "\x12\x0a\x09\xc0\xac\x1f\x9b\x0a" \ 22 "\x00\xc0\xa8\x00\xfe" 23 24TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [ 25 (18, "10.9.192.0", "172.31.155.10"), 26 (0, "0.0.0.0", "192.168.0.254") 27 ] 28 29TEST_DOMAIN_SEARCH_LIST_COMPRESSED = \ 30 "\x03eng\x06google\x03com\x00\x09marketing\xC0\x04" 31 32TEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com") 33 34# At this time, we don't support the compression allowed in the RFC. 35# This is correct and sufficient for our purposes. 36TEST_DOMAIN_SEARCH_LIST_EXPECTED = \ 37 "\x03eng\x06google\x03com\x00\x09marketing\x06google\x03com\x00" 38 39def bin2hex(byte_str, justification=20): 40 """ 41 Turn big hex strings into prettier strings of hex bytes. Group those hex 42 bytes into lines justification bytes long. 43 """ 44 chars = ["x" + (hex(ord(c))[2:].zfill(2)) for c in byte_str] 45 groups = [] 46 for i in xrange(0, len(chars), justification): 47 groups.append("".join(chars[i:i+justification])) 48 return "\n".join(groups) 49 50def test_packet_serialization(): 51 log_file = open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb") 52 binary_discovery_packet = log_file.read() 53 log_file.close() 54 discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet) 55 if not discovery_packet.is_valid: 56 return False 57 generated_string = discovery_packet.to_binary_string() 58 if generated_string is None: 59 print "Failed to generate string from packet object." 60 return False 61 if generated_string != binary_discovery_packet: 62 print "Packets didn't match: " 63 print "Generated: \n%s" % bin2hex(generated_string) 64 print "Expected: \n%s" % bin2hex(binary_discovery_packet) 65 return False 66 print "test_packet_serialization PASSED" 67 return True 68 69def test_classless_static_route_parsing(): 70 parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack( 71 TEST_CLASSLESS_STATIC_ROUTE_DATA) 72 if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED: 73 print ("Parsed binary domain list and got %s but expected %s" % 74 (repr(parsed_routes), 75 repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED))) 76 return False 77 print "test_classless_static_route_parsing PASSED" 78 return True 79 80def test_classless_static_route_serialization(): 81 byte_string = dhcp_packet.ClasslessStaticRoutesOption.pack( 82 TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED) 83 if byte_string != TEST_CLASSLESS_STATIC_ROUTE_DATA: 84 # Turn the strings into printable hex strings on a single line. 85 pretty_actual = bin2hex(byte_string, 100) 86 pretty_expected = bin2hex(TEST_CLASSLESS_STATIC_ROUTE_DATA, 100) 87 print ("Expected to serialize %s to %s but instead got %s." % 88 (repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED), pretty_expected, 89 pretty_actual)) 90 return False 91 print "test_classless_static_route_serialization PASSED" 92 return True 93 94def test_domain_search_list_parsing(): 95 parsed_domains = dhcp_packet.DomainListOption.unpack( 96 TEST_DOMAIN_SEARCH_LIST_COMPRESSED) 97 # Order matters too. 98 parsed_domains = tuple(parsed_domains) 99 if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED: 100 print ("Parsed binary domain list and got %s but expected %s" % 101 (parsed_domains, TEST_DOMAIN_SEARCH_LIST_EXPECTED)) 102 return False 103 print "test_domain_search_list_parsing PASSED" 104 return True 105 106def test_domain_search_list_serialization(): 107 byte_string = dhcp_packet.DomainListOption.pack( 108 TEST_DOMAIN_SEARCH_LIST_PARSED) 109 if byte_string != TEST_DOMAIN_SEARCH_LIST_EXPECTED: 110 # Turn the strings into printable hex strings on a single line. 111 pretty_actual = bin2hex(byte_string, 100) 112 pretty_expected = bin2hex(TEST_DOMAIN_SEARCH_LIST_EXPECTED, 100) 113 print ("Expected to serialize %s to %s but instead got %s." % 114 (TEST_DOMAIN_SEARCH_LIST_PARSED, pretty_expected, pretty_actual)) 115 return False 116 print "test_domain_search_list_serialization PASSED" 117 return True 118 119def receive_packet(a_socket, timeout_seconds=1.0): 120 data = None 121 start_time = time.time() 122 while data is None and start_time + timeout_seconds > time.time(): 123 try: 124 data, _ = a_socket.recvfrom(1024) 125 except socket.timeout: 126 pass # We expect many timeouts. 127 if data is None: 128 print "Timed out before we received a response from the server." 129 return None 130 131 print "Client received a packet of length %d from the server." % len(data) 132 packet = dhcp_packet.DhcpPacket(byte_str=data) 133 if not packet.is_valid: 134 print "Received an invalid response from DHCP server." 135 return None 136 137 return packet 138 139def test_simple_server_exchange(server): 140 intended_ip = "127.0.0.42" 141 subnet_mask = "255.255.255.0" 142 server_ip = "127.0.0.1" 143 lease_time_seconds = 60 144 test_timeout = 3.0 145 mac_addr = "\x01\x02\x03\x04\x05\x06" 146 # Build up our packets and have them request some default option values, 147 # like the IP we're being assigned and the address of the server assigning 148 # it. 149 discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr) 150 discovery_message.set_option( 151 dhcp_packet.OPTION_PARAMETER_REQUEST_LIST, 152 dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT) 153 request_message = dhcp_packet.DhcpPacket.create_request_packet( 154 discovery_message.transaction_id, 155 mac_addr) 156 request_message.set_option( 157 dhcp_packet.OPTION_PARAMETER_REQUEST_LIST, 158 dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT) 159 # This is the pool of settings the DHCP server will seem to draw from to 160 # answer queries from the client. This information is written into packets 161 # through the handling rules. 162 dhcp_server_config = { 163 dhcp_packet.OPTION_SERVER_ID : server_ip, 164 dhcp_packet.OPTION_SUBNET_MASK : subnet_mask, 165 dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds, 166 dhcp_packet.OPTION_REQUESTED_IP : intended_ip, 167 } 168 # Build up the handling rules for the server and start the test. 169 rules = [] 170 rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 171 intended_ip, 172 server_ip, 173 dhcp_server_config)) 174 rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest( 175 intended_ip, 176 server_ip, 177 dhcp_server_config)) 178 rules[-1].is_final_handler = True 179 server.start_test(rules, test_timeout) 180 # Because we don't want to require root permissions to run these tests, 181 # listen on the loopback device, don't broadcast, and don't use reserved 182 # ports (like the actual DHCP ports). Use 8068/8067 instead. 183 client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 184 client_socket.bind(("127.0.0.1", 8068)) 185 client_socket.settimeout(0.1) 186 client_socket.sendto(discovery_message.to_binary_string(), 187 (server_ip, 8067)) 188 189 offer_packet = receive_packet(client_socket) 190 if offer_packet is None: 191 return False 192 193 if (offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER): 194 print "Type of DHCP response is not offer." 195 return False 196 197 if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip: 198 print "Server didn't offer the IP we expected." 199 return False 200 201 print "Offer looks good to the client, sending request." 202 # In real tests, dhcpcd formats all the DISCOVERY and REQUEST messages. In 203 # our unit test, we have to do this ourselves. 204 request_message.set_option( 205 dhcp_packet.OPTION_SERVER_ID, 206 offer_packet.get_option(dhcp_packet.OPTION_SERVER_ID)) 207 request_message.set_option( 208 dhcp_packet.OPTION_SUBNET_MASK, 209 offer_packet.get_option(dhcp_packet.OPTION_SUBNET_MASK)) 210 request_message.set_option( 211 dhcp_packet.OPTION_IP_LEASE_TIME, 212 offer_packet.get_option(dhcp_packet.OPTION_IP_LEASE_TIME)) 213 request_message.set_option( 214 dhcp_packet.OPTION_REQUESTED_IP, 215 offer_packet.get_option(dhcp_packet.OPTION_REQUESTED_IP)) 216 # Send the REQUEST message. 217 client_socket.sendto(request_message.to_binary_string(), 218 (server_ip, 8067)) 219 ack_packet = receive_packet(client_socket) 220 if ack_packet is None: 221 return False 222 223 if (ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK): 224 print "Type of DHCP response is not acknowledgement." 225 return False 226 227 if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip: 228 print "Server didn't give us the IP we expected." 229 return False 230 231 print "Waiting for the server to finish." 232 server.wait_for_test_to_finish() 233 print "Server agrees that the test is over." 234 if not server.last_test_passed: 235 print "Server is unhappy with the test result." 236 return False 237 238 print "test_simple_server_exchange PASSED." 239 return True 240 241def test_server_dialogue(): 242 server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1", 243 ingress_port=8067, 244 broadcast_address="127.0.0.1", 245 broadcast_port=8068) 246 server.start() 247 ret = False 248 if server.is_healthy: 249 ret = test_simple_server_exchange(server) 250 else: 251 print "Server isn't healthy, aborting." 252 print "Sending server stop() signal." 253 server.stop() 254 print "Stop signal sent." 255 return ret 256 257def run_tests(): 258 logger = logging.getLogger("dhcp") 259 logger.setLevel(logging.DEBUG) 260 stream_handler = logging.StreamHandler() 261 stream_handler.setLevel(logging.DEBUG) 262 logger.addHandler(stream_handler) 263 retval = test_packet_serialization() 264 retval &= test_classless_static_route_parsing() 265 retval &= test_classless_static_route_serialization() 266 retval &= test_domain_search_list_parsing() 267 retval &= test_domain_search_list_serialization() 268 retval &= test_server_dialogue() 269 if retval: 270 print "All tests PASSED." 271 return 0 272 else: 273 print "Some tests FAILED" 274 return -1 275 276if __name__ == "__main__": 277 sys.exit(run_tests()) 278