1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from autotest_lib.client.common_lib import error
6from autotest_lib.client.cros import dhcp_handling_rule
7from autotest_lib.client.cros import dhcp_packet
8from autotest_lib.client.cros import dhcp_test_base
9from autotest_lib.client.cros import shill_temporary_profile
10
class network_DhcpStaticIP(dhcp_test_base.DhcpTestBase):
    """DHCP test which confirms static IP functionality.

    The test body first connects using a plain DHCP lease.  Then, for each
    entry of the static parameter list handed to run_once(), it sets the
    matching StaticIP.* properties on the Ethernet service, renegotiates
    DHCP, verifies the resulting configuration and clears the properties
    again.  It finishes with one more plain DHCP connection.

    """
    # Length of time the lease from the DHCP server is valid.
    LEASE_TIME_SECONDS = 60
    # We'll fill in the subnet and give this address to the client over DHCP.
    INTENDED_IP_SUFFIX = '0.0.0.101'
    # We'll fill in the subnet and supply this as a static IP address.
    STATIC_IP_SUFFIX = '0.0.0.201'
    # Name servers supplied statically when 'dns-servers' is requested.
    STATIC_IP_NAME_SERVERS = ['1.1.2.2', '1.1.3.4']
    # Time to wait for the DHCP negotiation protocol to complete.
    DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10
    # Time to wait after DHCP negotiation completes until service is marked
    # as connected.
    DHCP_SETUP_TIMEOUT_SECONDS = 3
    # Name given to the temporary shill profile we create for this test.
    TEST_PROFILE_NAME = 'TestStaticIP'
    # Various parameters that can be set statically.
    CONFIGURE_STATIC_IP_ADDRESS = 'ip-address'
    CONFIGURE_STATIC_IP_DNS_SERVERS = 'dns-servers'

    @staticmethod
    def _reconnect(service):
        """Force a re-connect of |service| by disconnecting and then
        connecting it again.

        @param service object the Service DBus interface to re-connect.

        """
        service.Disconnect()
        service.Connect()


    def configure_static_ip(self, service, params):
        """Configures the Static IP parameters named in |params| on
        |service| and applies them by forcing a re-connect.

        The values that were configured are remembered in
        |self._static_ip_options|, keyed by DHCP option, so that
        connect_static_ip() can later check that they override the values
        handed out by the DHCP server.

        @param service object the Service DBus interface to configure.
        @param params list of static parameters to set on the service.

        """
        # Start from a clean slate on every call; a previous iteration may
        # have configured a different set of parameters.
        self._static_ip_options = {}
        if self.CONFIGURE_STATIC_IP_ADDRESS in params:
            subnet_mask = self.ethernet_pair.interface_subnet_mask
            static_ip_address = dhcp_test_base.DhcpTestBase.rewrite_ip_suffix(
                    subnet_mask,
                    self.server_ip,
                    self.STATIC_IP_SUFFIX)
            prefix_len = self.ethernet_pair.interface_prefix
            service.SetProperty('StaticIP.Address', static_ip_address)
            service.SetProperty('StaticIP.Prefixlen', prefix_len)
            self._static_ip_options[dhcp_packet.OPTION_REQUESTED_IP] = (
                    static_ip_address)
        if self.CONFIGURE_STATIC_IP_DNS_SERVERS in params:
            # The service property takes a comma-separated string, while the
            # expected DHCP option value stays a list.
            service.SetProperty('StaticIP.NameServers',
                                ','.join(self.STATIC_IP_NAME_SERVERS))
            self._static_ip_options[dhcp_packet.OPTION_DNS_SERVERS] = (
                    self.STATIC_IP_NAME_SERVERS)
        self._reconnect(service)


    def clear_static_ip(self, service, params):
        """Clears configuration of the Static IP parameters named in
        |params| from |service| and forces a re-connect.

        @param service object the Service DBus interface to clear properties.
        @param params list of static parameters to clear from the service.

        """
        if self.CONFIGURE_STATIC_IP_ADDRESS in params:
            service.ClearProperty('StaticIP.Address')
            service.ClearProperty('StaticIP.Prefixlen')
        if self.CONFIGURE_STATIC_IP_DNS_SERVERS in params:
            service.ClearProperty('StaticIP.NameServers')
        self._reconnect(service)


    def check_saved_ip(self, service, options):
        """Check the properties of the Ethernet service to make sure that
        the address provided by the DHCP server is properly added to the
        "SavedIP.Address" property.

        @param service object the Service DBus interface to read properties
            from.
        @param options dict parameters that were used to configure the DHCP
            server.
        @raises error.TestFail if the saved address is missing or differs
            from the address the DHCP server handed out.

        """
        intended_ip = options[dhcp_packet.OPTION_REQUESTED_IP]
        # Use .get() so that a service which never recorded a saved address
        # is reported as a test failure rather than as a bare KeyError.
        saved_ip = service.GetProperties().get('SavedIP.Address')
        if intended_ip != saved_ip:
            raise error.TestFail('Saved IP address %s is not DHCP address %s' %
                                 (saved_ip, intended_ip))


    def make_lease_negotiation_rules(self, options):
        """Generate a set of lease negotiation handling rules for a
        server that will successfully return an IP address to the client.

        @param options dict of options to be negotiated.  In particular,
            the dhcp_packet.OPTION_REQUESTED_IP element is used to configure
            the address that will be returned to the client.
        @return array of DhcpHandlingRule instances which implement the
            negotiation: a response to the discovery followed by a response
            to the request.

        """
        intended_ip = options[dhcp_packet.OPTION_REQUESTED_IP]
        return [
                dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
                        intended_ip,
                        self.server_ip,
                        options,
                        {}),
                dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
                        intended_ip,
                        self.server_ip,
                        options,
                        {}),
                ]


    def test_dhcp_negotiation(self, rules, service):
        """Perform a DHCP lease negotiation using handler rules from |rules|,
        and ensure that |service| becomes connected as a result.

        @param rules array of handling rules that must complete in order for
            the negotiation to be considered successful.  The last rule is
            marked as the final handler.
        @param service Service DBus object which should become connected as
            a result of the DHCP negotiation.
        @raises error.TestFail if the server did not see the expected
            messages or the service did not become connected in time.

        """
        rules[-1].is_final_handler = True
        self.server.start_test(rules, self.DHCP_NEGOTIATION_TIMEOUT_SECONDS)
        self.server.wait_for_test_to_finish()
        if not self.server.last_test_passed:
            raise error.TestFail('Test server didn\'t get all the messages it '
                                 'was told to expect during negotiation.')
        # Wait for the service to enter a "good" state.
        connect_result = self.shill_proxy.wait_for_property_in(
                service,
                self.shill_proxy.SERVICE_PROPERTY_STATE,
                ('ready', 'portal', 'online'),
                self.DHCP_SETUP_TIMEOUT_SECONDS)
        # Only the success flag matters here; the other two elements of the
        # result are ignored.
        successful, _, _ = connect_result
        if not successful:
            raise error.TestFail('Ethernet service did not become connected.')


    def connect_dynamic_ip(self, options, service):
        """Perform a DHCP negotiation, using |options|.  Then check that
           the IP information configured on client matches the parameters
           in |options|.

        @param options dict containing DHCP packet options to be returned
            to the client during negotiation, and then later checked for
            consistency.
        @param service DBus object of the service that should become
            connected as a result of the negotiation.

        """
        self.test_dhcp_negotiation(self.make_lease_negotiation_rules(options),
                                   service)
        self.check_dhcp_config(options)


    def connect_static_ip(self, options, service, params):
        """Perform a DHCP negotiation, using |options|.  Then check that
           the IP information configured on client matches the parameters
           in |options|, except that any value configured statically by
           configure_static_ip() takes precedence over the DHCP-supplied
           one.  Finally check that the DHCP-supplied address was saved on
           the service.

        configure_static_ip() must have been called beforehand, since this
        method reads the options it recorded.

        @param options dict containing DHCP packet options to be returned
            to the client during negotiation, and then later checked for
            consistency.
        @param service DBus object of the service that should become
            connected as a result of the negotiation.
        @param params list of static IP parameters we will be verifying.

        """
        rules = self.make_lease_negotiation_rules(options)
        if self.CONFIGURE_STATIC_IP_ADDRESS in params:
            # Add a rule that expects the client to release the lease.
            rules.append(dhcp_handling_rule.DhcpHandlingRule_AcceptRelease(
                    self.server_ip,
                    options,
                    {}))
        self.test_dhcp_negotiation(rules, service)

        # Check to make sure that the configured IP address of the client
        # matches the configured static IP address.  Work on a copy so the
        # caller's DHCP options stay untouched for later negotiations.
        static_ip_options = options.copy()
        static_ip_options.update(self._static_ip_options)
        self.check_dhcp_config(static_ip_options)
        self.check_saved_ip(service, options)


    def test_body(self):
        """The test main body.

        Connects dynamically, then runs one configure / connect / clear
        cycle per entry of |self._static_param_list|, and finally connects
        dynamically again, all inside a temporary shill profile.

        """
        subnet_mask = self.ethernet_pair.interface_subnet_mask
        intended_ip = dhcp_test_base.DhcpTestBase.rewrite_ip_suffix(
                subnet_mask,
                self.server_ip,
                self.INTENDED_IP_SUFFIX)
        # Two real name servers, and a bogus one to be unpredictable.
        dns_servers = ['8.8.8.8', '8.8.4.4', '192.168.87.88']
        domain_name = 'corp.google.com'
        dns_search_list = [
                'corgie.google.com',
                'lies.google.com',
                'that.is.a.tasty.burger.google.com',
                ]
        # This is the pool of information the server will give out to the client
        # upon request.
        dhcp_options = {
                dhcp_packet.OPTION_SERVER_ID : self.server_ip,
                dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
                dhcp_packet.OPTION_IP_LEASE_TIME : self.LEASE_TIME_SECONDS,
                dhcp_packet.OPTION_REQUESTED_IP : intended_ip,
                dhcp_packet.OPTION_DNS_SERVERS : dns_servers,
                dhcp_packet.OPTION_DOMAIN_NAME : domain_name,
                dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST : dns_search_list,
                }
        service = self.find_ethernet_service(
                self.ethernet_pair.peer_interface_name)

        manager = self.shill_proxy.manager
        with shill_temporary_profile.ShillTemporaryProfile(
                manager, profile_name=self.TEST_PROFILE_NAME):

            self.connect_dynamic_ip(dhcp_options, service)

            for params in self._static_param_list:
                self.configure_static_ip(service, params)
                self.connect_static_ip(dhcp_options, service, params)
                self.clear_static_ip(service, params)

            self.connect_dynamic_ip(dhcp_options, service)


    def run_once(self, static_param_list):
        """Setup the static parameter list before calling the DhcpTestBase
        main loop.

        @param static_param_list list of iterable properties to configure
            for each static IP test.

        """
        self._static_param_list = static_param_list
        super(network_DhcpStaticIP, self).run_once()
250