1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import time
7
8from autotest_lib.client.common_lib import error
9from autotest_lib.client.common_lib.cros import site_eap_certs
10from autotest_lib.client.common_lib.cros.network import iw_runner
11from autotest_lib.client.common_lib.cros.network import ping_runner
12from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
13from autotest_lib.client.common_lib.cros.network import xmlrpc_security_types
14from autotest_lib.server.cros.network import hostap_config
15from autotest_lib.server.cros.network import wifi_cell_test_base
16
17
class network_WiFi_PMKSACaching(wifi_cell_test_base.WiFiCellTestBase):
    """Test that we use PMKSA caching where appropriate.

    The test connects the DUT to a WPA2-EAP AP, brings up a second AP with
    the same SSID and security settings on another frequency, roams to it,
    tears the second AP down so that the DUT falls back to the first one,
    and finally asks the router to confirm PMKSA cache use on the first AP.

    """
    version = 1
    # Frequencies handed to HostapConfig for the first and second AP.  The
    # second one is the 5 GHz AP.
    AP0_FREQUENCY = 2412
    AP1_FREQUENCY = 5220
    # Default time budget, in seconds, for polling loops and roam waits.
    TIMEOUT_SECONDS = 15


    def dut_sees_bss(self, bssid):
        """
        Check if a DUT can see a BSS in scan results.

        @param bssid: string bssid of AP we expect to see in scan results.
        @return True iff scan results from DUT include the specified BSS.

        """
        runner = iw_runner.IwRunner(remote_host=self.context.client.host)
        scan_results = runner.scan(self.context.client.wifi_if)
        # Use any() rather than filter(): on Python 3 filter() returns a lazy
        # iterator that is always truthy, which would make every non-empty
        # scan look like a match.  The `or []` keeps the original guard
        # against a falsy scan result.
        return any(iw_bss.bss == bssid for iw_bss in scan_results or [])


    def retry(self, func, reason, timeout_seconds=TIMEOUT_SECONDS):
        """
        Retry a function until it returns true or we time out.

        |func| is polled roughly once per second.  It is not called at all
        if |timeout_seconds| is not positive.

        @param func: function that takes no parameters.
        @param reason: string concise description of what the function does.
        @param timeout_seconds: int number of seconds to wait for a True
                response from |func|.
        @raises error.TestFail if |func| did not return a true value before
                the timeout expired.

        """
        logging.info('Waiting for %s.', reason)
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            if func():
                return
            time.sleep(1)
        # Only reached when the deadline passed without success.
        raise error.TestFail('Timed out waiting for %s.' % reason)


    def run_once(self):
        """Body of the test.

        @raises error.TestFail if the DUT fails to roam to the second BSS,
                fails to fall back to the first BSS, or one of the polling
                steps times out.

        """
        mode_n = hostap_config.HostapConfig.MODE_11N_PURE
        eap_config = xmlrpc_security_types.WPAEAPConfig(
                server_ca_cert=site_eap_certs.ca_cert_1,
                server_cert=site_eap_certs.server_cert_1,
                server_key=site_eap_certs.server_private_key_1,
                client_ca_cert=site_eap_certs.ca_cert_1,
                client_cert=site_eap_certs.client_cert_1,
                client_key=site_eap_certs.client_private_key_1,
                # PMKSA caching is only defined for WPA2.
                wpa_mode=xmlrpc_security_types.WPAConfig.MODE_PURE_WPA2)

        # Bring up the first AP and connect the DUT to it.
        ap_config0 = hostap_config.HostapConfig(
                mode=mode_n, frequency=self.AP0_FREQUENCY,
                security_config=eap_config)
        self.context.configure(ap_config0)
        assoc_params = xmlrpc_datatypes.AssociationParameters(
                ssid=self.context.router.get_ssid(),
                security_config=eap_config)
        self.context.assert_connect_wifi(assoc_params)

        # Add another AP with identical configuration except in 5 GHz.
        ap_config1 = hostap_config.HostapConfig(
                mode=mode_n, ssid=self.context.router.get_ssid(),
                frequency=self.AP1_FREQUENCY, security_config=eap_config)
        self.context.configure(ap_config1, multi_interface=True)
        bssid0 = self.context.router.get_hostapd_mac(0)
        bssid1 = self.context.router.get_hostapd_mac(1)

        # Wait until the second AP shows up in a scan before asking the DUT
        # to roam to it.
        self.retry(lambda: self.dut_sees_bss(bssid1), 'DUT to see second AP')
        self.context.client.request_roam(bssid1)
        if not self.context.client.wait_for_roam(
                bssid1, timeout_seconds=self.TIMEOUT_SECONDS):
            raise error.TestFail('Failed to roam to second BSS.')

        # Take the second AP away so that the DUT has to return to the first.
        self.context.router.deconfig_aps(instance=1, silent=True)
        if not self.context.client.wait_for_roam(
                bssid0, timeout_seconds=self.TIMEOUT_SECONDS):
            raise error.TestFail('Failed to fall back to first BSS.')

        # Make sure the link to the first AP actually passes traffic again.
        pinger = ping_runner.PingRunner(host=self.context.client.host)
        self.retry(lambda: pinger.simple_ping(
                           self.context.router.get_wifi_ip(0)),
                   'DUT to be able to ping first BSS after fallback')
        # NOTE(review): the return value is not checked here, so this call
        # presumably raises on failure -- confirm against the router class.
        self.context.router.confirm_pmksa_cache_use(instance=0)
103