iw_runner.py revision 6ddeba733cfdb86ae4399159a2d8c1b71ed54fa6
1# Copyright (c) 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import re
7import time
8
9from autotest_lib.client.common_lib import utils
10
11
# Labels for the HT channel width / secondary channel placement of a BSS.
HT20 = 'HT20'
HT40_ABOVE = 'HT40+'
HT40_BELOW = 'HT40-'

# Labels for the security type of a BSS.
SECURITY_OPEN = 'open'
SECURITY_WEP = 'wep'
SECURITY_WPA = 'wpa'
SECURITY_WPA2 = 'wpa2'
# Mixed mode security is WPA2/WPA
SECURITY_MIXED = 'mixed'

# Table of lookups between the output of item 'secondary channel offset:' from
# iw <device> scan to constants.

HT_TABLE = {'no secondary': HT20,
            'above': HT40_ABOVE,
            'below': HT40_BELOW}

# NOTE: the typename handed to namedtuple() must equal the module-level name
# the class is bound to; otherwise pickle cannot locate the class and the repr
# of an instance shows a name that does not exist in this module.

# The fields for IwBand are as follows:
#   num: int number of the band, taken from the "Band <num>" header.
#   frequencies: list of int frequencies in MHz listed for the band.
#   mcs_indices: list of int HT MCS rate indexes listed for the band.
IwBand = collections.namedtuple('IwBand',
                                ['num', 'frequencies', 'mcs_indices'])
# The fields for IwBss are as follows:
#   bss: string BSSID of the network.
#   frequency: int frequency of the network in MHz.
#   ssid: string SSID of the network, or None if no SSID was reported.
#   security: one of the SECURITY_* constants above.
#   ht: one of the HT* constants above, or None if not reported.
IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security',
                                         'ht'])
# The fields for IwPhy are as follows:
#   name: string name of the phy, such as "phy0"
#   bands: list of IwBand objects.
#   modes: List of strings containing interface modes supported, such as "AP".
#   commands: List of strings containing nl80211 commands supported, such as
#          "authenticate".
IwPhy = collections.namedtuple('IwPhy', ['name', 'bands', 'modes', 'commands'])

DEFAULT_COMMAND_IW = 'iw'
42
43class IwRunner(object):
44    """Defines an interface to the 'iw' command."""
45
46
47    def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW):
48        self._run = utils.run
49        if remote_host:
50            self._run = remote_host.run
51        self._command_iw = command_iw
52
53
54    def add_interface(self, phy, interface, interface_type):
55        """
56        Add an interface to a WiFi PHY.
57
58        @param phy: string name of PHY to add an interface to.
59        @param interface: string name of interface to add.
60        @param interface_type: string type of interface to add (e.g. 'monitor').
61
62        """
63        self._run('%s phy %s interface add %s type %s' %
64                  (self._command_iw, phy, interface, interface_type))
65
66
67    def disconnect_station(self, interface):
68        """
69        Disconnect a STA from a network.
70
71        @param interface: string name of interface to disconnect.
72
73        """
74        self._run('%s dev %s disconnect' % (self._command_iw, interface))
75
76
77    def ibss_join(self, interface, ssid, frequency):
78        """
79        Join a WiFi interface to an IBSS.
80
81        @param interface: string name of interface to join to the IBSS.
82        @param ssid: string SSID of IBSS to join.
83        @param frequency: int frequency of IBSS in Mhz.
84
85        """
86        self._run('%s dev %s ibss join %s %d' %
87                  (self._command_iw, interface, ssid, frequency))
88
89
90    def ibss_leave(self, interface):
91        """
92        Leave an IBSS.
93
94        @param interface: string name of interface to remove from the IBSS.
95
96        """
97        self._run('%s dev %s ibss leave' % (self._command_iw, interface))
98
99
100    def list_interfaces(self):
101        """@return list of string WiFi interface names on device."""
102        output = self._run('%s dev' % self._command_iw).stdout
103        interfaces = []
104        for line in output.splitlines():
105            m = re.match('[\s]*Interface (.*)', line)
106            if m:
107                interfaces.append(m.group(1))
108
109        return interfaces
110
111
112    def list_phys(self):
113        """
114        List WiFi PHYs on the given host.
115
116        @return list of IwPhy tuples.
117
118        """
119        output = self._run('%s list' % self._command_iw).stdout
120        current_phy = None
121        current_band = None
122        current_section = None
123        all_phys = []
124        for line in output.splitlines():
125            match_phy = re.search('Wiphy (.*)', line)
126            if match_phy:
127                current_phy = IwPhy(name=match_phy.group(1), bands=[], modes=[],
128                                    commands=[])
129                all_phys.append(current_phy)
130                continue
131
132            match_section = re.match('\s*(\w.*):', line)
133            if match_section:
134                current_section = match_section.group(1)
135                match_band = re.match('Band (\d+)', current_section)
136                if match_band:
137                    current_band = IwBand(num=int(match_band.group(1)),
138                                          frequencies=[],
139                                          mcs_indices=[])
140                    current_phy.bands.append(current_band)
141                continue
142
143            if current_section == 'Supported interface modes' and current_phy:
144                mode_match = re.search('\* (\w+)', line)
145                if mode_match:
146                    current_phy.modes.append(mode_match.group(1))
147                    continue
148
149            if current_section == 'Supported commands' and current_phy:
150                command_match = re.search('\* (\w+)', line)
151                if command_match:
152                    current_phy.commands.append(command_match.group(1))
153                    continue
154
155            if not all([current_band, current_phy, line.startswith('\t')]):
156                continue
157
158            mhz_match = re.search('(\d+) MHz', line)
159            if mhz_match:
160                current_band.frequencies.append(int(mhz_match.group(1)))
161                continue
162
163            # re_mcs needs to match something like:
164            # HT TX/RX MCS rate indexes supported: 0-15, 32
165            if re.search('HT TX/RX MCS rate indexes supported: ', line):
166                rate_string = line.split(':')[1].strip()
167                for piece in rate_string.split(','):
168                    if piece.find('-') > 0:
169                        # Must be a range like '  0-15'
170                        begin, end = piece.split('-')
171                        for index in range(int(begin), int(end) + 1):
172                            current_band.mcs_indices.append(index)
173                    else:
174                        # Must be a single rate like '32   '
175                        current_band.mcs_indices.append(int(piece))
176        return all_phys
177
178
179    def remove_interface(self, interface, ignore_status=False):
180        """
181        Remove a WiFi interface from a PHY.
182
183        @param interface: string name of interface (e.g. mon0)
184        @param ignore_status: boolean True iff we should ignore failures
185                to remove the interface.
186
187        """
188        self._run('%s dev %s del' % (self._command_iw, interface),
189                  ignore_status=ignore_status)
190
191
192    def determine_security(self, supported_securities):
193        """Determines security from the given list of supported securities.
194
195        @param supported_securities: list of supported securities from scan
196
197        """
198        if not supported_securities:
199            security = SECURITY_OPEN
200        elif len(supported_securities) == 1:
201            security = supported_securities[0]
202        else:
203            security = SECURITY_MIXED
204        return security
205
206
207    def scan(self, interface, frequencies=(), ssids=()):
208        """Performs a scan.
209
210        @param interface: the interface to run the iw command against
211        @param frequencies: list of int frequencies in Mhz to scan.
212        @param ssids: list of string SSIDs to send probe requests for.
213
214        @returns a list of IwBss collections; None if the scan fails
215
216        """
217        freq_param = ''
218        if frequencies:
219            freq_param = ' freq %s' % ' '.join(map(str, frequencies))
220        ssid_param = ''
221        if ssids:
222           ssid_param = ' ssid "%s"' % '" "'.join(ssids)
223
224        command = str('%s dev %s scan%s%s' % (self._command_iw, interface,
225                                              freq_param, ssid_param))
226        scan = self._run(command, ignore_status=True)
227        if scan.exit_status != 0:
228            # The device was busy
229           return None
230
231        bss = None
232        frequency = None
233        ssid = None
234        ht = None
235        security = None
236
237        supported_securities = []
238        bss_list = []
239
240        for line in scan.stdout.splitlines():
241            line = line.strip()
242            if line.startswith('BSS'):
243                if bss != None:
244                    security = self.determine_security(supported_securities)
245                    iwbss = IwBss(bss, frequency, ssid, security, ht)
246                    bss_list.append(iwbss)
247                    bss = frequency = ssid = security = ht = None
248                    supported_securities = []
249                bss = line.split()[1]
250            if line.startswith('freq:'):
251                frequency = int(line.split()[1])
252            if line.startswith('SSID:'):
253                ssid = line.split()
254                if len(ssid) > 1:
255                    ssid = ssid[1]
256                else:
257                    ssid = None
258            if line.startswith('* secondary channel offset'):
259                ht = HT_TABLE[line.split(':')[1].strip()]
260            if line.startswith('WPA'):
261               supported_securities.append(SECURITY_WPA)
262            if line.startswith('RSN'):
263               supported_securities.append(SECURITY_WPA2)
264        security = self.determine_security(supported_securities)
265        bss_list.append(IwBss(bss, frequency, ssid, security, ht))
266        return bss_list
267
268
269    def set_tx_power(self, interface, power):
270        """
271        Set the transmission power for an interface.
272
273        @param interface: string name of interface to set Tx power on.
274        @param power: string power parameter. (e.g. 'auto').
275
276        """
277        self._run('%s dev %s set txpower %s' %
278                  (self._command_iw, interface, power))
279
280
281    def set_regulatory_domain(self, domain_string):
282        """
283        Set the regulatory domain of the current machine.
284
285        @param domain_string: string regulatory domain name (e.g. 'US').
286
287        """
288        self._run('%s reg set %s' % (self._command_iw, domain_string))
289
290
291    def wait_for_scan_result(self, interface, bss=None, ssid=None,
292                             timeout_seconds=30):
293        """Returns a IWBSS object for a network with the given bssed or ssid.
294
295        @param interface: which interface to run iw against
296        @param bss: BSS as a string
297        @param ssid: ssid as a string
298        @param timeout_seconds: the amount of time to wait in seconds
299
300        @returns a list of IwBss collections that contain the given bss or ssid
301
302        """
303        start_time = time.time()
304        while time.time() - start_time < timeout_seconds:
305            scan_results = self.scan(interface)
306            if scan_results is None:
307                time.sleep(5) ## allow in-progress scan to complete
308                continue
309            matching_bsses = []
310            for iwbss in scan_results:
311                if bss is not None and iwbss.bss != bss:
312                    continue
313                if ssid is not None and iwbss.ssid != ssid:
314                    continue
315                matching_bsses.append(iwbss)
316            if len(matching_bsses) > 0:
317                return matching_bsses
318
319
320    def wait_for_link(self, interface, timeout_seconds=10):
321        """Waits until a link completes on |interface|.
322
323        @param interface: which interface to run iw against.
324        @param timeout_seconds: the amount of time to wait in seconds.
325
326        @returns True if link was established before the timeout.
327
328        """
329        start_time = time.time()
330        while time.time() - start_time < timeout_seconds:
331            link_results = self._run('%s dev %s link' %
332                                     (self._command_iw, interface))
333            if 'Not connected' not in link_results.stdout:
334                return True
335            time.sleep(1)
336        return False
337