# iw_runner.py revision fc81be792d4e44542aacd7b41d0c1bce16d3cb2a
# Copyright (c) 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import logging
import re
import time

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.network import iw_event_logger


HT20 = 'HT20'
HT40_ABOVE = 'HT40+'
HT40_BELOW = 'HT40-'

SECURITY_OPEN = 'open'
SECURITY_WEP = 'wep'
SECURITY_WPA = 'wpa'
SECURITY_WPA2 = 'wpa2'
# Mixed mode security is WPA2/WPA
SECURITY_MIXED = 'mixed'

# Table of lookups between the output of item 'secondary channel offset:' from
# iw <device> scan to constants.

HT_TABLE = {'no secondary': HT20,
            'above': HT40_ABOVE,
            'below': HT40_BELOW}

IwBand = collections.namedtuple('Band', ['num', 'frequencies', 'mcs_indices'])
IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security',
                                         'ht'])
IwNetDev = collections.namedtuple('IwNetDev', ['phy', 'if_name', 'if_type'])
IwTimedScan = collections.namedtuple('IwTimedScan', ['time', 'bss_list'])

# The fields for IwPhy are as follows:
#   name: string name of the phy, such as "phy0"
#   bands: list of IwBand objects.
#   modes: List of strings containing interface modes supported, such as "AP".
#   commands: List of strings containing nl80211 commands supported, such as
#          "authenticate".
#   max_scan_ssids: Maximum number of SSIDs which can be scanned at once.
#   avail_tx_antennas: int bitmap parsed from the hexadecimal TX value of the
#          'Available Antennas' line of 'iw list' (0 if the line is absent).
#   avail_rx_antennas: int bitmap parsed from the hexadecimal RX value of the
#          'Available Antennas' line of 'iw list' (0 if the line is absent).
IwPhy = collections.namedtuple(
    'Phy', ['name', 'bands', 'modes', 'commands', 'max_scan_ssids',
            'avail_tx_antennas', 'avail_rx_antennas'])

DEFAULT_COMMAND_IW = 'iw'

# Time command to get elapsed time. Full path is used to avoid using the
# built-in 'time' command from the bash shell
IW_TIME_COMMAND = '/usr/local/bin/time -f "%e"'

IW_LINK_KEY_BEACON_INTERVAL = 'beacon int'
IW_LINK_KEY_DTIM_PERIOD = 'dtim period'
IW_LINK_KEY_FREQUENCY = 'freq'
IW_LOCAL_EVENT_LOG_FILE = './debug/iw_event_%d.log'


class IwRunner(object):
    """Defines an interface to the 'iw' command."""


    def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW):
        """Construct an IwRunner.

        @param remote_host: optional host object; when given, commands are
                executed through its run() method instead of utils.run.
        @param command_iw: string path or name of the iw binary to invoke.

        """
        self._run = utils.run
        self._host = remote_host
        if remote_host:
            self._run = remote_host.run
        self._command_iw = command_iw
        # Counter used to give each event logger a distinct local log file.
        self._log_id = 0


    def _parse_scan_results(self, output):
        """Parse the output of the 'scan' and 'scan dump' commands.

        @param output: string command output.

        @returns a list of IwBss namedtuples, one per 'BSS' entry found in
                |output|; an empty list if |output| contains no BSS entries.

        """
        bss = None
        frequency = None
        ssid = None
        ht = None
        supported_securities = []
        bss_list = []
        for line in output.splitlines():
            line = line.strip()
            bss_match = re.match('BSS ([0-9a-f:]+)', line)
            if bss_match:
                # A new BSS header terminates the previous entry, if any.
                if bss is not None:
                    security = self.determine_security(supported_securities)
                    bss_list.append(
                            IwBss(bss, frequency, ssid, security, ht))
                    frequency = ssid = ht = None
                    supported_securities = []
                bss = bss_match.group(1)
            if line.startswith('freq:'):
                frequency = int(line.split()[1])
            if line.startswith('SSID:'):
                # Keep everything after the key so that SSIDs containing
                # spaces are not truncated; a blank SSID is reported as None.
                ssid = line[len('SSID:'):].strip() or None
            if line.startswith('* secondary channel offset'):
                ht = HT_TABLE[line.split(':')[1].strip()]
            if line.startswith('WPA'):
                supported_securities.append(SECURITY_WPA)
            if line.startswith('RSN'):
                supported_securities.append(SECURITY_WPA2)
        # Flush the last entry, but only if a BSS header was actually seen;
        # otherwise we would report a bogus all-None BSS.
        if bss is not None:
            security = self.determine_security(supported_securities)
            bss_list.append(IwBss(bss, frequency, ssid, security, ht))
        return bss_list


    def add_interface(self, phy, interface, interface_type):
        """
        Add an interface to a WiFi PHY.

        @param phy: string name of PHY to add an interface to.
        @param interface: string name of interface to add.
        @param interface_type: string type of interface to add (e.g. 'monitor').

        """
        self._run('%s phy %s interface add %s type %s' %
                  (self._command_iw, phy, interface, interface_type))


    def disconnect_station(self, interface):
        """
        Disconnect a STA from a network.

        @param interface: string name of interface to disconnect.

        """
        self._run('%s dev %s disconnect' % (self._command_iw, interface))


    def get_interface(self, interface_name):
        """Get full information about an interface given an interface name.

        @param interface_name: string name of interface (e.g. 'wlan0').
        @return IwNetDev tuple.
        @raises error.TestFail if there is not exactly one interface with
                the given name.

        """
        matching_interfaces = [iw_if for iw_if in self.list_interfaces()
                               if iw_if.if_name == interface_name]
        if len(matching_interfaces) != 1:
            raise error.TestFail('Could not find interface named %s' %
                                 interface_name)

        return matching_interfaces[0]


    def get_link_value(self, interface, iw_link_key, ignore_failures=False):
        """Get the value of a link property for |interface|.

        This command parses fields of iw link:

        #> iw dev wlan0 link
        Connected to 74:e5:43:10:4f:c0 (on wlan0)
              SSID: PMKSACaching_4m9p5_ch1
              freq: 5220
              RX: 5370 bytes (37 packets)
              TX: 3604 bytes (15 packets)
              signal: -59 dBm
              tx bitrate: 13.0 MBit/s MCS 1

              bss flags:      short-slot-time
              dtim period:    5
              beacon int:     100

        @param interface: string name of interface to query (e.g. 'wlan0').
        @param iw_link_key: string one of IW_LINK_KEY_* defined above.
        @param ignore_failures: boolean True iff a failing iw command or a
                missing property should yield None rather than an error.

        @returns string value of the property; None if the iw command exits
                with a non-zero status, or if the property is missing and
                |ignore_failures| is set.
        @raises error.TestFail if the property is missing and
                |ignore_failures| is not set.

        """
        result = self._run('%s dev %s link' % (self._command_iw, interface),
                           ignore_status=ignore_failures)
        if result.exit_status:
            # When roaming, there is a period of time for mac80211 based drivers
            # when the driver is 'associated' with an SSID but not a particular
            # BSS.  This causes iw to return an error code (-2) when attempting
            # to retrieve information specific to the BSS.  This does not happen
            # in mwifiex drivers.
            return None

        find_re = re.compile(r'\s*%s:\s*(.*\S)\s*$' % re.escape(iw_link_key))
        # Build a real list: on Python 3 filter()/map() return iterators,
        # which are always truthy and cannot be indexed.
        find_results = [match for match in
                        (find_re.match(line)
                         for line in result.stdout.splitlines())
                        if match]
        if not find_results:
            if ignore_failures:
                return None

            raise error.TestFail('Could not find iw link property %s.' %
                                 iw_link_key)

        actual_value = find_results[0].group(1)
        logging.info('Found iw link key %s with value %s.',
                     iw_link_key, actual_value)
        return actual_value


    def ibss_join(self, interface, ssid, frequency):
        """
        Join a WiFi interface to an IBSS.

        @param interface: string name of interface to join to the IBSS.
        @param ssid: string SSID of IBSS to join.
        @param frequency: int frequency of IBSS in Mhz.

        """
        self._run('%s dev %s ibss join %s %d' %
                  (self._command_iw, interface, ssid, frequency))


    def ibss_leave(self, interface):
        """
        Leave an IBSS.

        @param interface: string name of interface to remove from the IBSS.

        """
        self._run('%s dev %s ibss leave' % (self._command_iw, interface))


    def list_interfaces(self, desired_if_type=None):
        """List WiFi related interfaces on this system.

        @param desired_if_type: string type of interface to filter
                our returned list of interfaces for (e.g. 'managed').

        @return list of IwNetDev tuples.

        """
        output = self._run('%s dev' % self._command_iw).stdout
        interfaces = []
        phy = None
        if_name = None
        if_type = None
        for line in output.splitlines():
            m = re.match(r'phy#([0-9]+)', line)
            if m:
                phy = 'phy%d' % int(m.group(1))
            m = re.match(r'[\s]*Interface (.*)', line)
            if m:
                if_name = m.group(1)
            # Common values for type are 'managed', 'monitor', and 'IBSS'.
            m = re.match(r'[\s]*type ([a-zA-Z]+)', line)
            if m:
                if_type = m.group(1)
            if phy and if_name and if_type:
                interfaces.append(IwNetDev(phy=phy, if_name=if_name,
                                           if_type=if_type))
                # One phy may have many interfaces, so don't reset it.
                if_name = if_type = None

        if desired_if_type:
            interfaces = [interface for interface in interfaces
                          if interface.if_type == desired_if_type]
        return interfaces


    def list_phys(self):
        """
        List WiFi PHYs on the given host.

        Parses the output of 'iw list' into one IwPhy per 'Wiphy' entry.

        @return list of IwPhy tuples.

        """
        output = self._run('%s list' % self._command_iw).stdout

        # State of the phy currently being parsed.  The containers are
        # created up front so that a section header seen before the first
        # 'Wiphy' line cannot hit an unbound name.
        pending_phy_name = None
        pending_phy_bands = []
        pending_phy_modes = []
        pending_phy_commands = []
        pending_phy_max_scan_ssids = None
        pending_phy_tx_antennas = 0
        pending_phy_rx_antennas = 0
        current_band = None
        current_section = None
        all_phys = []

        def add_pending_phy():
            """Add the pending phy into |all_phys|."""
            bands = tuple(IwBand(band.num,
                                 tuple(band.frequencies),
                                 tuple(band.mcs_indices))
                          for band in pending_phy_bands)
            new_phy = IwPhy(pending_phy_name,
                            bands,
                            tuple(pending_phy_modes),
                            tuple(pending_phy_commands),
                            pending_phy_max_scan_ssids,
                            pending_phy_tx_antennas,
                            pending_phy_rx_antennas)
            all_phys.append(new_phy)

        for line in output.splitlines():
            match_phy = re.search(r'Wiphy (.*)', line)
            if match_phy:
                if pending_phy_name:
                    add_pending_phy()
                pending_phy_name = match_phy.group(1)
                pending_phy_bands = []
                pending_phy_modes = []
                pending_phy_commands = []
                pending_phy_max_scan_ssids = None
                pending_phy_tx_antennas = 0
                pending_phy_rx_antennas = 0
                # Section and band state belong to the previous phy.
                current_band = None
                current_section = None
                continue

            match_section = re.match(r'\s*(\w.*):\s*$', line)
            if match_section:
                current_section = match_section.group(1)
                match_band = re.match(r'Band (\d+)', current_section)
                if match_band:
                    current_band = IwBand(num=int(match_band.group(1)),
                                          frequencies=[],
                                          mcs_indices=[])
                    pending_phy_bands.append(current_band)
                continue

            # Check for max_scan_ssids. This isn't a section, but it
            # also isn't within a section.
            match_max_scan_ssids = re.match(r'\s*max # scan SSIDs: (\d+)',
                                            line)
            if match_max_scan_ssids and pending_phy_name:
                pending_phy_max_scan_ssids = int(
                        match_max_scan_ssids.group(1))
                continue

            if (current_section == 'Supported interface modes' and
                    pending_phy_name):
                mode_match = re.search(r'\* (\w+)', line)
                if mode_match:
                    pending_phy_modes.append(mode_match.group(1))
                    continue

            if current_section == 'Supported commands' and pending_phy_name:
                command_match = re.search(r'\* (\w+)', line)
                if command_match:
                    pending_phy_commands.append(command_match.group(1))
                    continue

            match_avail_antennas = re.match(r'\s*Available Antennas: TX (\S+)'
                                            r' RX (\S+)', line)
            if match_avail_antennas and pending_phy_name:
                # The antenna values are parsed as hexadecimal bitmaps.
                pending_phy_tx_antennas = int(
                        match_avail_antennas.group(1), 16)
                pending_phy_rx_antennas = int(
                        match_avail_antennas.group(2), 16)
                continue

            if not all([current_band, pending_phy_name,
                        line.startswith('\t')]):
                continue

            mhz_match = re.search(r'(\d+) MHz', line)
            if mhz_match:
                current_band.frequencies.append(int(mhz_match.group(1)))
                continue

            # re_mcs needs to match something like:
            # HT TX/RX MCS rate indexes supported: 0-15, 32
            if re.search('HT TX/RX MCS rate indexes supported: ', line):
                rate_string = line.split(':')[1].strip()
                for piece in rate_string.split(','):
                    if piece.find('-') > 0:
                        # Must be a range like '  0-15'
                        begin, end = piece.split('-')
                        current_band.mcs_indices.extend(
                                range(int(begin), int(end) + 1))
                    else:
                        # Must be a single rate like '32    '
                        current_band.mcs_indices.append(int(piece))
        if pending_phy_name:
            add_pending_phy()
        return all_phys


    def remove_interface(self, interface, ignore_status=False):
        """
        Remove a WiFi interface from a PHY.

        @param interface: string name of interface (e.g. mon0)
        @param ignore_status: boolean True iff we should ignore failures
                to remove the interface.

        """
        self._run('%s dev %s del' % (self._command_iw, interface),
                  ignore_status=ignore_status)


    def determine_security(self, supported_securities):
        """Determines security from the given list of supported securities.

        @param supported_securities: list of supported securities from scan

        @returns SECURITY_OPEN for an empty list, the single entry for a
                one-element list, and SECURITY_MIXED otherwise.

        """
        if not supported_securities:
            security = SECURITY_OPEN
        elif len(supported_securities) == 1:
            security = supported_securities[0]
        else:
            security = SECURITY_MIXED
        return security


    def scan(self, interface, frequencies=(), ssids=()):
        """Performs a scan.

        @param interface: the interface to run the iw command against
        @param frequencies: list of int frequencies in Mhz to scan.
        @param ssids: list of string SSIDs to send probe requests for.

        @returns a list of IwBss namedtuples; None if the scan fails

        """
        scan_result = self.timed_scan(interface, frequencies, ssids)
        if scan_result is None:
            return None
        return scan_result.bss_list


    def timed_scan(self, interface, frequencies=(), ssids=()):
        """Performs a timed scan.

        @param interface: the interface to run the iw command against
        @param frequencies: list of int frequencies in Mhz to scan.
        @param ssids: list of string SSIDs to send probe requests for.

        @returns a IwTimedScan namedtuple; None if the scan fails

        """
        freq_param = ''
        if frequencies:
            freq_param = ' freq %s' % ' '.join(map(str, frequencies))
        ssid_param = ''
        if ssids:
            ssid_param = ' ssid "%s"' % '" "'.join(ssids)

        command = '%s %s dev %s scan%s%s' % (IW_TIME_COMMAND,
                self._command_iw, interface, freq_param, ssid_param)
        scan = self._run(command, ignore_status=True)
        if scan.exit_status != 0:
            # The device was busy
            logging.debug('scan exit_status: %d', scan.exit_status)
            return None
        if not scan.stdout:
            logging.debug('Empty scan result')
            bss_list = []
        else:
            bss_list = self._parse_scan_results(scan.stdout)
        # The time command's elapsed-seconds output is read from stderr.
        scan_time = float(scan.stderr)
        return IwTimedScan(scan_time, bss_list)


    def scan_dump(self, interface):
        """Dump the contents of the scan cache.

        Note that this does not trigger a scan.  Instead, it returns
        the kernel's idea of what BSS's are currently visible.

        @param interface: the interface to run the iw command against

        @returns a list of IwBss namedtuples (empty if no BSS is cached)

        """
        result = self._run('%s dev %s scan dump' % (self._command_iw,
                                                    interface))
        return self._parse_scan_results(result.stdout)


    def set_tx_power(self, interface, power):
        """
        Set the transmission power for an interface.

        @param interface: string name of interface to set Tx power on.
        @param power: string power parameter. (e.g. 'auto').

        """
        self._run('%s dev %s set txpower %s' %
                  (self._command_iw, interface, power))


    def set_freq(self, interface, freq):
        """
        Set the frequency for an interface.

        @param interface: string name of interface to set frequency on.
        @param freq: int frequency

        """
        self._run('%s dev %s set freq %d' %
                  (self._command_iw, interface, freq))


    def set_regulatory_domain(self, domain_string):
        """
        Set the regulatory domain of the current machine.  Note that
        the regulatory change happens asynchronously to the exit of
        this function.

        @param domain_string: string regulatory domain name (e.g. 'US').

        """
        self._run('%s reg set %s' % (self._command_iw, domain_string))


    def get_regulatory_domain(self):
        """
        Get the regulatory domain of the current machine.

        @returns a string containing the 2-letter regulatory domain name
            (e.g. 'US'); None if the output of 'iw reg get' does not start
            with a 'country XX:' line.

        """
        output = self._run('%s reg get' % self._command_iw).stdout
        m = re.match('^country (..):', output)
        if not m:
            return None
        return m.group(1)


    def wait_for_scan_result(self, interface, bss=None, ssid=None,
                             timeout_seconds=30):
        """Returns IwBss objects for networks with the given bss or ssid.

        @param interface: which interface to run iw against
        @param bss: BSS as a string
        @param ssid: ssid as a string
        @param timeout_seconds: the amount of time to wait in seconds

        @returns a list of IwBss collections that contain the given bss or ssid;
            if the scan is empty or returns an error code None is returned.

        """
        start_time = time.time()
        scan_failure_attempts = 0
        logging.info('Performing a scan with a max timeout of %d seconds.',
                     timeout_seconds)
        while time.time() - start_time < timeout_seconds:
            scan_results = self.scan(interface)
            if not scan_results:
                scan_failure_attempts += 1
                # Allow in-progress scan to complete
                time.sleep(5)
                # If the in-progress scan takes more than 30 seconds to
                # complete it will most likely never complete; abort.
                # See crbug.com/309148.
                if scan_failure_attempts > 5:
                    logging.error('Scan failed to run, see debug log for '
                                  'error code.')
                    return None
                continue
            scan_failure_attempts = 0
            matching_bsses = list()
            for iwbss in scan_results:
                if bss is not None and iwbss.bss != bss:
                    continue
                if ssid is not None and iwbss.ssid != ssid:
                    continue
                matching_bsses.append(iwbss)
            if len(matching_bsses) > 0:
                return matching_bsses

        if scan_failure_attempts > 0:
            return None
        # The SSID wasn't found, but the device is fine.
        return list()


    def wait_for_link(self, interface, timeout_seconds=10):
        """Waits until a link completes on |interface|.

        @param interface: which interface to run iw against.
        @param timeout_seconds: the amount of time to wait in seconds.

        @returns True if link was established before the timeout.

        """
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            link_results = self._run('%s dev %s link' %
                                     (self._command_iw, interface))
            if 'Not connected' not in link_results.stdout:
                return True
            time.sleep(1)
        return False


    def set_antenna_bitmap(self, phy, tx_bitmap, rx_bitmap):
        """Set antenna chain mask on given phy (radio).

        This function will set the antennas allowed to use for TX and
        RX on the |phy| based on the |tx_bitmap| and |rx_bitmap|.
        This command is only allowed when the interfaces on the phy are down.

        @param phy: phy name
        @param tx_bitmap: bitmap of allowed antennas to use for TX
        @param rx_bitmap: bitmap of allowed antennas to use for RX

        """
        command = '%s phy %s set antenna %d %d' % (self._command_iw, phy,
                                                   tx_bitmap, rx_bitmap)
        self._run(command)


    def get_event_logger(self):
        """Create and return a IwEventLogger object.

        Each call uses a new local log file name derived from an
        incrementing per-runner counter.

        @returns a IwEventLogger object.

        """
        local_file = IW_LOCAL_EVENT_LOG_FILE % (self._log_id)
        self._log_id += 1
        return iw_event_logger.IwEventLogger(self._host, self._command_iw,
                                             local_file)