iw_runner.py revision 385c10a611ee4307a3d5295513392fa49a406e86
1# Copyright (c) 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import logging
7import re
8import time
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib import utils
12from autotest_lib.client.common_lib.cros.network import iw_event_logger
13
14# These must mirror the values in 'iw list' output.
15CHAN_FLAG_DISABLED = 'disabled'
16CHAN_FLAG_NO_IR = 'no IR'
17CHAN_FLAG_PASSIVE_SCAN = 'passive scan'
18CHAN_FLAG_RADAR_DETECT = 'radar detection'
19
20HT20 = 'HT20'
21HT40_ABOVE = 'HT40+'
22HT40_BELOW = 'HT40-'
23
24SECURITY_OPEN = 'open'
25SECURITY_WEP = 'wep'
26SECURITY_WPA = 'wpa'
27SECURITY_WPA2 = 'wpa2'
28# MIxed mode security is WPA2/WPA
29SECURITY_MIXED = 'mixed'
30
31# Table of lookups between the output of item 'secondary channel offset:' from
32# iw <device> scan to constants.
33
34HT_TABLE = {'no secondary': HT20,
35            'above': HT40_ABOVE,
36            'below': HT40_BELOW}
37
38IwBand = collections.namedtuple(
39    'Band', ['num', 'frequencies', 'frequency_flags', 'mcs_indices'])
40IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security',
41                                         'ht'])
42IwNetDev = collections.namedtuple('IwNetDev', ['phy', 'if_name', 'if_type'])
43IwTimedScan = collections.namedtuple('IwTimedScan', ['time', 'bss_list'])
44
45# The fields for IwPhy are as follows:
46#   name: string name of the phy, such as "phy0"
47#   bands: list of IwBand objects.
48#   modes: List of strings containing interface modes supported, such as "AP".
49#   commands: List of strings containing nl80211 commands supported, such as
50#          "authenticate".
51#   features: List of strings containing nl80211 features supported, such as
52#          "T-DLS".
53#   max_scan_ssids: Maximum number of SSIDs which can be scanned at once.
54IwPhy = collections.namedtuple(
55    'Phy', ['name', 'bands', 'modes', 'commands', 'features',
56            'max_scan_ssids', 'avail_tx_antennas', 'avail_rx_antennas',
57            'supports_setting_antenna_mask', 'support_vht'])
58
59DEFAULT_COMMAND_IW = 'iw'
60
61# Time command to get elapsed time. Full path is used to avoid using the
62# built-in 'time' command from the bash shell
63IW_TIME_COMMAND = '/usr/local/bin/time -f "%e"'
64
65IW_LINK_KEY_BEACON_INTERVAL = 'beacon int'
66IW_LINK_KEY_DTIM_PERIOD = 'dtim period'
67IW_LINK_KEY_FREQUENCY = 'freq'
68IW_LOCAL_EVENT_LOG_FILE = './debug/iw_event_%d.log'
69
70
71class IwRunner(object):
72    """Defines an interface to the 'iw' command."""
73
74
75    def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW):
76        self._run = utils.run
77        self._host = remote_host
78        if remote_host:
79            self._run = remote_host.run
80        self._command_iw = command_iw
81        self._log_id = 0
82
83
84    def _parse_scan_results(self, output):
85        """Parse the output of the 'scan' and 'scan dump' commands.
86
87        @param output: string command output.
88
89        @returns a list of IwBss namedtuples; None if the scan fails
90
91        """
92        bss = None
93        frequency = None
94        ssid = None
95        ht = None
96        security = None
97        supported_securities = []
98        bss_list = []
99        for line in output.splitlines():
100            line = line.strip()
101            bss_match = re.match('BSS ([0-9a-f:]+)', line)
102            if bss_match:
103                if bss != None:
104                    security = self.determine_security(supported_securities)
105                    iwbss = IwBss(bss, frequency, ssid, security, ht)
106                    bss_list.append(iwbss)
107                    bss = frequency = ssid = security = ht = None
108                    supported_securities = []
109                bss = bss_match.group(1)
110            if line.startswith('freq:'):
111                frequency = int(line.split()[1])
112            if line.startswith('SSID: '):
113                _, ssid = line.split(': ', 1)
114            if line.startswith('* secondary channel offset'):
115                ht = HT_TABLE[line.split(':')[1].strip()]
116            if line.startswith('WPA'):
117               supported_securities.append(SECURITY_WPA)
118            if line.startswith('RSN'):
119               supported_securities.append(SECURITY_WPA2)
120        security = self.determine_security(supported_securities)
121        bss_list.append(IwBss(bss, frequency, ssid, security, ht))
122        return bss_list
123
124
125    def add_interface(self, phy, interface, interface_type):
126        """
127        Add an interface to a WiFi PHY.
128
129        @param phy: string name of PHY to add an interface to.
130        @param interface: string name of interface to add.
131        @param interface_type: string type of interface to add (e.g. 'monitor').
132
133        """
134        self._run('%s phy %s interface add %s type %s' %
135                  (self._command_iw, phy, interface, interface_type))
136
137
138    def disconnect_station(self, interface):
139        """
140        Disconnect a STA from a network.
141
142        @param interface: string name of interface to disconnect.
143
144        """
145        self._run('%s dev %s disconnect' % (self._command_iw, interface))
146
147
148    def get_current_bssid(self, interface_name):
149        """Get the BSSID that |interface_name| is associated with.
150
151        @param interface_name: string name of interface (e.g. 'wlan0').
152        @return string bssid of our current association, or None.
153
154        """
155        result = self._run('%s dev %s link' %
156                           (self._command_iw, interface_name),
157                           ignore_status=True)
158        if result.exit_status:
159            # See comment in get_link_value.
160            return None
161
162        # We're looking for a line like:
163        #   Connected to 04:f0:21:03:7d:bb (on wlan0)
164        match = re.search(
165                'Connected to ([0-9a-fA-F:]{17}) \\(on %s\\)' % interface_name,
166                result.stdout)
167        if match is None:
168            return None
169        return match.group(1)
170
171
172    def get_interface(self, interface_name):
173        """Get full information about an interface given an interface name.
174
175        @param interface_name: string name of interface (e.g. 'wlan0').
176        @return IwNetDev tuple.
177
178        """
179        matching_interfaces = [iw_if for iw_if in self.list_interfaces()
180                                     if iw_if.if_name == interface_name]
181        if len(matching_interfaces) != 1:
182            raise error.TestFail('Could not find interface named %s' %
183                                 interface_name)
184
185        return matching_interfaces[0]
186
187
188    def get_link_value(self, interface, iw_link_key):
189        """Get the value of a link property for |interface|.
190
191        This command parses fields of iw link:
192
193        #> iw dev wlan0 link
194        Connected to 74:e5:43:10:4f:c0 (on wlan0)
195              SSID: PMKSACaching_4m9p5_ch1
196              freq: 5220
197              RX: 5370 bytes (37 packets)
198              TX: 3604 bytes (15 packets)
199              signal: -59 dBm
200              tx bitrate: 13.0 MBit/s MCS 1
201
202              bss flags:      short-slot-time
203              dtim period:    5
204              beacon int:     100
205
206        @param iw_link_key: string one of IW_LINK_KEY_* defined above.
207        @param interface: string desired value of iw link property.
208
209        """
210        result = self._run('%s dev %s link' % (self._command_iw, interface),
211                           ignore_status=True)
212        if result.exit_status:
213            # When roaming, there is a period of time for mac80211 based drivers
214            # when the driver is 'associated' with an SSID but not a particular
215            # BSS.  This causes iw to return an error code (-2) when attempting
216            # to retrieve information specific to the BSS.  This does not happen
217            # in mwifiex drivers.
218            return None
219
220        find_re = re.compile('\s*%s:\s*(.*\S)\s*$' % iw_link_key)
221        find_results = filter(bool,
222                              map(find_re.match, result.stdout.splitlines()))
223        if not find_results:
224            return None
225
226        actual_value = find_results[0].group(1)
227        logging.info('Found iw link key %s with value %s.',
228                     iw_link_key, actual_value)
229        return actual_value
230
231
232    def ibss_join(self, interface, ssid, frequency):
233        """
234        Join a WiFi interface to an IBSS.
235
236        @param interface: string name of interface to join to the IBSS.
237        @param ssid: string SSID of IBSS to join.
238        @param frequency: int frequency of IBSS in Mhz.
239
240        """
241        self._run('%s dev %s ibss join %s %d' %
242                  (self._command_iw, interface, ssid, frequency))
243
244
245    def ibss_leave(self, interface):
246        """
247        Leave an IBSS.
248
249        @param interface: string name of interface to remove from the IBSS.
250
251        """
252        self._run('%s dev %s ibss leave' % (self._command_iw, interface))
253
254
255    def list_interfaces(self, desired_if_type=None):
256        """List WiFi related interfaces on this system.
257
258        @param desired_if_type: string type of interface to filter
259                our returned list of interfaces for (e.g. 'managed').
260
261        @return list of IwNetDev tuples.
262
263        """
264        output = self._run('%s dev' % self._command_iw).stdout
265        interfaces = []
266        phy = None
267        if_name = None
268        if_type = None
269        for line in output.splitlines():
270            m = re.match('phy#([0-9]+)', line)
271            if m:
272                phy = 'phy%d' % int(m.group(1))
273            m = re.match('[\s]*Interface (.*)', line)
274            if m:
275                if_name = m.group(1)
276            # Common values for type are 'managed', 'monitor', and 'IBSS'.
277            m = re.match('[\s]*type ([a-zA-Z]+)', line)
278            if m:
279                if_type = m.group(1)
280            if phy and if_name and if_type:
281                interfaces.append(IwNetDev(phy=phy, if_name=if_name,
282                                           if_type=if_type))
283                # One phy may have many interfaces, so don't reset it.
284                if_name = if_type = None
285
286        if desired_if_type:
287            interfaces = [interface for interface in interfaces
288                          if interface.if_type == desired_if_type]
289        return interfaces
290
291
292    def list_phys(self):
293        """
294        List WiFi PHYs on the given host.
295
296        @return list of IwPhy tuples.
297
298        """
299        output = self._run('%s list' % self._command_iw).stdout
300
301        pending_phy_name = None
302        current_band = None
303        current_section = None
304        all_phys = []
305
306        def add_pending_phy():
307            """Add the pending phy into |all_phys|."""
308            bands = tuple(IwBand(band.num,
309                                 tuple(band.frequencies),
310                                 dict(band.frequency_flags),
311                                 tuple(band.mcs_indices))
312                          for band in pending_phy_bands)
313            new_phy = IwPhy(pending_phy_name,
314                            bands,
315                            tuple(pending_phy_modes),
316                            tuple(pending_phy_commands),
317                            tuple(pending_phy_features),
318                            pending_phy_max_scan_ssids,
319                            pending_phy_tx_antennas,
320                            pending_phy_rx_antennas,
321                            pending_phy_tx_antennas and pending_phy_rx_antennas,
322                            pending_phy_support_vht)
323            all_phys.append(new_phy)
324
325        for line in output.splitlines():
326            match_phy = re.search('Wiphy (.*)', line)
327            if match_phy:
328                if pending_phy_name:
329                    add_pending_phy()
330                pending_phy_name = match_phy.group(1)
331                pending_phy_bands = []
332                pending_phy_modes = []
333                pending_phy_commands = []
334                pending_phy_features = []
335                pending_phy_max_scan_ssids = None
336                pending_phy_tx_antennas = 0
337                pending_phy_rx_antennas = 0
338                pending_phy_support_vht = False
339                continue
340
341            match_section = re.match('\s*(\w.*):\s*$', line)
342            if match_section:
343                current_section = match_section.group(1)
344                match_band = re.match('Band (\d+)', current_section)
345                if match_band:
346                    current_band = IwBand(num=int(match_band.group(1)),
347                                          frequencies=[],
348                                          frequency_flags={},
349                                          mcs_indices=[])
350                    pending_phy_bands.append(current_band)
351                continue
352
353            # Check for max_scan_ssids. This isn't a section, but it
354            # also isn't within a section.
355            match_max_scan_ssids = re.match('\s*max # scan SSIDs: (\d+)',
356                                            line)
357            if match_max_scan_ssids and pending_phy_name:
358                pending_phy_max_scan_ssids = int(
359                    match_max_scan_ssids.group(1))
360                continue
361
362            if (current_section == 'Supported interface modes' and
363                pending_phy_name):
364                mode_match = re.search('\* (\w+)', line)
365                if mode_match:
366                    pending_phy_modes.append(mode_match.group(1))
367                    continue
368
369            if current_section == 'Supported commands' and pending_phy_name:
370                command_match = re.search('\* (\w+)', line)
371                if command_match:
372                    pending_phy_commands.append(command_match.group(1))
373                    continue
374
375            if (current_section is not None and
376                current_section.startswith('VHT Capabilities') and
377                pending_phy_name):
378                pending_phy_support_vht = True
379                continue
380
381            match_avail_antennas = re.match('\s*Available Antennas: TX (\S+)'
382                                            ' RX (\S+)', line)
383            if match_avail_antennas and pending_phy_name:
384                pending_phy_tx_antennas = int(
385                        match_avail_antennas.group(1), 16)
386                pending_phy_rx_antennas = int(
387                        match_avail_antennas.group(2), 16)
388                continue
389
390            match_device_support = re.match('\s*Device supports (.*)\.', line)
391            if match_device_support and pending_phy_name:
392                pending_phy_features.append(match_device_support.group(1))
393                continue
394
395            if not all([current_band, pending_phy_name,
396                        line.startswith('\t')]):
397                continue
398
399            # E.g.
400            # * 2412 MHz [1] (20.0 dBm)
401            # * 2467 MHz [12] (20.0 dBm) (passive scan)
402            # * 2472 MHz [13] (disabled)
403            # * 5260 MHz [52] (19.0 dBm) (no IR, radar detection)
404            match_chan_info = re.search(
405                r'(?P<frequency>\d+) MHz'
406                r' (?P<chan_num>\[\d+\])'
407                r'(?: \((?P<tx_power_limit>[0-9.]+ dBm)\))?'
408                r'(?: \((?P<flags>[a-zA-Z, ]+)\))?', line)
409            if match_chan_info:
410                frequency = int(match_chan_info.group('frequency'))
411                current_band.frequencies.append(frequency)
412                flags_string = match_chan_info.group('flags')
413                if flags_string:
414                    current_band.frequency_flags[frequency] = frozenset(
415                        flags_string.split(','))
416                else:
417                    # Populate the dict with an empty set, to make
418                    # things uniform for client code.
419                    current_band.frequency_flags[frequency] = frozenset()
420                continue
421
422            # re_mcs needs to match something like:
423            # HT TX/RX MCS rate indexes supported: 0-15, 32
424            if re.search('HT TX/RX MCS rate indexes supported: ', line):
425                rate_string = line.split(':')[1].strip()
426                for piece in rate_string.split(','):
427                    if piece.find('-') > 0:
428                        # Must be a range like '  0-15'
429                        begin, end = piece.split('-')
430                        for index in range(int(begin), int(end) + 1):
431                            current_band.mcs_indices.append(index)
432                    else:
433                        # Must be a single rate like '32   '
434                        current_band.mcs_indices.append(int(piece))
435        if pending_phy_name:
436            add_pending_phy()
437        return all_phys
438
439
440    def remove_interface(self, interface, ignore_status=False):
441        """
442        Remove a WiFi interface from a PHY.
443
444        @param interface: string name of interface (e.g. mon0)
445        @param ignore_status: boolean True iff we should ignore failures
446                to remove the interface.
447
448        """
449        self._run('%s dev %s del' % (self._command_iw, interface),
450                  ignore_status=ignore_status)
451
452
453    def determine_security(self, supported_securities):
454        """Determines security from the given list of supported securities.
455
456        @param supported_securities: list of supported securities from scan
457
458        """
459        if not supported_securities:
460            security = SECURITY_OPEN
461        elif len(supported_securities) == 1:
462            security = supported_securities[0]
463        else:
464            security = SECURITY_MIXED
465        return security
466
467
468    def scan(self, interface, frequencies=(), ssids=()):
469        """Performs a scan.
470
471        @param interface: the interface to run the iw command against
472        @param frequencies: list of int frequencies in Mhz to scan.
473        @param ssids: list of string SSIDs to send probe requests for.
474
475        @returns a list of IwBss namedtuples; None if the scan fails
476
477        """
478        scan_result = self.timed_scan(interface, frequencies, ssids)
479        if scan_result is None:
480            return None
481        return scan_result.bss_list
482
483
484    def timed_scan(self, interface, frequencies=(), ssids=()):
485        """Performs a timed scan.
486
487        @param interface: the interface to run the iw command against
488        @param frequencies: list of int frequencies in Mhz to scan.
489        @param ssids: list of string SSIDs to send probe requests for.
490
491        @returns a IwTimedScan namedtuple; None if the scan fails
492
493        """
494        freq_param = ''
495        if frequencies:
496            freq_param = ' freq %s' % ' '.join(map(str, frequencies))
497        ssid_param = ''
498        if ssids:
499           ssid_param = ' ssid "%s"' % '" "'.join(ssids)
500
501        command = '%s %s dev %s scan%s%s' % (IW_TIME_COMMAND,
502                self._command_iw, interface, freq_param, ssid_param)
503        scan = self._run(command, ignore_status=True)
504        if scan.exit_status != 0:
505            # The device was busy
506            logging.debug('scan exit_status: %d', scan.exit_status)
507            return None
508        if not scan.stdout:
509            logging.debug('Empty scan result')
510            bss_list = []
511        else:
512            bss_list = self._parse_scan_results(scan.stdout)
513        scan_time = float(scan.stderr)
514        return IwTimedScan(scan_time, bss_list)
515
516
517    def scan_dump(self, interface):
518        """Dump the contents of the scan cache.
519
520        Note that this does not trigger a scan.  Instead, it returns
521        the kernel's idea of what BSS's are currently visible.
522
523        @param interface: the interface to run the iw command against
524
525        @returns a list of IwBss namedtuples; None if the scan fails
526
527        """
528        result = self._run('%s dev %s scan dump' % (self._command_iw,
529                                                    interface))
530        return self._parse_scan_results(result.stdout)
531
532
533    def set_tx_power(self, interface, power):
534        """
535        Set the transmission power for an interface.
536
537        @param interface: string name of interface to set Tx power on.
538        @param power: string power parameter. (e.g. 'auto').
539
540        """
541        self._run('%s dev %s set txpower %s' %
542                  (self._command_iw, interface, power))
543
544
545    def set_freq(self, interface, freq):
546        """
547        Set the frequency for an interface.
548
549        @param interface: string name of interface to set frequency on.
550        @param freq: int frequency
551
552        """
553        self._run('%s dev %s set freq %d' %
554                  (self._command_iw, interface, freq))
555
556
557    def set_regulatory_domain(self, domain_string):
558        """
559        Set the regulatory domain of the current machine.  Note that
560        the regulatory change happens asynchronously to the exit of
561        this function.
562
563        @param domain_string: string regulatory domain name (e.g. 'US').
564
565        """
566        self._run('%s reg set %s' % (self._command_iw, domain_string))
567
568
569    def get_regulatory_domain(self):
570        """
571        Get the regulatory domain of the current machine.
572
573        @returns a string containing the 2-letter regulatory domain name
574            (e.g. 'US').
575
576        """
577        output = self._run('%s reg get' % self._command_iw).stdout
578        m = re.match('^country (..):', output)
579        if not m:
580            return None
581        return m.group(1)
582
583
584    def wait_for_scan_result(self, interface, bss=None, ssid=None,
585                             timeout_seconds=30):
586        """Returns a IWBSS object for a network with the given bssed or ssid.
587
588        @param interface: which interface to run iw against
589        @param bss: BSS as a string
590        @param ssid: ssid as a string
591        @param timeout_seconds: the amount of time to wait in seconds
592
593        @returns a list of IwBss collections that contain the given bss or ssid;
594            if the scan is empty or returns an error code None is returned.
595
596        """
597        start_time = time.time()
598        scan_failure_attempts = 0
599        logging.info('Performing a scan with a max timeout of %d seconds.',
600                     timeout_seconds)
601        while time.time() - start_time < timeout_seconds:
602            scan_results = self.scan(interface)
603            if scan_results is None or len(scan_results) == 0:
604                scan_failure_attempts += 1
605                # Allow in-progress scan to complete
606                time.sleep(5)
607                # If the in-progress scan takes more than 30 seconds to
608                # complete it will most likely never complete; abort.
609                # See crbug.com/309148.
610                if scan_failure_attempts > 5:
611                    logging.error('Scan failed to run, see debug log for '
612                                  'error code.')
613                    return None
614                continue
615            scan_failure_attempts = 0
616            matching_bsses = list()
617            for iwbss in scan_results:
618                if bss is not None and iwbss.bss != bss:
619                    continue
620                if ssid is not None and iwbss.ssid != ssid:
621                    continue
622                matching_bsses.append(iwbss)
623            if len(matching_bsses) > 0:
624                return matching_bsses
625
626        if scan_failure_attempts > 0:
627            return None
628        # The SSID wasn't found, but the device is fine.
629        return list()
630
631
632    def wait_for_link(self, interface, timeout_seconds=10):
633        """Waits until a link completes on |interface|.
634
635        @param interface: which interface to run iw against.
636        @param timeout_seconds: the amount of time to wait in seconds.
637
638        @returns True if link was established before the timeout.
639
640        """
641        start_time = time.time()
642        while time.time() - start_time < timeout_seconds:
643            link_results = self._run('%s dev %s link' %
644                                     (self._command_iw, interface))
645            if 'Not connected' not in link_results.stdout:
646                return True
647            time.sleep(1)
648        return False
649
650
651    def set_antenna_bitmap(self, phy, tx_bitmap, rx_bitmap):
652        """Set antenna chain mask on given phy (radio).
653
654        This function will set the antennas allowed to use for TX and
655        RX on the |phy| based on the |tx_bitmap| and |rx_bitmap|.
656        This command is only allowed when the interfaces on the phy are down.
657
658        @param phy: phy name
659        @param tx_bitmap: bitmap of allowed antennas to use for TX
660        @param rx_bitmap: bitmap of allowed antennas to use for RX
661
662        """
663        command = '%s phy %s set antenna %d %d' % (self._command_iw, phy,
664                                                   tx_bitmap, rx_bitmap)
665        self._run(command)
666
667
668    def get_event_logger(self):
669        """Create and return a IwEventLogger object.
670
671        @returns a IwEventLogger object.
672
673        """
674        local_file = IW_LOCAL_EVENT_LOG_FILE % (self._log_id)
675        self._log_id += 1
676        return iw_event_logger.IwEventLogger(self._host, self._command_iw,
677                                             local_file)
678
679
680    def vht_supported(self):
681        """Returns True if VHT is supported; False otherwise."""
682        result = self._run('%s list' % self._command_iw).stdout
683        if 'VHT Capabilities' in result:
684            return True
685        return False
686