# iw_runner.py revision 6ddeba733cfdb86ae4399159a2d8c1b71ed54fa6
# Copyright (c) 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Thin wrapper around the 'iw' command line tool and parsers for its output."""

import collections
import re
import time

try:
    from autotest_lib.client.common_lib import utils
except ImportError:
    # utils.run is only needed to execute iw on the local machine.  Deferring
    # the failure to IwRunner.__init__ keeps the module importable (and its
    # parsers usable against a remote host object) when autotest is absent.
    utils = None


HT20 = 'HT20'
HT40_ABOVE = 'HT40+'
HT40_BELOW = 'HT40-'

SECURITY_OPEN = 'open'
SECURITY_WEP = 'wep'
SECURITY_WPA = 'wpa'
SECURITY_WPA2 = 'wpa2'
# Mixed mode security is WPA2/WPA
SECURITY_MIXED = 'mixed'

# Table of lookups between the output of item 'secondary channel offset:' from
# iw <device> scan to constants.

HT_TABLE = {'no secondary': HT20,
            'above': HT40_ABOVE,
            'below': HT40_BELOW}

IwBand = collections.namedtuple('Band', ['num', 'frequencies', 'mcs_indices'])
IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security',
                                         'ht'])
# The fields for IwPhy are as follows:
#   name: string name of the phy, such as "phy0"
#   bands: list of IwBand objects.
#   modes: List of strings containing interface modes supported, such as "AP".
#   commands: List of strings containing nl80211 commands supported, such as
#          "authenticate".
IwPhy = collections.namedtuple('Phy', ['name', 'bands', 'modes', 'commands'])

DEFAULT_COMMAND_IW = 'iw'

# A scan entry starts with 'BSS <mac>'.  Matching the MAC address explicitly
# keeps information elements such as 'BSS Load:' from being mistaken for a new
# entry, and drops a '(on wlan0)' suffix glued to the address.
_BSS_LINE_RE = re.compile(r'BSS ((?:[0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2})')

# Prefix of the 'iw list' line that enumerates supported MCS indices, e.g.
#   HT TX/RX MCS rate indexes supported: 0-15, 32
_MCS_LINE_PREFIX = 'HT TX/RX MCS rate indexes supported:'


def _parse_mcs_indices(rate_string):
    """Expand an MCS index list such as '0-15, 32' into a list of ints.

    @param rate_string: string of comma separated indices and ranges.

    @return list of int MCS indices, in the order listed.

    """
    indices = []
    for piece in rate_string.split(','):
        piece = piece.strip()
        if not piece:
            continue
        if '-' in piece:
            # Must be an inclusive range like '0-15'
            begin, end = piece.split('-')
            indices.extend(range(int(begin), int(end) + 1))
        else:
            # Must be a single rate like '32'
            indices.append(int(piece))
    return indices


class IwRunner(object):
    """Defines an interface to the 'iw' command."""


    def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW):
        """
        Construct an IwRunner.

        @param remote_host: optional host object whose run() method executes
                commands; when omitted, commands run locally via utils.run.
        @param command_iw: string path or name of the iw binary.

        @raises ImportError if no remote_host is given and autotest's utils
                module could not be imported.

        """
        if remote_host:
            self._run = remote_host.run
        elif utils is None:
            raise ImportError('autotest_lib.client.common_lib.utils is '
                              'required to run iw locally.')
        else:
            self._run = utils.run
        self._command_iw = command_iw


    def add_interface(self, phy, interface, interface_type):
        """
        Add an interface to a WiFi PHY.

        @param phy: string name of PHY to add an interface to.
        @param interface: string name of interface to add.
        @param interface_type: string type of interface to add (e.g. 'monitor').

        """
        self._run('%s phy %s interface add %s type %s' %
                  (self._command_iw, phy, interface, interface_type))


    def disconnect_station(self, interface):
        """
        Disconnect a STA from a network.

        @param interface: string name of interface to disconnect.

        """
        self._run('%s dev %s disconnect' % (self._command_iw, interface))


    def ibss_join(self, interface, ssid, frequency):
        """
        Join a WiFi interface to an IBSS.

        @param interface: string name of interface to join to the IBSS.
        @param ssid: string SSID of IBSS to join.
        @param frequency: int frequency of IBSS in Mhz.

        """
        self._run('%s dev %s ibss join %s %d' %
                  (self._command_iw, interface, ssid, frequency))


    def ibss_leave(self, interface):
        """
        Leave an IBSS.

        @param interface: string name of interface to remove from the IBSS.

        """
        self._run('%s dev %s ibss leave' % (self._command_iw, interface))


    def list_interfaces(self):
        """@return list of string WiFi interface names on device."""
        output = self._run('%s dev' % self._command_iw).stdout
        interfaces = []
        for line in output.splitlines():
            m = re.match(r'\s*Interface (.*)', line)
            if m:
                interfaces.append(m.group(1))

        return interfaces


    def list_phys(self):
        """
        List WiFi PHYs on the given host.

        @return list of IwPhy tuples.

        """
        output = self._run('%s list' % self._command_iw).stdout
        current_phy = None
        current_band = None
        current_section = None
        all_phys = []
        for line in output.splitlines():
            match_phy = re.search(r'Wiphy (.*)', line)
            if match_phy:
                current_phy = IwPhy(name=match_phy.group(1), bands=[], modes=[],
                                    commands=[])
                all_phys.append(current_phy)
                # Parsing state must not leak from the previous PHY.
                current_band = None
                current_section = None
                continue

            # The MCS line contains a ':' and would otherwise be swallowed by
            # the generic section matcher below, so it is handled first.
            if (current_band is not None and line.startswith('\t') and
                    _MCS_LINE_PREFIX in line):
                rate_string = line.split(':', 1)[1]
                current_band.mcs_indices.extend(
                        _parse_mcs_indices(rate_string))
                continue

            match_section = re.match(r'\s*(\w.*):', line)
            if match_section:
                current_section = match_section.group(1)
                match_band = re.match(r'Band (\d+)', current_section)
                if match_band and current_phy is not None:
                    current_band = IwBand(num=int(match_band.group(1)),
                                          frequencies=[],
                                          mcs_indices=[])
                    current_phy.bands.append(current_band)
                continue

            if current_section == 'Supported interface modes' and current_phy:
                mode_match = re.search(r'\* (\w+)', line)
                if mode_match:
                    current_phy.modes.append(mode_match.group(1))
                    continue

            if current_section == 'Supported commands' and current_phy:
                command_match = re.search(r'\* (\w+)', line)
                if command_match:
                    current_phy.commands.append(command_match.group(1))
                    continue

            # Everything below describes the band currently being parsed.
            if current_band is None or not line.startswith('\t'):
                continue

            mhz_match = re.search(r'(\d+) MHz', line)
            if mhz_match:
                current_band.frequencies.append(int(mhz_match.group(1)))
        return all_phys


    def remove_interface(self, interface, ignore_status=False):
        """
        Remove a WiFi interface from a PHY.

        @param interface: string name of interface (e.g. mon0)
        @param ignore_status: boolean True iff we should ignore failures
                to remove the interface.

        """
        self._run('%s dev %s del' % (self._command_iw, interface),
                  ignore_status=ignore_status)


    def determine_security(self, supported_securities):
        """Determines security from the given list of supported securities.

        @param supported_securities: list of supported securities from scan

        @return SECURITY_OPEN for an empty list, the single entry for a one
                element list, SECURITY_MIXED otherwise.

        """
        if not supported_securities:
            return SECURITY_OPEN
        if len(supported_securities) == 1:
            return supported_securities[0]
        return SECURITY_MIXED


    def _parse_scan_results(self, output):
        """Parses the text printed by 'iw dev <interface> scan'.

        @param output: string stdout of the scan command.

        @return list of IwBss tuples, one per BSS found; empty if none.

        """
        bss_list = []
        bss = frequency = ssid = ht = None
        supported_securities = []

        for line in output.splitlines():
            line = line.strip()
            bss_match = _BSS_LINE_RE.match(line)
            if bss_match:
                # A new entry begins: flush the one collected so far.
                if bss is not None:
                    security = self.determine_security(supported_securities)
                    bss_list.append(IwBss(bss, frequency, ssid, security, ht))
                bss = bss_match.group(1)
                frequency = ssid = ht = None
                supported_securities = []
                continue
            if bss is None:
                # Nothing to attribute this line to yet.
                continue

            if line.startswith('freq:'):
                frequency = int(line.split()[1])
            elif line.startswith('SSID:'):
                # Keep the whole remainder: SSIDs may contain spaces.  A
                # hidden network reports an empty SSID, stored as None.
                ssid = line[len('SSID:'):].strip() or None
            elif line.startswith('* secondary channel offset'):
                # Values missing from HT_TABLE leave ht unset rather than
                # aborting the whole parse.
                ht = HT_TABLE.get(line.split(':')[1].strip())
            elif line.startswith('WPA'):
                supported_securities.append(SECURITY_WPA)
            elif line.startswith('RSN'):
                supported_securities.append(SECURITY_WPA2)

        if bss is not None:
            security = self.determine_security(supported_securities)
            bss_list.append(IwBss(bss, frequency, ssid, security, ht))
        return bss_list


    def scan(self, interface, frequencies=(), ssids=()):
        """Performs a scan.

        @param interface: the interface to run the iw command against
        @param frequencies: list of int frequencies in Mhz to scan.
        @param ssids: list of string SSIDs to send probe requests for.

        @returns a list of IwBss collections; None if the scan fails

        """
        freq_param = ''
        if frequencies:
            freq_param = ' freq %s' % ' '.join(map(str, frequencies))
        ssid_param = ''
        if ssids:
            ssid_param = ' ssid "%s"' % '" "'.join(ssids)

        command = str('%s dev %s scan%s%s' % (self._command_iw, interface,
                                              freq_param, ssid_param))
        scan = self._run(command, ignore_status=True)
        if scan.exit_status != 0:
            # The device was busy
            return None

        return self._parse_scan_results(scan.stdout)


    def set_tx_power(self, interface, power):
        """
        Set the transmission power for an interface.

        @param interface: string name of interface to set Tx power on.
        @param power: string power parameter. (e.g. 'auto').

        """
        self._run('%s dev %s set txpower %s' %
                  (self._command_iw, interface, power))


    def set_regulatory_domain(self, domain_string):
        """
        Set the regulatory domain of the current machine.

        @param domain_string: string regulatory domain name (e.g. 'US').

        """
        self._run('%s reg set %s' % (self._command_iw, domain_string))


    def wait_for_scan_result(self, interface, bss=None, ssid=None,
                             timeout_seconds=30):
        """Returns IwBss objects for a network with the given bss or ssid.

        Scans repeatedly until at least one result matches every criterion
        that was supplied, or until the timeout elapses.

        @param interface: which interface to run iw against
        @param bss: BSS as a string
        @param ssid: ssid as a string
        @param timeout_seconds: the amount of time to wait in seconds

        @returns a list of IwBss collections that contain the given bss or
                ssid; None if nothing matched before the timeout.

        """
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            scan_results = self.scan(interface)
            if scan_results is None:
                time.sleep(5)  # allow in-progress scan to complete
                continue
            matching_bsses = [
                    iwbss for iwbss in scan_results
                    if (bss is None or iwbss.bss == bss) and
                       (ssid is None or iwbss.ssid == ssid)]
            if matching_bsses:
                return matching_bsses
        return None


    def wait_for_link(self, interface, timeout_seconds=10):
        """Waits until a link completes on |interface|.

        @param interface: which interface to run iw against.
        @param timeout_seconds: the amount of time to wait in seconds.

        @returns True if link was established before the timeout.

        """
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            link_results = self._run('%s dev %s link' %
                                     (self._command_iw, interface))
            if 'Not connected' not in link_results.stdout:
                return True
            time.sleep(1)
        return False