1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus
6import logging
7import os
8import time
9
10from autotest_lib.client.bin import test
11from autotest_lib.client.bin import utils
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.cros.cellular import mm1_constants
14from autotest_lib.client.cros.cellular.pseudomodem import pm_constants
15from autotest_lib.client.cros.cellular.pseudomodem import pseudomodem_context
16from autotest_lib.client.cros.networking import cellular_proxy
17
18# Used for software message propagation latencies.
19SHORT_TIMEOUT_SECONDS = 2
20STATE_MACHINE_SCAN = 'ScanMachine'
21TEST_MODEMS_MODULE_PATH = os.path.join(os.path.dirname(__file__), 'files',
22                                       'modems.py')
23
24class network_3GScanningProperty(test.test):
25    """
26    Test that the |Scanning| Property of the shill cellular device object is
27    updated correctly in the following two scenarios:
28      (1) When a user requests a network scan using the |ProposeScan| method of
29          the cellular device.
30      (2) During the initial modem enable-register-connect sequence.
31
32    """
33    version = 1
34
35    def _find_mm_modem(self):
36        """
37        Find the modemmanager modem object.
38
39        Assumption: There is only one modem in the system.
40
41        @raises: TestError unless exactly one modem is found.
42
43        """
44        object_manager = dbus.Interface(
45                self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
46                                     mm1_constants.MM1),
47                mm1_constants.I_OBJECT_MANAGER)
48        try:
49            modems = object_manager.GetManagedObjects()
50        except dbus.exceptions.DBusException as e:
51            raise error.TestFail('Failed to list the available modems. '
52                                 'DBus error: |%s|', repr(e))
53        if len(modems) != 1:
54            raise error.TestFail('Expected one modem object, found %d' %
55                                 len(modems))
56
57        modem_path = modems.keys()[0]
58        modem_object = self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
59                                            modem_path)
60        # Check that this object is valid
61        try:
62            modem_object.GetAll(mm1_constants.I_MODEM,
63                                dbus_interface=mm1_constants.I_PROPERTIES)
64        except dbus.exceptions.DBusException as e:
65            raise error.TestFail('Failed to obtain dbus object for the modem '
66                                 'DBus error: |%s|', repr(e))
67
68        return dbus.Interface(modem_object, mm1_constants.I_MODEM)
69
70
71    def _check_mm_state(self, modem, states):
72        """
73        Verify that the modemmanager state is |state|.
74
75        @param modem: A DBus object for the modemmanager modem.
76        @param states: The expected state of the modem. This is either a single
77                state, or a list of states.
78        @raises: TestError if the state differs.
79        """
80        if not isinstance(states, list):
81            states = [states]
82        properties = modem.GetAll(mm1_constants.I_MODEM,
83                                  dbus_interface=mm1_constants.I_PROPERTIES)
84        actual_state = properties[mm1_constants.MM_MODEM_PROPERTY_NAME_STATE]
85        if actual_state not in states:
86            state_names = [mm1_constants.ModemStateToString(x) for x in states]
87            raise error.TestFail(
88                    'Expected modemmanager modem state to be one of %s but '
89                    'found %s' %
90                    (state_names,
91                     mm1_constants.ModemStateToString(actual_state)))
92
93
94    def _check_shill_property_update(self, cellular_device, property_name,
95                                     old_state, new_state):
96        """
97        Check the value of property of shill.
98
99        @param cellular_device: The DBus proxy object for the cellular device.
100        @param property_name: Name of the property to check.
101        @param old_state: old value of property.
102        @param new_state: new expected value of property.
103        @raises: TestError if the property fails to enter the given state.
104
105        """
106        # If we don't expect a change in the value, there is a race between this
107        # check and a possible (erronous) update of the value. Allow some time
108        # for the property to be updated before checking.
109        if old_state == new_state:
110            time.sleep(SHORT_TIMEOUT_SECONDS)
111            polling_timeout = 0
112        else:
113            polling_timeout = SHORT_TIMEOUT_SECONDS
114        success, _, _ = self._cellular_proxy.wait_for_property_in(
115                cellular_device,
116                property_name,
117                (new_state,),
118                timeout_seconds=polling_timeout)
119        if not success:
120            raise error.TestFail('Shill failed to set |%s| to %s.' %
121                                 (property_name, str(new_state)))
122
123
124    def _itesting_machine(self, machine_name, timeout=SHORT_TIMEOUT_SECONDS):
125        """
126        Get the testing interface of the given interactive state machine.
127
128        @param machine_name: The name of the interactive state machine.
129        @return dbus.Interface for the testing interface of
130                InteractiveScanningMachine, if found. None otherwise.
131        @raises utils.TimeoutError if a valid dbus object can't be found.
132
133        """
134        def _get_machine():
135            machine = self._bus.get_object(
136                    mm1_constants.I_MODEM_MANAGER,
137                    '/'.join([pm_constants.TESTING_PATH, machine_name]))
138            if machine:
139                i_machine = dbus.Interface(machine, pm_constants.I_TESTING_ISM)
140                # Only way to know if this DBus object is valid is to call a
141                # method on it.
142                try:
143                    i_machine.IsWaiting()  # Ignore result.
144                    return i_machine
145                except dbus.exceptions.DBusException as e:
146                    logging.debug(e)
147                    return None
148
149        utils.poll_for_condition(_get_machine, timeout=timeout)
150        return _get_machine()
151
152
153    def test_user_initiated_cellular_scan(self):
154        """
155        Test that the |ProposeScan| DBus method exported by shill cellular
156        object correctly updates the cellular object |Scanning| property while
157        the scan is in progress.
158        """
159        with pseudomodem_context.PseudoModemManagerContext(
160                True,
161                {'test-module' : TEST_MODEMS_MODULE_PATH,
162                 'test-modem-class' : 'AsyncScanModem'}):
163            self._cellular_proxy = cellular_proxy.CellularProxy.get_proxy()
164            self._bus = dbus.SystemBus()
165            self._cellular_proxy.set_logging_for_cellular_test()
166
167            logging.info('Sanity check initial values')
168            utils.poll_for_condition(
169                    self._cellular_proxy.find_cellular_device_object,
170                    exception=error.TestFail(
171                            'Bad initial state: Failed to obtain a cellular '
172                            'device in pseudomodem context.'),
173                    timeout=SHORT_TIMEOUT_SECONDS)
174            device = self._cellular_proxy.find_cellular_device_object()
175            try:
176                self._itesting_machine(STATE_MACHINE_SCAN, 0)
177                raise error.TestFail('Bad initial state: scan machine created '
178                                     'by pseudomodem before scan is proposed.')
179            except utils.TimeoutError:
180                pass
181
182            self._check_shill_property_update(
183                    device,
184                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
185                    False,
186                    False)
187
188            logging.info('Test actions and checks')
189            device.ProposeScan()
190            try:
191                itesting_scan_machine = self._itesting_machine(
192                        STATE_MACHINE_SCAN)
193            except utils.TimeoutError:
194                raise error.TestFail('Pseudomodem failed to launch %s' %
195                                     STATE_MACHINE_SCAN)
196            utils.poll_for_condition(
197                    itesting_scan_machine.IsWaiting,
198                    exception=error.TestFail('Scan machine failed to enter '
199                                             'scan state'),
200                    timeout=SHORT_TIMEOUT_SECONDS)
201            self._check_shill_property_update(
202                    device,
203                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
204                    False,
205                    True)
206
207            itesting_scan_machine.Advance()
208            utils.poll_for_condition(
209                    lambda: not itesting_scan_machine.IsWaiting(),
210                    exception=error.TestFail('Scan machine failed to exit '
211                                             'scan state'),
212                    timeout=SHORT_TIMEOUT_SECONDS)
213            self._check_shill_property_update(
214                    device,
215                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
216                    True,
217                    False)
218
219
220    def test_activated_service_states(self):
221        """
222        Test that shill |Scanning| property is updated correctly when an
223        activated 3GPP service connects.
224        """
225        with pseudomodem_context.PseudoModemManagerContext(
226                True,
227                {'test-module' : TEST_MODEMS_MODULE_PATH,
228                 'test-state-machine-factory-class' :
229                        'InteractiveStateMachineFactory'}):
230            self._cellular_proxy = cellular_proxy.CellularProxy.get_proxy()
231            self._bus = dbus.SystemBus()
232            self._cellular_proxy.set_logging_for_cellular_test()
233
234            logging.info('Sanity check initial values')
235            enable_machine = self._itesting_machine(
236                    pm_constants.STATE_MACHINE_ENABLE)
237            utils.poll_for_condition(
238                    enable_machine.IsWaiting,
239                    exception=error.TestFail(
240                            'Bad initial state: Pseudomodem did not launch '
241                            'Enable machine'),
242                    timeout=SHORT_TIMEOUT_SECONDS)
243            utils.poll_for_condition(
244                    self._cellular_proxy.find_cellular_device_object,
245                    exception=error.TestFail(
246                            'Bad initial state: Failed to obtain a cellular '
247                            'device in pseudomodem context.'),
248                    timeout=SHORT_TIMEOUT_SECONDS)
249            device = self._cellular_proxy.find_cellular_device_object()
250            mm_modem = self._find_mm_modem()
251
252            logging.info('Test Connect sequence')
253            self._check_mm_state(mm_modem,
254                                 mm1_constants.MM_MODEM_STATE_DISABLED)
255            self._check_shill_property_update(
256                    device,
257                    self._cellular_proxy.DEVICE_PROPERTY_POWERED,
258                    False,
259                    False)
260            self._check_shill_property_update(
261                    device,
262                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
263                    False,
264                    False)
265            logging.info('Expectation met: |Scanning| is False in MM state '
266                         'Disabled')
267            enable_machine.Advance()
268
269            # MM state: Enabling
270            utils.poll_for_condition(
271                    enable_machine.IsWaiting,
272                    exception=error.TestFail('EnableMachine failed to wait in '
273                                             'Enabling state'),
274                    timeout=SHORT_TIMEOUT_SECONDS)
275            self._check_mm_state(mm_modem,
276                                 mm1_constants.MM_MODEM_STATE_ENABLING)
277            self._check_shill_property_update(
278                    device,
279                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
280                    False,
281                    True)
282            logging.info('Expectation met: |Scanning| is True in MM state '
283                         'Enabling')
284            enable_machine.Advance()
285
286            # MM state: Enabled
287            utils.poll_for_condition(
288                    enable_machine.IsWaiting,
289                    exception=error.TestFail('EnableMachine failed to wait in '
290                                             'Enabled state'),
291                    timeout=SHORT_TIMEOUT_SECONDS)
292            # Finish the enable call.
293            enable_machine.Advance()
294
295            self._check_mm_state(mm_modem, mm1_constants.MM_MODEM_STATE_ENABLED)
296            self._check_shill_property_update(
297                    device,
298                    self._cellular_proxy.DEVICE_PROPERTY_POWERED,
299                    False,
300                    True)
301            self._check_shill_property_update(
302                    device,
303                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
304                    True,
305                    True)
306
307            register_machine = self._itesting_machine(
308                    pm_constants.STATE_MACHINE_REGISTER)
309            utils.poll_for_condition(
310                    register_machine.IsWaiting,
311                    exception=error.TestFail('SearchingMachine failed to wait '
312                                             'in Enabled state'),
313                    timeout=SHORT_TIMEOUT_SECONDS)
314            logging.info('Expectation met: |Scanning| is True in MM state '
315                         'Enabled')
316            register_machine.Advance()
317
318            # MM state: Searching
319            utils.poll_for_condition(
320                    register_machine.IsWaiting,
321                    exception=error.TestFail('SearchingMachine failed to wait '
322                                             'in Searching state'),
323                    timeout=SHORT_TIMEOUT_SECONDS)
324            self._check_mm_state(mm_modem,
325                                 mm1_constants.MM_MODEM_STATE_SEARCHING)
326            enable_machine.Advance()
327            self._check_shill_property_update(
328                    device,
329                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
330                    True,
331                    True)
332            logging.info('Expectation met: |Scanning| is True in MM state '
333                         'Searching')
334            register_machine.Advance()
335
336            # MM state: >= Registered
337            utils.poll_for_condition(
338                    self._cellular_proxy.find_cellular_service_object,
339                    error.TestFail('Failed to create Cellular Service for a '
340                                   'registered modem'),
341                    timeout=SHORT_TIMEOUT_SECONDS)
342            self._check_mm_state(mm_modem,
343                                 [mm1_constants.MM_MODEM_STATE_REGISTERED,
344                                  mm1_constants.MM_MODEM_STATE_CONNECTING,
345                                  mm1_constants.MM_MODEM_STATE_CONNECTED])
346            self._check_shill_property_update(
347                    device,
348                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
349                    True,
350                    False)
351            logging.info('Expectation met: |Scanning| is False in MM state '
352                         'Registered')
353
354
355    def run_once(self):
356        """ Autotest entry function """
357        self.test_user_initiated_cellular_scan()
358        self.test_activated_service_states()
359