iw_runner.py revision 385c10a611ee4307a3d5295513392fa49a406e86
1# Copyright (c) 2013 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import collections 6import logging 7import re 8import time 9 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.common_lib import utils 12from autotest_lib.client.common_lib.cros.network import iw_event_logger 13 14# These must mirror the values in 'iw list' output. 15CHAN_FLAG_DISABLED = 'disabled' 16CHAN_FLAG_NO_IR = 'no IR' 17CHAN_FLAG_PASSIVE_SCAN = 'passive scan' 18CHAN_FLAG_RADAR_DETECT = 'radar detection' 19 20HT20 = 'HT20' 21HT40_ABOVE = 'HT40+' 22HT40_BELOW = 'HT40-' 23 24SECURITY_OPEN = 'open' 25SECURITY_WEP = 'wep' 26SECURITY_WPA = 'wpa' 27SECURITY_WPA2 = 'wpa2' 28# MIxed mode security is WPA2/WPA 29SECURITY_MIXED = 'mixed' 30 31# Table of lookups between the output of item 'secondary channel offset:' from 32# iw <device> scan to constants. 33 34HT_TABLE = {'no secondary': HT20, 35 'above': HT40_ABOVE, 36 'below': HT40_BELOW} 37 38IwBand = collections.namedtuple( 39 'Band', ['num', 'frequencies', 'frequency_flags', 'mcs_indices']) 40IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security', 41 'ht']) 42IwNetDev = collections.namedtuple('IwNetDev', ['phy', 'if_name', 'if_type']) 43IwTimedScan = collections.namedtuple('IwTimedScan', ['time', 'bss_list']) 44 45# The fields for IwPhy are as follows: 46# name: string name of the phy, such as "phy0" 47# bands: list of IwBand objects. 48# modes: List of strings containing interface modes supported, such as "AP". 49# commands: List of strings containing nl80211 commands supported, such as 50# "authenticate". 51# features: List of strings containing nl80211 features supported, such as 52# "T-DLS". 53# max_scan_ssids: Maximum number of SSIDs which can be scanned at once. 54IwPhy = collections.namedtuple( 55 'Phy', ['name', 'bands', 'modes', 'commands', 'features', 56 'max_scan_ssids', 'avail_tx_antennas', 'avail_rx_antennas', 57 'supports_setting_antenna_mask', 'support_vht']) 58 59DEFAULT_COMMAND_IW = 'iw' 60 61# Time command to get elapsed time. Full path is used to avoid using the 62# built-in 'time' command from the bash shell 63IW_TIME_COMMAND = '/usr/local/bin/time -f "%e"' 64 65IW_LINK_KEY_BEACON_INTERVAL = 'beacon int' 66IW_LINK_KEY_DTIM_PERIOD = 'dtim period' 67IW_LINK_KEY_FREQUENCY = 'freq' 68IW_LOCAL_EVENT_LOG_FILE = './debug/iw_event_%d.log' 69 70 71class IwRunner(object): 72 """Defines an interface to the 'iw' command.""" 73 74 75 def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW): 76 self._run = utils.run 77 self._host = remote_host 78 if remote_host: 79 self._run = remote_host.run 80 self._command_iw = command_iw 81 self._log_id = 0 82 83 84 def _parse_scan_results(self, output): 85 """Parse the output of the 'scan' and 'scan dump' commands. 86 87 @param output: string command output. 88 89 @returns a list of IwBss namedtuples; None if the scan fails 90 91 """ 92 bss = None 93 frequency = None 94 ssid = None 95 ht = None 96 security = None 97 supported_securities = [] 98 bss_list = [] 99 for line in output.splitlines(): 100 line = line.strip() 101 bss_match = re.match('BSS ([0-9a-f:]+)', line) 102 if bss_match: 103 if bss != None: 104 security = self.determine_security(supported_securities) 105 iwbss = IwBss(bss, frequency, ssid, security, ht) 106 bss_list.append(iwbss) 107 bss = frequency = ssid = security = ht = None 108 supported_securities = [] 109 bss = bss_match.group(1) 110 if line.startswith('freq:'): 111 frequency = int(line.split()[1]) 112 if line.startswith('SSID: '): 113 _, ssid = line.split(': ', 1) 114 if line.startswith('* secondary channel offset'): 115 ht = HT_TABLE[line.split(':')[1].strip()] 116 if line.startswith('WPA'): 117 supported_securities.append(SECURITY_WPA) 118 if line.startswith('RSN'): 119 supported_securities.append(SECURITY_WPA2) 120 security = self.determine_security(supported_securities) 121 bss_list.append(IwBss(bss, frequency, ssid, security, ht)) 122 return bss_list 123 124 125 def add_interface(self, phy, interface, interface_type): 126 """ 127 Add an interface to a WiFi PHY. 128 129 @param phy: string name of PHY to add an interface to. 130 @param interface: string name of interface to add. 131 @param interface_type: string type of interface to add (e.g. 'monitor'). 132 133 """ 134 self._run('%s phy %s interface add %s type %s' % 135 (self._command_iw, phy, interface, interface_type)) 136 137 138 def disconnect_station(self, interface): 139 """ 140 Disconnect a STA from a network. 141 142 @param interface: string name of interface to disconnect. 143 144 """ 145 self._run('%s dev %s disconnect' % (self._command_iw, interface)) 146 147 148 def get_current_bssid(self, interface_name): 149 """Get the BSSID that |interface_name| is associated with. 150 151 @param interface_name: string name of interface (e.g. 'wlan0'). 152 @return string bssid of our current association, or None. 153 154 """ 155 result = self._run('%s dev %s link' % 156 (self._command_iw, interface_name), 157 ignore_status=True) 158 if result.exit_status: 159 # See comment in get_link_value. 160 return None 161 162 # We're looking for a line like: 163 # Connected to 04:f0:21:03:7d:bb (on wlan0) 164 match = re.search( 165 'Connected to ([0-9a-fA-F:]{17}) \\(on %s\\)' % interface_name, 166 result.stdout) 167 if match is None: 168 return None 169 return match.group(1) 170 171 172 def get_interface(self, interface_name): 173 """Get full information about an interface given an interface name. 174 175 @param interface_name: string name of interface (e.g. 'wlan0'). 176 @return IwNetDev tuple. 177 178 """ 179 matching_interfaces = [iw_if for iw_if in self.list_interfaces() 180 if iw_if.if_name == interface_name] 181 if len(matching_interfaces) != 1: 182 raise error.TestFail('Could not find interface named %s' % 183 interface_name) 184 185 return matching_interfaces[0] 186 187 188 def get_link_value(self, interface, iw_link_key): 189 """Get the value of a link property for |interface|. 190 191 This command parses fields of iw link: 192 193 #> iw dev wlan0 link 194 Connected to 74:e5:43:10:4f:c0 (on wlan0) 195 SSID: PMKSACaching_4m9p5_ch1 196 freq: 5220 197 RX: 5370 bytes (37 packets) 198 TX: 3604 bytes (15 packets) 199 signal: -59 dBm 200 tx bitrate: 13.0 MBit/s MCS 1 201 202 bss flags: short-slot-time 203 dtim period: 5 204 beacon int: 100 205 206 @param iw_link_key: string one of IW_LINK_KEY_* defined above. 207 @param interface: string desired value of iw link property. 208 209 """ 210 result = self._run('%s dev %s link' % (self._command_iw, interface), 211 ignore_status=True) 212 if result.exit_status: 213 # When roaming, there is a period of time for mac80211 based drivers 214 # when the driver is 'associated' with an SSID but not a particular 215 # BSS. This causes iw to return an error code (-2) when attempting 216 # to retrieve information specific to the BSS. This does not happen 217 # in mwifiex drivers. 218 return None 219 220 find_re = re.compile('\s*%s:\s*(.*\S)\s*$' % iw_link_key) 221 find_results = filter(bool, 222 map(find_re.match, result.stdout.splitlines())) 223 if not find_results: 224 return None 225 226 actual_value = find_results[0].group(1) 227 logging.info('Found iw link key %s with value %s.', 228 iw_link_key, actual_value) 229 return actual_value 230 231 232 def ibss_join(self, interface, ssid, frequency): 233 """ 234 Join a WiFi interface to an IBSS. 235 236 @param interface: string name of interface to join to the IBSS. 237 @param ssid: string SSID of IBSS to join. 238 @param frequency: int frequency of IBSS in Mhz. 239 240 """ 241 self._run('%s dev %s ibss join %s %d' % 242 (self._command_iw, interface, ssid, frequency)) 243 244 245 def ibss_leave(self, interface): 246 """ 247 Leave an IBSS. 248 249 @param interface: string name of interface to remove from the IBSS. 250 251 """ 252 self._run('%s dev %s ibss leave' % (self._command_iw, interface)) 253 254 255 def list_interfaces(self, desired_if_type=None): 256 """List WiFi related interfaces on this system. 257 258 @param desired_if_type: string type of interface to filter 259 our returned list of interfaces for (e.g. 'managed'). 260 261 @return list of IwNetDev tuples. 262 263 """ 264 output = self._run('%s dev' % self._command_iw).stdout 265 interfaces = [] 266 phy = None 267 if_name = None 268 if_type = None 269 for line in output.splitlines(): 270 m = re.match('phy#([0-9]+)', line) 271 if m: 272 phy = 'phy%d' % int(m.group(1)) 273 m = re.match('[\s]*Interface (.*)', line) 274 if m: 275 if_name = m.group(1) 276 # Common values for type are 'managed', 'monitor', and 'IBSS'. 277 m = re.match('[\s]*type ([a-zA-Z]+)', line) 278 if m: 279 if_type = m.group(1) 280 if phy and if_name and if_type: 281 interfaces.append(IwNetDev(phy=phy, if_name=if_name, 282 if_type=if_type)) 283 # One phy may have many interfaces, so don't reset it. 284 if_name = if_type = None 285 286 if desired_if_type: 287 interfaces = [interface for interface in interfaces 288 if interface.if_type == desired_if_type] 289 return interfaces 290 291 292 def list_phys(self): 293 """ 294 List WiFi PHYs on the given host. 295 296 @return list of IwPhy tuples. 297 298 """ 299 output = self._run('%s list' % self._command_iw).stdout 300 301 pending_phy_name = None 302 current_band = None 303 current_section = None 304 all_phys = [] 305 306 def add_pending_phy(): 307 """Add the pending phy into |all_phys|.""" 308 bands = tuple(IwBand(band.num, 309 tuple(band.frequencies), 310 dict(band.frequency_flags), 311 tuple(band.mcs_indices)) 312 for band in pending_phy_bands) 313 new_phy = IwPhy(pending_phy_name, 314 bands, 315 tuple(pending_phy_modes), 316 tuple(pending_phy_commands), 317 tuple(pending_phy_features), 318 pending_phy_max_scan_ssids, 319 pending_phy_tx_antennas, 320 pending_phy_rx_antennas, 321 pending_phy_tx_antennas and pending_phy_rx_antennas, 322 pending_phy_support_vht) 323 all_phys.append(new_phy) 324 325 for line in output.splitlines(): 326 match_phy = re.search('Wiphy (.*)', line) 327 if match_phy: 328 if pending_phy_name: 329 add_pending_phy() 330 pending_phy_name = match_phy.group(1) 331 pending_phy_bands = [] 332 pending_phy_modes = [] 333 pending_phy_commands = [] 334 pending_phy_features = [] 335 pending_phy_max_scan_ssids = None 336 pending_phy_tx_antennas = 0 337 pending_phy_rx_antennas = 0 338 pending_phy_support_vht = False 339 continue 340 341 match_section = re.match('\s*(\w.*):\s*$', line) 342 if match_section: 343 current_section = match_section.group(1) 344 match_band = re.match('Band (\d+)', current_section) 345 if match_band: 346 current_band = IwBand(num=int(match_band.group(1)), 347 frequencies=[], 348 frequency_flags={}, 349 mcs_indices=[]) 350 pending_phy_bands.append(current_band) 351 continue 352 353 # Check for max_scan_ssids. This isn't a section, but it 354 # also isn't within a section. 355 match_max_scan_ssids = re.match('\s*max # scan SSIDs: (\d+)', 356 line) 357 if match_max_scan_ssids and pending_phy_name: 358 pending_phy_max_scan_ssids = int( 359 match_max_scan_ssids.group(1)) 360 continue 361 362 if (current_section == 'Supported interface modes' and 363 pending_phy_name): 364 mode_match = re.search('\* (\w+)', line) 365 if mode_match: 366 pending_phy_modes.append(mode_match.group(1)) 367 continue 368 369 if current_section == 'Supported commands' and pending_phy_name: 370 command_match = re.search('\* (\w+)', line) 371 if command_match: 372 pending_phy_commands.append(command_match.group(1)) 373 continue 374 375 if (current_section is not None and 376 current_section.startswith('VHT Capabilities') and 377 pending_phy_name): 378 pending_phy_support_vht = True 379 continue 380 381 match_avail_antennas = re.match('\s*Available Antennas: TX (\S+)' 382 ' RX (\S+)', line) 383 if match_avail_antennas and pending_phy_name: 384 pending_phy_tx_antennas = int( 385 match_avail_antennas.group(1), 16) 386 pending_phy_rx_antennas = int( 387 match_avail_antennas.group(2), 16) 388 continue 389 390 match_device_support = re.match('\s*Device supports (.*)\.', line) 391 if match_device_support and pending_phy_name: 392 pending_phy_features.append(match_device_support.group(1)) 393 continue 394 395 if not all([current_band, pending_phy_name, 396 line.startswith('\t')]): 397 continue 398 399 # E.g. 400 # * 2412 MHz [1] (20.0 dBm) 401 # * 2467 MHz [12] (20.0 dBm) (passive scan) 402 # * 2472 MHz [13] (disabled) 403 # * 5260 MHz [52] (19.0 dBm) (no IR, radar detection) 404 match_chan_info = re.search( 405 r'(?P<frequency>\d+) MHz' 406 r' (?P<chan_num>\[\d+\])' 407 r'(?: \((?P<tx_power_limit>[0-9.]+ dBm)\))?' 408 r'(?: \((?P<flags>[a-zA-Z, ]+)\))?', line) 409 if match_chan_info: 410 frequency = int(match_chan_info.group('frequency')) 411 current_band.frequencies.append(frequency) 412 flags_string = match_chan_info.group('flags') 413 if flags_string: 414 current_band.frequency_flags[frequency] = frozenset( 415 flags_string.split(',')) 416 else: 417 # Populate the dict with an empty set, to make 418 # things uniform for client code. 419 current_band.frequency_flags[frequency] = frozenset() 420 continue 421 422 # re_mcs needs to match something like: 423 # HT TX/RX MCS rate indexes supported: 0-15, 32 424 if re.search('HT TX/RX MCS rate indexes supported: ', line): 425 rate_string = line.split(':')[1].strip() 426 for piece in rate_string.split(','): 427 if piece.find('-') > 0: 428 # Must be a range like ' 0-15' 429 begin, end = piece.split('-') 430 for index in range(int(begin), int(end) + 1): 431 current_band.mcs_indices.append(index) 432 else: 433 # Must be a single rate like '32 ' 434 current_band.mcs_indices.append(int(piece)) 435 if pending_phy_name: 436 add_pending_phy() 437 return all_phys 438 439 440 def remove_interface(self, interface, ignore_status=False): 441 """ 442 Remove a WiFi interface from a PHY. 443 444 @param interface: string name of interface (e.g. mon0) 445 @param ignore_status: boolean True iff we should ignore failures 446 to remove the interface. 447 448 """ 449 self._run('%s dev %s del' % (self._command_iw, interface), 450 ignore_status=ignore_status) 451 452 453 def determine_security(self, supported_securities): 454 """Determines security from the given list of supported securities. 455 456 @param supported_securities: list of supported securities from scan 457 458 """ 459 if not supported_securities: 460 security = SECURITY_OPEN 461 elif len(supported_securities) == 1: 462 security = supported_securities[0] 463 else: 464 security = SECURITY_MIXED 465 return security 466 467 468 def scan(self, interface, frequencies=(), ssids=()): 469 """Performs a scan. 470 471 @param interface: the interface to run the iw command against 472 @param frequencies: list of int frequencies in Mhz to scan. 473 @param ssids: list of string SSIDs to send probe requests for. 474 475 @returns a list of IwBss namedtuples; None if the scan fails 476 477 """ 478 scan_result = self.timed_scan(interface, frequencies, ssids) 479 if scan_result is None: 480 return None 481 return scan_result.bss_list 482 483 484 def timed_scan(self, interface, frequencies=(), ssids=()): 485 """Performs a timed scan. 486 487 @param interface: the interface to run the iw command against 488 @param frequencies: list of int frequencies in Mhz to scan. 489 @param ssids: list of string SSIDs to send probe requests for. 490 491 @returns a IwTimedScan namedtuple; None if the scan fails 492 493 """ 494 freq_param = '' 495 if frequencies: 496 freq_param = ' freq %s' % ' '.join(map(str, frequencies)) 497 ssid_param = '' 498 if ssids: 499 ssid_param = ' ssid "%s"' % '" "'.join(ssids) 500 501 command = '%s %s dev %s scan%s%s' % (IW_TIME_COMMAND, 502 self._command_iw, interface, freq_param, ssid_param) 503 scan = self._run(command, ignore_status=True) 504 if scan.exit_status != 0: 505 # The device was busy 506 logging.debug('scan exit_status: %d', scan.exit_status) 507 return None 508 if not scan.stdout: 509 logging.debug('Empty scan result') 510 bss_list = [] 511 else: 512 bss_list = self._parse_scan_results(scan.stdout) 513 scan_time = float(scan.stderr) 514 return IwTimedScan(scan_time, bss_list) 515 516 517 def scan_dump(self, interface): 518 """Dump the contents of the scan cache. 519 520 Note that this does not trigger a scan. Instead, it returns 521 the kernel's idea of what BSS's are currently visible. 522 523 @param interface: the interface to run the iw command against 524 525 @returns a list of IwBss namedtuples; None if the scan fails 526 527 """ 528 result = self._run('%s dev %s scan dump' % (self._command_iw, 529 interface)) 530 return self._parse_scan_results(result.stdout) 531 532 533 def set_tx_power(self, interface, power): 534 """ 535 Set the transmission power for an interface. 536 537 @param interface: string name of interface to set Tx power on. 538 @param power: string power parameter. (e.g. 'auto'). 539 540 """ 541 self._run('%s dev %s set txpower %s' % 542 (self._command_iw, interface, power)) 543 544 545 def set_freq(self, interface, freq): 546 """ 547 Set the frequency for an interface. 548 549 @param interface: string name of interface to set frequency on. 550 @param freq: int frequency 551 552 """ 553 self._run('%s dev %s set freq %d' % 554 (self._command_iw, interface, freq)) 555 556 557 def set_regulatory_domain(self, domain_string): 558 """ 559 Set the regulatory domain of the current machine. Note that 560 the regulatory change happens asynchronously to the exit of 561 this function. 562 563 @param domain_string: string regulatory domain name (e.g. 'US'). 564 565 """ 566 self._run('%s reg set %s' % (self._command_iw, domain_string)) 567 568 569 def get_regulatory_domain(self): 570 """ 571 Get the regulatory domain of the current machine. 572 573 @returns a string containing the 2-letter regulatory domain name 574 (e.g. 'US'). 575 576 """ 577 output = self._run('%s reg get' % self._command_iw).stdout 578 m = re.match('^country (..):', output) 579 if not m: 580 return None 581 return m.group(1) 582 583 584 def wait_for_scan_result(self, interface, bss=None, ssid=None, 585 timeout_seconds=30): 586 """Returns a IWBSS object for a network with the given bssed or ssid. 587 588 @param interface: which interface to run iw against 589 @param bss: BSS as a string 590 @param ssid: ssid as a string 591 @param timeout_seconds: the amount of time to wait in seconds 592 593 @returns a list of IwBss collections that contain the given bss or ssid; 594 if the scan is empty or returns an error code None is returned. 595 596 """ 597 start_time = time.time() 598 scan_failure_attempts = 0 599 logging.info('Performing a scan with a max timeout of %d seconds.', 600 timeout_seconds) 601 while time.time() - start_time < timeout_seconds: 602 scan_results = self.scan(interface) 603 if scan_results is None or len(scan_results) == 0: 604 scan_failure_attempts += 1 605 # Allow in-progress scan to complete 606 time.sleep(5) 607 # If the in-progress scan takes more than 30 seconds to 608 # complete it will most likely never complete; abort. 609 # See crbug.com/309148. 610 if scan_failure_attempts > 5: 611 logging.error('Scan failed to run, see debug log for ' 612 'error code.') 613 return None 614 continue 615 scan_failure_attempts = 0 616 matching_bsses = list() 617 for iwbss in scan_results: 618 if bss is not None and iwbss.bss != bss: 619 continue 620 if ssid is not None and iwbss.ssid != ssid: 621 continue 622 matching_bsses.append(iwbss) 623 if len(matching_bsses) > 0: 624 return matching_bsses 625 626 if scan_failure_attempts > 0: 627 return None 628 # The SSID wasn't found, but the device is fine. 629 return list() 630 631 632 def wait_for_link(self, interface, timeout_seconds=10): 633 """Waits until a link completes on |interface|. 634 635 @param interface: which interface to run iw against. 636 @param timeout_seconds: the amount of time to wait in seconds. 637 638 @returns True if link was established before the timeout. 639 640 """ 641 start_time = time.time() 642 while time.time() - start_time < timeout_seconds: 643 link_results = self._run('%s dev %s link' % 644 (self._command_iw, interface)) 645 if 'Not connected' not in link_results.stdout: 646 return True 647 time.sleep(1) 648 return False 649 650 651 def set_antenna_bitmap(self, phy, tx_bitmap, rx_bitmap): 652 """Set antenna chain mask on given phy (radio). 653 654 This function will set the antennas allowed to use for TX and 655 RX on the |phy| based on the |tx_bitmap| and |rx_bitmap|. 656 This command is only allowed when the interfaces on the phy are down. 657 658 @param phy: phy name 659 @param tx_bitmap: bitmap of allowed antennas to use for TX 660 @param rx_bitmap: bitmap of allowed antennas to use for RX 661 662 """ 663 command = '%s phy %s set antenna %d %d' % (self._command_iw, phy, 664 tx_bitmap, rx_bitmap) 665 self._run(command) 666 667 668 def get_event_logger(self): 669 """Create and return a IwEventLogger object. 670 671 @returns a IwEventLogger object. 672 673 """ 674 local_file = IW_LOCAL_EVENT_LOG_FILE % (self._log_id) 675 self._log_id += 1 676 return iw_event_logger.IwEventLogger(self._host, self._command_iw, 677 local_file) 678 679 680 def vht_supported(self): 681 """Returns True if VHT is supported; False otherwise.""" 682 result = self._run('%s list' % self._command_iw).stdout 683 if 'VHT Capabilities' in result: 684 return True 685 return False 686