# iw_runner.py revision a618b1222a65809bc10660403bb13598f2cb587d
# Copyright (c) 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import logging
import re
import time

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils


HT20 = 'HT20'
HT40_ABOVE = 'HT40+'
HT40_BELOW = 'HT40-'

SECURITY_OPEN = 'open'
SECURITY_WEP = 'wep'
SECURITY_WPA = 'wpa'
SECURITY_WPA2 = 'wpa2'
# Mixed mode security is WPA2/WPA
SECURITY_MIXED = 'mixed'

# Table of lookups between the output of item 'secondary channel offset:' from
# iw <device> scan to constants.

HT_TABLE = {'no secondary': HT20,
            'above': HT40_ABOVE,
            'below': HT40_BELOW}

IwBand = collections.namedtuple('Band', ['num', 'frequencies', 'mcs_indices'])
IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security',
                                         'ht'])
IwNetDev = collections.namedtuple('IwNetDev', ['phy', 'if_name', 'if_type'])

# The fields for IwPhy are as follows:
#   name: string name of the phy, such as "phy0"
#   bands: list of IwBand objects.
#   modes: List of strings containing interface modes supported, such as "AP".
#   commands: List of strings containing nl80211 commands supported, such as
#          "authenticate".
#   max_scan_ssids: Maximum number of SSIDs which can be scanned at once.
IwPhy = collections.namedtuple(
    'Phy', ['name', 'bands', 'modes', 'commands', 'max_scan_ssids'])

DEFAULT_COMMAND_IW = 'iw'

IW_LINK_KEY_BEACON_INTERVAL = 'beacon int'
IW_LINK_KEY_DTIM_PERIOD = 'dtim period'
IW_LINK_KEY_FREQUENCY = 'freq'


class IwRunner(object):
    """Defines an interface to the 'iw' command.

    Every method builds an 'iw ...' command line and executes it through
    |self._run|, which is either utils.run (local) or the run method of the
    remote host handed to the constructor.

    """


    def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW):
        """Construct an IwRunner.

        @param remote_host: optional host object; when given, commands are
                executed through remote_host.run instead of utils.run.
        @param command_iw: string name or path of the iw binary to invoke.

        """
        self._run = utils.run
        if remote_host:
            self._run = remote_host.run
        self._command_iw = command_iw


    def _parse_scan_results(self, output):
        """Parse the output of the 'scan' and 'scan dump' commands.

        @param output: string command output.

        @returns a list of IwBss namedtuples, one per BSS found in |output|;
                an empty list if |output| contains no BSS entries.

        """
        bss = None
        frequency = None
        ssid = None
        ht = None
        supported_securities = []
        bss_list = []
        for line in output.splitlines():
            line = line.strip()
            bss_match = re.match(r'BSS ([0-9a-f:]+)', line)
            if bss_match:
                # A new BSS header terminates the previous entry, if any.
                if bss is not None:
                    security = self.determine_security(supported_securities)
                    bss_list.append(IwBss(bss, frequency, ssid, security, ht))
                    frequency = ssid = ht = None
                    supported_securities = []
                bss = bss_match.group(1)
            if line.startswith('freq:'):
                frequency = int(line.split()[1])
            if line.startswith('SSID:'):
                # Keep everything after the key so that SSIDs containing
                # spaces are not truncated; an empty (hidden) SSID is None.
                ssid = line[len('SSID:'):].strip() or None
            if line.startswith('* secondary channel offset'):
                # An offset string missing from HT_TABLE maps to None rather
                # than aborting the whole parse with a KeyError.
                ht = HT_TABLE.get(line.split(':')[1].strip())
            if line.startswith('WPA'):
                supported_securities.append(SECURITY_WPA)
            if line.startswith('RSN'):
                supported_securities.append(SECURITY_WPA2)
        # Flush the final entry, but only if a BSS header was actually seen;
        # otherwise empty output would produce a bogus all-None IwBss.
        if bss is not None:
            security = self.determine_security(supported_securities)
            bss_list.append(IwBss(bss, frequency, ssid, security, ht))
        return bss_list


    def add_interface(self, phy, interface, interface_type):
        """
        Add an interface to a WiFi PHY.

        @param phy: string name of PHY to add an interface to.
        @param interface: string name of interface to add.
        @param interface_type: string type of interface to add (e.g. 'monitor').

        """
        self._run('%s phy %s interface add %s type %s' %
                  (self._command_iw, phy, interface, interface_type))


    def disconnect_station(self, interface):
        """
        Disconnect a STA from a network.

        @param interface: string name of interface to disconnect.

        """
        self._run('%s dev %s disconnect' % (self._command_iw, interface))


    def get_interface(self, interface_name):
        """Get full information about an interface given an interface name.

        @param interface_name: string name of interface (e.g. 'wlan0').
        @return IwNetDev tuple.
        @raises error.TestFail unless exactly one interface has that name.

        """
        matching_interfaces = [iw_if for iw_if in self.list_interfaces()
                               if iw_if.if_name == interface_name]
        if len(matching_interfaces) != 1:
            raise error.TestFail('Could not find interface named %s' %
                                 interface_name)

        return matching_interfaces[0]


    def get_link_value(self, interface, iw_link_key, ignore_failures=False):
        """Get the value of a link property for |interface|.

        This command parses fields of iw link:

        #> iw dev wlan0 link
        Connected to 74:e5:43:10:4f:c0 (on wlan0)
              SSID: PMKSACaching_4m9p5_ch1
              freq: 5220
              RX: 5370 bytes (37 packets)
              TX: 3604 bytes (15 packets)
              signal: -59 dBm
              tx bitrate: 13.0 MBit/s MCS 1

              bss flags:      short-slot-time
              dtim period:    5
              beacon int:     100

        @param interface: string name of interface (e.g. 'wlan0').
        @param iw_link_key: string one of IW_LINK_KEY_* defined above.
        @param ignore_failures: boolean passed to the runner as ignore_status;
                when True a missing property also yields None instead of
                raising.

        @returns string value of the property; None if the command exits
                non-zero, or if the property is missing and
                |ignore_failures| is True.
        @raises error.TestFail if the property is missing and
                |ignore_failures| is False.

        """
        result = self._run('%s dev %s link' % (self._command_iw, interface),
                           ignore_status=ignore_failures)
        if result.exit_status:
            # When roaming, there is a period of time for mac80211 based drivers
            # when the driver is 'associated' with an SSID but not a particular
            # BSS.  This causes iw to return an error code (-2) when attempting
            # to retrieve information specific to the BSS.  This does not happen
            # in mwifiex drivers.
            return None

        find_re = re.compile(r'\s*%s:\s*(.*\S)\s*$' % iw_link_key)
        # Build a real list: the result is tested for emptiness and indexed,
        # neither of which works on the lazy iterator filter() may return.
        find_results = [match for match in
                        (find_re.match(line)
                         for line in result.stdout.splitlines())
                        if match]
        if not find_results:
            if ignore_failures:
                return None

            raise error.TestFail('Could not find iw link property %s.' %
                                 iw_link_key)

        actual_value = find_results[0].group(1)
        logging.info('Found iw link key %s with value %s.',
                     iw_link_key, actual_value)
        return actual_value


    def ibss_join(self, interface, ssid, frequency):
        """
        Join a WiFi interface to an IBSS.

        @param interface: string name of interface to join to the IBSS.
        @param ssid: string SSID of IBSS to join.
        @param frequency: int frequency of IBSS in Mhz.

        """
        self._run('%s dev %s ibss join %s %d' %
                  (self._command_iw, interface, ssid, frequency))


    def ibss_leave(self, interface):
        """
        Leave an IBSS.

        @param interface: string name of interface to remove from the IBSS.

        """
        self._run('%s dev %s ibss leave' % (self._command_iw, interface))


    def list_interfaces(self, desired_if_type=None):
        """List WiFi related interfaces on this system.

        @param desired_if_type: string type of interface to filter
                our returned list of interfaces for (e.g. 'managed').

        @return list of IwNetDev tuples.

        """
        output = self._run('%s dev' % self._command_iw).stdout
        interfaces = []
        phy = None
        if_name = None
        if_type = None
        for line in output.splitlines():
            m = re.match(r'phy#([0-9]+)', line)
            if m:
                phy = 'phy%d' % int(m.group(1))
            m = re.match(r'[\s]*Interface (.*)', line)
            if m:
                if_name = m.group(1)
            # Common values for type are 'managed', 'monitor', and 'IBSS'.
            m = re.match(r'[\s]*type ([a-zA-Z]+)', line)
            if m:
                if_type = m.group(1)
            if phy and if_name and if_type:
                interfaces.append(IwNetDev(phy=phy, if_name=if_name,
                                           if_type=if_type))
                # One phy may have many interfaces, so don't reset it.
                if_name = if_type = None

        if desired_if_type:
            interfaces = [interface for interface in interfaces
                          if interface.if_type == desired_if_type]
        return interfaces


    def list_phys(self):
        """
        List WiFi PHYs on the given host.

        @return list of IwPhy tuples.

        """
        output = self._run('%s list' % self._command_iw).stdout

        pending_phy_name = None
        # Initialised up front so that a section header seen before the
        # first 'Wiphy' line cannot hit an unbound name.
        pending_phy_bands = []
        pending_phy_modes = []
        pending_phy_commands = []
        pending_phy_max_scan_ssids = None
        current_band = None
        current_section = None
        all_phys = []

        def add_pending_phy():
            """Add the pending phy into |all_phys|."""
            bands = tuple(IwBand(band.num,
                                 tuple(band.frequencies),
                                 tuple(band.mcs_indices))
                          for band in pending_phy_bands)
            new_phy = IwPhy(pending_phy_name,
                            bands,
                            tuple(pending_phy_modes),
                            tuple(pending_phy_commands),
                            pending_phy_max_scan_ssids)
            all_phys.append(new_phy)

        for line in output.splitlines():
            match_phy = re.search(r'Wiphy (.*)', line)
            if match_phy:
                if pending_phy_name:
                    add_pending_phy()
                pending_phy_name = match_phy.group(1)
                pending_phy_bands = []
                pending_phy_modes = []
                pending_phy_commands = []
                pending_phy_max_scan_ssids = None
                # Band/section state belongs to the previous phy; drop it so
                # nothing from the new phy is attributed to the old one.
                current_band = None
                current_section = None
                continue

            match_section = re.match(r'\s*(\w.*):\s*$', line)
            if match_section:
                current_section = match_section.group(1)
                match_band = re.match(r'Band (\d+)', current_section)
                if match_band:
                    current_band = IwBand(num=int(match_band.group(1)),
                                          frequencies=[],
                                          mcs_indices=[])
                    pending_phy_bands.append(current_band)
                continue

            # Check for max_scan_ssids. This isn't a section, but it
            # also isn't within a section.
            match_max_scan_ssids = re.match(r'\s*max # scan SSIDs: (\d+)',
                                            line)
            if match_max_scan_ssids and pending_phy_name:
                pending_phy_max_scan_ssids = int(
                        match_max_scan_ssids.group(1))
                continue

            if (current_section == 'Supported interface modes' and
                pending_phy_name):
                mode_match = re.search(r'\* (\w+)', line)
                if mode_match:
                    pending_phy_modes.append(mode_match.group(1))
                    continue

            if current_section == 'Supported commands' and pending_phy_name:
                command_match = re.search(r'\* (\w+)', line)
                if command_match:
                    pending_phy_commands.append(command_match.group(1))
                    continue

            # Everything below is per-band detail, which only makes sense
            # on tab-indented lines inside a band of a known phy.
            if not all([current_band, pending_phy_name,
                        line.startswith('\t')]):
                continue

            mhz_match = re.search(r'(\d+) MHz', line)
            if mhz_match:
                current_band.frequencies.append(int(mhz_match.group(1)))
                continue

            # re_mcs needs to match something like:
            # HT TX/RX MCS rate indexes supported: 0-15, 32
            if re.search('HT TX/RX MCS rate indexes supported: ', line):
                rate_string = line.split(':')[1].strip()
                for piece in rate_string.split(','):
                    if piece.find('-') > 0:
                        # Must be a range like '  0-15'
                        begin, end = piece.split('-')
                        for index in range(int(begin), int(end) + 1):
                            current_band.mcs_indices.append(index)
                    else:
                        # Must be a single rate like '32    '
                        current_band.mcs_indices.append(int(piece))
        if pending_phy_name:
            add_pending_phy()
        return all_phys


    def remove_interface(self, interface, ignore_status=False):
        """
        Remove a WiFi interface from a PHY.

        @param interface: string name of interface (e.g. mon0)
        @param ignore_status: boolean True iff we should ignore failures
                to remove the interface.

        """
        self._run('%s dev %s del' % (self._command_iw, interface),
                  ignore_status=ignore_status)


    def determine_security(self, supported_securities):
        """Determines security from the given list of supported securities.

        @param supported_securities: list of supported securities from scan

        @returns SECURITY_OPEN for an empty list, the sole entry for a
                one-element list, and SECURITY_MIXED otherwise.

        """
        if not supported_securities:
            return SECURITY_OPEN
        if len(supported_securities) == 1:
            return supported_securities[0]
        return SECURITY_MIXED


    def scan(self, interface, frequencies=(), ssids=()):
        """Performs a scan.

        @param interface: the interface to run the iw command against
        @param frequencies: list of int frequencies in Mhz to scan.
        @param ssids: list of string SSIDs to send probe requests for.

        @returns a list of IwBss namedtuples; None if the scan fails

        """
        freq_param = ''
        if frequencies:
            freq_param = ' freq %s' % ' '.join(map(str, frequencies))
        ssid_param = ''
        if ssids:
            ssid_param = ' ssid "%s"' % '" "'.join(ssids)

        command = str('%s dev %s scan%s%s' % (self._command_iw, interface,
                                              freq_param, ssid_param))
        result = self._run(command, ignore_status=True)
        if result.exit_status != 0:
            # The device was busy
            logging.debug('scan exit_status: %d', result.exit_status)
            return None

        return self._parse_scan_results(result.stdout)


    def scan_dump(self, interface):
        """Dump the contents of the scan cache.

        Note that this does not trigger a scan.  Instead, it returns
        the kernel's idea of what BSS's are currently visible.

        @param interface: the interface to run the iw command against

        @returns a list of IwBss namedtuples (empty if none are visible).

        """
        result = self._run('%s dev %s scan dump' % (self._command_iw,
                                                    interface))
        return self._parse_scan_results(result.stdout)


    def set_tx_power(self, interface, power):
        """
        Set the transmission power for an interface.

        @param interface: string name of interface to set Tx power on.
        @param power: string power parameter. (e.g. 'auto').

        """
        self._run('%s dev %s set txpower %s' %
                  (self._command_iw, interface, power))


    def set_regulatory_domain(self, domain_string):
        """
        Set the regulatory domain of the current machine.  Note that
        the regulatory change happens asynchronously to the exit of
        this function.

        @param domain_string: string regulatory domain name (e.g. 'US').

        """
        self._run('%s reg set %s' % (self._command_iw, domain_string))


    def get_regulatory_domain(self):
        """
        Get the regulatory domain of the current machine.

        @returns a string containing the 2-letter regulatory domain name
                (e.g. 'US'); None if no 'country XX:' line is found.

        """
        output = self._run('%s reg get' % self._command_iw).stdout
        # Search line by line rather than only at the very start of the
        # output, so a leading line before 'country XX:' does not hide it.
        m = re.search(r'^country (..):', output, re.MULTILINE)
        if not m:
            return None
        return m.group(1)


    def wait_for_scan_result(self, interface, bss=None, ssid=None,
                             timeout_seconds=30):
        """Scans repeatedly until a network with the given bss or ssid shows up.

        @param interface: which interface to run iw against
        @param bss: BSS as a string
        @param ssid: ssid as a string
        @param timeout_seconds: the amount of time to wait in seconds

        @returns a list of IwBss collections that contain the given bss or
                ssid; None if the scan keeps failing or nothing matches
                before the timeout.

        """
        start_time = time.time()
        scan_failure_attempts = 0
        logging.info('Performing a scan with a max timeout of %d seconds.',
                     timeout_seconds)
        while time.time() - start_time < timeout_seconds:
            scan_results = self.scan(interface)
            if scan_results is None:
                scan_failure_attempts += 1
                # Allow in-progress scan to complete
                time.sleep(5)
                # If the in-progress scan takes more than 30 seconds to
                # complete it will most likely never complete; abort.
                # See crbug.com/309148.
                if scan_failure_attempts > 5:
                    logging.error('Scan failed to run, see debug log for '
                                  'error code.')
                    return None
                continue
            scan_failure_attempts = 0
            matching_bsses = [iwbss for iwbss in scan_results
                              if (bss is None or iwbss.bss == bss) and
                                 (ssid is None or iwbss.ssid == ssid)]
            if matching_bsses:
                return matching_bsses
        # Timed out without ever seeing a matching BSS.
        return None


    def wait_for_link(self, interface, timeout_seconds=10):
        """Waits until a link completes on |interface|.

        @param interface: which interface to run iw against.
        @param timeout_seconds: the amount of time to wait in seconds.

        @returns True if link was established before the timeout.

        """
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            link_results = self._run('%s dev %s link' %
                                     (self._command_iw, interface))
            if 'Not connected' not in link_results.stdout:
                return True
            time.sleep(1)
        return False