1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import dbus 6import logging 7import os 8import time 9 10from autotest_lib.client.bin import test 11from autotest_lib.client.bin import utils 12from autotest_lib.client.common_lib import error 13from autotest_lib.client.cros.cellular import mm1_constants 14from autotest_lib.client.cros.cellular.pseudomodem import pm_constants 15from autotest_lib.client.cros.cellular.pseudomodem import pseudomodem_context 16from autotest_lib.client.cros.networking import cellular_proxy 17 18# Used for software message propagation latencies. 19SHORT_TIMEOUT_SECONDS = 2 20STATE_MACHINE_SCAN = 'ScanMachine' 21TEST_MODEMS_MODULE_PATH = os.path.join(os.path.dirname(__file__), 'files', 22 'modems.py') 23 24class network_3GScanningProperty(test.test): 25 """ 26 Test that the |Scanning| Property of the shill cellular device object is 27 updated correctly in the following two scenarios: 28 (1) When a user requests a network scan using the |ProposeScan| method of 29 the cellular device. 30 (2) During the initial modem enable-register-connect sequence. 31 32 """ 33 version = 1 34 35 def _find_mm_modem(self): 36 """ 37 Find the modemmanager modem object. 38 39 Assumption: There is only one modem in the system. 40 41 @raises: TestError unless exactly one modem is found. 42 43 """ 44 object_manager = dbus.Interface( 45 self._bus.get_object(mm1_constants.I_MODEM_MANAGER, 46 mm1_constants.MM1), 47 mm1_constants.I_OBJECT_MANAGER) 48 try: 49 modems = object_manager.GetManagedObjects() 50 except dbus.exceptions.DBusException as e: 51 raise error.TestFail('Failed to list the available modems. ' 52 'DBus error: |%s|', repr(e)) 53 if len(modems) != 1: 54 raise error.TestFail('Expected one modem object, found %d' % 55 len(modems)) 56 57 modem_path = modems.keys()[0] 58 modem_object = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, 59 modem_path) 60 # Check that this object is valid 61 try: 62 modem_object.GetAll(mm1_constants.I_MODEM, 63 dbus_interface=mm1_constants.I_PROPERTIES) 64 except dbus.exceptions.DBusException as e: 65 raise error.TestFail('Failed to obtain dbus object for the modem ' 66 'DBus error: |%s|', repr(e)) 67 68 return dbus.Interface(modem_object, mm1_constants.I_MODEM) 69 70 71 def _check_mm_state(self, modem, states): 72 """ 73 Verify that the modemmanager state is |state|. 74 75 @param modem: A DBus object for the modemmanager modem. 76 @param states: The expected state of the modem. This is either a single 77 state, or a list of states. 78 @raises: TestError if the state differs. 79 """ 80 if not isinstance(states, list): 81 states = [states] 82 properties = modem.GetAll(mm1_constants.I_MODEM, 83 dbus_interface=mm1_constants.I_PROPERTIES) 84 actual_state = properties[mm1_constants.MM_MODEM_PROPERTY_NAME_STATE] 85 if actual_state not in states: 86 state_names = [mm1_constants.ModemStateToString(x) for x in states] 87 raise error.TestFail( 88 'Expected modemmanager modem state to be one of %s but ' 89 'found %s' % 90 (state_names, 91 mm1_constants.ModemStateToString(actual_state))) 92 93 94 def _check_shill_property_update(self, cellular_device, property_name, 95 old_state, new_state): 96 """ 97 Check the value of property of shill. 98 99 @param cellular_device: The DBus proxy object for the cellular device. 100 @param property_name: Name of the property to check. 101 @param old_state: old value of property. 102 @param new_state: new expected value of property. 103 @raises: TestError if the property fails to enter the given state. 104 105 """ 106 # If we don't expect a change in the value, there is a race between this 107 # check and a possible (erronous) update of the value. Allow some time 108 # for the property to be updated before checking. 109 if old_state == new_state: 110 time.sleep(SHORT_TIMEOUT_SECONDS) 111 polling_timeout = 0 112 else: 113 polling_timeout = SHORT_TIMEOUT_SECONDS 114 success, _, _ = self._cellular_proxy.wait_for_property_in( 115 cellular_device, 116 property_name, 117 (new_state,), 118 timeout_seconds=polling_timeout) 119 if not success: 120 raise error.TestFail('Shill failed to set |%s| to %s.' % 121 (property_name, str(new_state))) 122 123 124 def _itesting_machine(self, machine_name, timeout=SHORT_TIMEOUT_SECONDS): 125 """ 126 Get the testing interface of the given interactive state machine. 127 128 @param machine_name: The name of the interactive state machine. 129 @return dbus.Interface for the testing interface of 130 InteractiveScanningMachine, if found. None otherwise. 131 @raises utils.TimeoutError if a valid dbus object can't be found. 132 133 """ 134 def _get_machine(): 135 machine = self._bus.get_object( 136 mm1_constants.I_MODEM_MANAGER, 137 '/'.join([pm_constants.TESTING_PATH, machine_name])) 138 if machine: 139 i_machine = dbus.Interface(machine, pm_constants.I_TESTING_ISM) 140 # Only way to know if this DBus object is valid is to call a 141 # method on it. 142 try: 143 i_machine.IsWaiting() # Ignore result. 144 return i_machine 145 except dbus.exceptions.DBusException as e: 146 logging.debug(e) 147 return None 148 149 utils.poll_for_condition(_get_machine, timeout=timeout) 150 return _get_machine() 151 152 153 def test_user_initiated_cellular_scan(self): 154 """ 155 Test that the |ProposeScan| DBus method exported by shill cellular 156 object correctly updates the cellular object |Scanning| property while 157 the scan is in progress. 158 """ 159 with pseudomodem_context.PseudoModemManagerContext( 160 True, 161 {'test-module' : TEST_MODEMS_MODULE_PATH, 162 'test-modem-class' : 'AsyncScanModem'}): 163 self._cellular_proxy = cellular_proxy.CellularProxy.get_proxy() 164 self._bus = dbus.SystemBus() 165 self._cellular_proxy.set_logging_for_cellular_test() 166 167 logging.info('Sanity check initial values') 168 utils.poll_for_condition( 169 self._cellular_proxy.find_cellular_device_object, 170 exception=error.TestFail( 171 'Bad initial state: Failed to obtain a cellular ' 172 'device in pseudomodem context.'), 173 timeout=SHORT_TIMEOUT_SECONDS) 174 device = self._cellular_proxy.find_cellular_device_object() 175 try: 176 self._itesting_machine(STATE_MACHINE_SCAN, 0) 177 raise error.TestFail('Bad initial state: scan machine created ' 178 'by pseudomodem before scan is proposed.') 179 except utils.TimeoutError: 180 pass 181 182 self._check_shill_property_update( 183 device, 184 self._cellular_proxy.DEVICE_PROPERTY_SCANNING, 185 False, 186 False) 187 188 logging.info('Test actions and checks') 189 device.ProposeScan() 190 try: 191 itesting_scan_machine = self._itesting_machine( 192 STATE_MACHINE_SCAN) 193 except utils.TimeoutError: 194 raise error.TestFail('Pseudomodem failed to launch %s' % 195 STATE_MACHINE_SCAN) 196 utils.poll_for_condition( 197 itesting_scan_machine.IsWaiting, 198 exception=error.TestFail('Scan machine failed to enter ' 199 'scan state'), 200 timeout=SHORT_TIMEOUT_SECONDS) 201 self._check_shill_property_update( 202 device, 203 self._cellular_proxy.DEVICE_PROPERTY_SCANNING, 204 False, 205 True) 206 207 itesting_scan_machine.Advance() 208 utils.poll_for_condition( 209 lambda: not itesting_scan_machine.IsWaiting(), 210 exception=error.TestFail('Scan machine failed to exit ' 211 'scan state'), 212 timeout=SHORT_TIMEOUT_SECONDS) 213 self._check_shill_property_update( 214 device, 215 self._cellular_proxy.DEVICE_PROPERTY_SCANNING, 216 True, 217 False) 218 219 220 def test_activated_service_states(self): 221 """ 222 Test that shill |Scanning| property is updated correctly when an 223 activated 3GPP service connects. 224 """ 225 with pseudomodem_context.PseudoModemManagerContext( 226 True, 227 {'test-module' : TEST_MODEMS_MODULE_PATH, 228 'test-state-machine-factory-class' : 229 'InteractiveStateMachineFactory'}): 230 self._cellular_proxy = cellular_proxy.CellularProxy.get_proxy() 231 self._bus = dbus.SystemBus() 232 self._cellular_proxy.set_logging_for_cellular_test() 233 234 logging.info('Sanity check initial values') 235 enable_machine = self._itesting_machine( 236 pm_constants.STATE_MACHINE_ENABLE) 237 utils.poll_for_condition( 238 enable_machine.IsWaiting, 239 exception=error.TestFail( 240 'Bad initial state: Pseudomodem did not launch ' 241 'Enable machine'), 242 timeout=SHORT_TIMEOUT_SECONDS) 243 utils.poll_for_condition( 244 self._cellular_proxy.find_cellular_device_object, 245 exception=error.TestFail( 246 'Bad initial state: Failed to obtain a cellular ' 247 'device in pseudomodem context.'), 248 timeout=SHORT_TIMEOUT_SECONDS) 249 device = self._cellular_proxy.find_cellular_device_object() 250 mm_modem = self._find_mm_modem() 251 252 logging.info('Test Connect sequence') 253 self._check_mm_state(mm_modem, 254 mm1_constants.MM_MODEM_STATE_DISABLED) 255 self._check_shill_property_update( 256 device, 257 self._cellular_proxy.DEVICE_PROPERTY_POWERED, 258 False, 259 False) 260 self._check_shill_property_update( 261 device, 262 self._cellular_proxy.DEVICE_PROPERTY_SCANNING, 263 False, 264 False) 265 logging.info('Expectation met: |Scanning| is False in MM state ' 266 'Disabled') 267 enable_machine.Advance() 268 269 # MM state: Enabling 270 utils.poll_for_condition( 271 enable_machine.IsWaiting, 272 exception=error.TestFail('EnableMachine failed to wait in ' 273 'Enabling state'), 274 timeout=SHORT_TIMEOUT_SECONDS) 275 self._check_mm_state(mm_modem, 276 mm1_constants.MM_MODEM_STATE_ENABLING) 277 self._check_shill_property_update( 278 device, 279 self._cellular_proxy.DEVICE_PROPERTY_SCANNING, 280 False, 281 True) 282 logging.info('Expectation met: |Scanning| is True in MM state ' 283 'Enabling') 284 enable_machine.Advance() 285 286 # MM state: Enabled 287 utils.poll_for_condition( 288 enable_machine.IsWaiting, 289 exception=error.TestFail('EnableMachine failed to wait in ' 290 'Enabled state'), 291 timeout=SHORT_TIMEOUT_SECONDS) 292 # Finish the enable call. 293 enable_machine.Advance() 294 295 self._check_mm_state(mm_modem, mm1_constants.MM_MODEM_STATE_ENABLED) 296 self._check_shill_property_update( 297 device, 298 self._cellular_proxy.DEVICE_PROPERTY_POWERED, 299 False, 300 True) 301 self._check_shill_property_update( 302 device, 303 self._cellular_proxy.DEVICE_PROPERTY_SCANNING, 304 True, 305 True) 306 307 register_machine = self._itesting_machine( 308 pm_constants.STATE_MACHINE_REGISTER) 309 utils.poll_for_condition( 310 register_machine.IsWaiting, 311 exception=error.TestFail('SearchingMachine failed to wait ' 312 'in Enabled state'), 313 timeout=SHORT_TIMEOUT_SECONDS) 314 logging.info('Expectation met: |Scanning| is True in MM state ' 315 'Enabled') 316 register_machine.Advance() 317 318 # MM state: Searching 319 utils.poll_for_condition( 320 register_machine.IsWaiting, 321 exception=error.TestFail('SearchingMachine failed to wait ' 322 'in Searching state'), 323 timeout=SHORT_TIMEOUT_SECONDS) 324 self._check_mm_state(mm_modem, 325 mm1_constants.MM_MODEM_STATE_SEARCHING) 326 enable_machine.Advance() 327 self._check_shill_property_update( 328 device, 329 self._cellular_proxy.DEVICE_PROPERTY_SCANNING, 330 True, 331 True) 332 logging.info('Expectation met: |Scanning| is True in MM state ' 333 'Searching') 334 register_machine.Advance() 335 336 # MM state: >= Registered 337 utils.poll_for_condition( 338 self._cellular_proxy.find_cellular_service_object, 339 error.TestFail('Failed to create Cellular Service for a ' 340 'registered modem'), 341 timeout=SHORT_TIMEOUT_SECONDS) 342 self._check_mm_state(mm_modem, 343 [mm1_constants.MM_MODEM_STATE_REGISTERED, 344 mm1_constants.MM_MODEM_STATE_CONNECTING, 345 mm1_constants.MM_MODEM_STATE_CONNECTED]) 346 self._check_shill_property_update( 347 device, 348 self._cellular_proxy.DEVICE_PROPERTY_SCANNING, 349 True, 350 False) 351 logging.info('Expectation met: |Scanning| is False in MM state ' 352 'Registered') 353 354 355 def run_once(self): 356 """ Autotest entry function """ 357 self.test_user_initiated_cellular_scan() 358 self.test_activated_service_states() 359