1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import contextlib
6import dbus
7import logging
8import sys
9import traceback
10
11import common
12from autotest_lib.client.bin import utils
13from autotest_lib.client.common_lib import error
14from autotest_lib.client.cros import backchannel
15from autotest_lib.client.cros.cellular import mm
16from autotest_lib.client.cros.cellular.pseudomodem import pseudomodem_context
17from autotest_lib.client.cros.cellular.wardmodem import wardmodem
18from autotest_lib.client.cros.networking import cellular_proxy
19from autotest_lib.client.cros.networking import shill_context
20from autotest_lib.client.cros.networking import shill_proxy
21
22
class CellularTestEnvironment(object):
    """Setup and verify cellular test environment.

    This context manager configures the following:
        - Sets up backchannel.
        - Shuts down other devices except cellular.
        - Shill and MM logging is enabled appropriately for cellular.
        - Initializes members that tests should use to access test environment
          (eg. |shill|, |modem_manager|, |modem|).

    Then it verifies the following is valid:
        - The backchannel is using an Ethernet device.
        - The SIM is inserted and valid.
        - There is one and only one modem in the device.
        - The modem is registered to the network.
        - There is a cellular service in shill and it's not connected.

    Don't use this base class directly, use the appropriate subclass.

    Setup for over-the-air tests:
        with CellularOTATestEnvironment() as test_env:
            # Test body

    Setup for pseudomodem tests (note that |pseudomm_args| is a tuple):
        with CellularPseudoMMTestEnvironment(
                pseudomm_args=({'family': '3GPP'},)) as test_env:
            # Test body

    Setup for wardmodem tests:
        with CellularWardModemTestEnvironment(
                wardmodem_modem='e362') as test_env:
            # Test body

    """

    def __init__(self, use_backchannel=True, shutdown_other_devices=True,
                 modem_pattern=''):
        """
        @param use_backchannel: Set up the backchannel that can be used to
                communicate with the DUT.
        @param shutdown_other_devices: If True, shutdown all devices except
                cellular.
        @param modem_pattern: Search string used when looking for the modem.

        """
        # The glib main loop lives in a submodule that a plain |import dbus|
        # may not load; import it explicitly rather than relying on some
        # other module having done so already.
        import dbus.mainloop.glib

        # Tests should use this main loop instead of creating their own.
        self.mainloop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus(mainloop=self.mainloop)

        # Populated by __enter__() and cleared again by __exit__().
        self.shill = None
        self.modem_manager = None
        self.modem = None
        self.modem_path = None
        self._backchannel = None

        self._modem_pattern = modem_pattern

        # |_nested| is the combined context manager built from
        # |_context_managers| when the environment is entered. Subclasses
        # append their own managers to the list after calling this __init__.
        self._nested = None
        self._context_managers = []
        if use_backchannel:
            self._backchannel = backchannel.Backchannel()
            self._context_managers.append(self._backchannel)
        if shutdown_other_devices:
            self._context_managers.append(
                    shill_context.AllowedTechnologiesContext(
                            [shill_proxy.ShillProxy.TECHNOLOGY_CELLULAR]))


    @contextlib.contextmanager
    def _disable_shill_autoconnect(self):
        """Disable shill cellular autoconnect for the duration of a block.

        Autoconnect is turned back on when the block is left, including when
        the block raises; otherwise a failed setup would leave the device
        with cellular autoconnect switched off.

        """
        self._enable_shill_cellular_autoconnect(False)
        try:
            yield
        finally:
            self._enable_shill_cellular_autoconnect(True)


    def __enter__(self):
        """Bring up and verify the test environment.

        @return This environment object.
        @raise error.TestError (prefixed with INIT_ERROR) if initialization
                fails with a TestError, DBusException or ShillProxyError. Any
                other exception is re-raised unchanged after cleanup.

        """
        try:
            # Temporarily disable shill autoconnect to cellular service while
            # the test environment is setup to prevent a race condition
            # between disconnecting the modem in _verify_cellular_service()
            # and shill autoconnect.
            with self._disable_shill_autoconnect():
                self._nested = contextlib.nested(*self._context_managers)
                self._nested.__enter__()

                self._initialize_shill()

                # Perform SIM verification now to ensure that we can enable the
                # modem in _initialize_modem_components(). ModemManager does not
                # allow enabling a modem without a SIM.
                self._verify_sim()
                self._initialize_modem_components()

                self._setup_logging()

                self._verify_backchannel()
                self._wait_for_modem_registration()
                self._verify_cellular_service()

                return self
        except (error.TestError, dbus.DBusException,
                shill_proxy.ShillProxyError) as e:
            logging.error('Error during test initialization:\n' +
                          traceback.format_exc())
            # The with statement never calls __exit__() when __enter__()
            # raises, so tear down explicitly before reporting the failure.
            self.__exit__(*sys.exc_info())
            raise error.TestError('INIT_ERROR: %s' % str(e))
        except:
            # Deliberately broad: clean up, then let the original exception
            # propagate untouched.
            self.__exit__(*sys.exc_info())
            raise


    def __exit__(self, exception, value, traceback):
        """Tear down the environment and drop references to its components.

        @param exception: Exception type, or None if no exception occurred.
        @param value: Exception instance, or None.
        @param traceback: Traceback object, or None.
        @return The result of the nested context managers' __exit__(), or
                None if no context managers were entered.

        """
        # Detach the nested manager first so it is exited at most once.
        nested, self._nested = self._nested, None
        try:
            if nested is not None:
                return nested.__exit__(exception, value, traceback)
        finally:
            # Always forget the components, whether or not teardown of the
            # nested managers succeeded.
            self.shill = None
            self.modem_manager = None
            self.modem = None
            self.modem_path = None


    def _get_shill_cellular_device_object(self):
        """Poll shill until it reports a cellular device.

        @return The cellular device object found by shill.
        @raise error.TestError if no device shows up before the timeout.

        """
        return utils.poll_for_condition(
            lambda: self.shill.find_cellular_device_object(),
            exception=error.TestError('Cannot find cellular device in shill. '
                                      'Is the modem plugged in?'),
            timeout=shill_proxy.ShillProxy.DEVICE_ENUMERATION_TIMEOUT)


    def _enable_modem(self):
        """Enable the cellular device and wait until it is powered.

        @raise error.TestError if the device does not report 'Powered' before
                the timeout.

        """
        modem_device = self._get_shill_cellular_device_object()
        try:
            modem_device.Enable()
        except dbus.DBusException as e:
            # An enable that is already in progress is fine; we poll for the
            # result below.
            if (e.get_dbus_name() !=
                    shill_proxy.ShillProxy.ERROR_IN_PROGRESS):
                raise

        utils.poll_for_condition(
            lambda: modem_device.GetProperties()['Powered'],
            exception=error.TestError(
                    'Failed to enable modem.'),
            timeout=shill_proxy.ShillProxy.DEVICE_ENABLE_DISABLE_TIMEOUT)


    def _enable_shill_cellular_autoconnect(self, enable):
        """Turn shill's autoconnect for cellular on or off.

        @param enable: True to allow autoconnect for cellular, False to list
                cellular as a no-autoconnect technology.
        @raise error.TestError if shill cannot be reached.

        """
        # This runs before _initialize_shill(), so |self.shill| is not set
        # yet. get_proxy() returns None when shill is not running or not
        # responding; report that clearly instead of failing on None.
        shill = cellular_proxy.CellularProxy.get_proxy(self.bus)
        if shill is None:
            raise error.TestError('Cannot connect to shill, is shill running?')
        shill.manager.SetProperty(
                shill_proxy.ShillProxy.
                MANAGER_PROPERTY_NO_AUTOCONNECT_TECHNOLOGIES,
                '' if enable else 'cellular')


    def _is_unsupported_error(self, e):
        """Tell whether a DBus exception means "operation not supported".

        @param e: The dbus.DBusException to inspect.
        @return True if |e| is a NotSupported error, or a generic Failure
                whose message contains 'operation not supported'.

        """
        return (e.get_dbus_name() ==
                shill_proxy.ShillProxy.ERROR_NOT_SUPPORTED or
                (e.get_dbus_name() ==
                 shill_proxy.ShillProxy.ERROR_FAILURE and
                 'operation not supported' in e.get_dbus_message()))


    def _reset_modem(self):
        """Reset the modem, tolerating modems that cannot be reset."""
        modem_device = self._get_shill_cellular_device_object()
        try:
            # Cromo/MBIM modems do not support being reset.
            self.shill.reset_modem(modem_device, expect_service=False)
        except dbus.DBusException as e:
            if not self._is_unsupported_error(e):
                raise


    def _initialize_shill(self):
        """Get access to shill.

        @raise error.TestError if shill cannot be reached.

        """
        # CellularProxy.get_proxy() checks to see if shill is running and
        # responding to DBus requests. It returns None if that's not the case.
        self.shill = cellular_proxy.CellularProxy.get_proxy(self.bus)
        if self.shill is None:
            raise error.TestError('Cannot connect to shill, is shill running?')


    def _initialize_modem_components(self):
        """Reset the modem and get access to modem components.

        Sets |modem_manager|, |modem_path| and |modem|.

        @raise error.TestError if the modem object cannot be obtained.

        """
        # Enable modem first so shill initializes the modemmanager proxies so
        # we can call reset on it.
        self._enable_modem()
        self._reset_modem()

        # PickOneModem() makes sure there's a modem manager and that there is
        # one and only one modem.
        self.modem_manager, self.modem_path = \
                mm.PickOneModem(self._modem_pattern)
        self.modem = self.modem_manager.GetModem(self.modem_path)
        if self.modem is None:
            raise error.TestError('Cannot get modem object at %s.' %
                                  self.modem_path)


    def _setup_logging(self):
        """Enable cellular test logging in shill and the modem manager."""
        self.shill.set_logging_for_cellular_test()
        self.modem_manager.SetDebugLogging()


    def _verify_sim(self):
        """Verify SIM is valid.

        Make sure a SIM is inserted and that it is not locked. The check is
        skipped for CDMA modems.

        @raise error.TestError if SIM does not exist or is locked, or if the
                lock status cannot be read.

        """
        modem_device = self._get_shill_cellular_device_object()
        props = modem_device.GetProperties()

        # No SIM in CDMA modems.
        family = props[
                cellular_proxy.CellularProxy.DEVICE_PROPERTY_TECHNOLOGY_FAMILY]
        if (family ==
                cellular_proxy.CellularProxy.
                DEVICE_PROPERTY_TECHNOLOGY_FAMILY_CDMA):
            return

        # Make sure there is a SIM.
        if not props[cellular_proxy.CellularProxy.DEVICE_PROPERTY_SIM_PRESENT]:
            raise error.TestError('There is no SIM in the modem.')

        # Make sure SIM is not locked.
        lock_status = props.get(
                cellular_proxy.CellularProxy.DEVICE_PROPERTY_SIM_LOCK_STATUS,
                None)
        if lock_status is None:
            raise error.TestError('Failed to read SIM lock status.')
        locked = lock_status.get(
                cellular_proxy.CellularProxy.PROPERTY_KEY_SIM_LOCK_ENABLED,
                None)
        if locked is None:
            raise error.TestError('Failed to read SIM LockEnabled status.')
        elif locked:
            raise error.TestError(
                    'SIM is locked, test requires an unlocked SIM.')


    def _verify_backchannel(self):
        """Verify backchannel is on an ethernet device.

        Does nothing when no backchannel was requested.

        @raise error.TestError if backchannel is not on an ethernet device.

        """
        if self._backchannel is None:
            return

        if not self._backchannel.is_using_ethernet():
            raise error.TestError('An ethernet connection is required between '
                                  'the test server and the device under test.')


    def _wait_for_modem_registration(self):
        """Wait for the modem to register with the network.

        @raise error.TestError if modem is not registered.

        """
        utils.poll_for_condition(
            self.modem.ModemIsRegistered,
            exception=error.TestError(
                    'Modem failed to register with the network.'),
            timeout=cellular_proxy.CellularProxy.SERVICE_REGISTRATION_TIMEOUT)


    def _verify_cellular_service(self):
        """Make sure a cellular service exists and is idle.

        The service is disconnected (if it was connected) and must then
        reach the "idle" state. Locating the service is delegated to
        shill.wait_for_cellular_service_object().

        @raise error.TestError if the cellular service does not reach the
                "idle" state before the timeout.

        """
        service = self.shill.wait_for_cellular_service_object()

        try:
            service.Disconnect()
        except dbus.DBusException as e:
            # Already disconnected is exactly the state we want.
            if (e.get_dbus_name() !=
                    cellular_proxy.CellularProxy.ERROR_NOT_CONNECTED):
                raise
        success, state, _ = self.shill.wait_for_property_in(
                service,
                cellular_proxy.CellularProxy.SERVICE_PROPERTY_STATE,
                ('idle',),
                cellular_proxy.CellularProxy.SERVICE_DISCONNECT_TIMEOUT)
        if not success:
            raise error.TestError(
                    'Cellular service needs to start in the "idle" state. '
                    'Current state is "%s". '
                    'Modem disconnect may have failed.' %
                    state)
321
322
class CellularOTATestEnvironment(CellularTestEnvironment):
    """Cellular test environment for over-the-air (OTA) tests.

    Adds no context managers of its own; it only forwards its keyword
    arguments to CellularTestEnvironment.

    """

    def __init__(self, **kwargs):
        """
        @param kwargs: Keyword arguments passed through unchanged to
                CellularTestEnvironment.__init__().

        """
        super(CellularOTATestEnvironment, self).__init__(**kwargs)
327
328
class CellularPseudoMMTestEnvironment(CellularTestEnvironment):
    """Setup and verify cellular pseudomodem test environment. """
    def __init__(self, pseudomm_args=None, **kwargs):
        """
        @param pseudomm_args: Tuple of arguments passed to the pseudomodem, see
                pseudomodem_context.py for description of each argument in the
                tuple: (flags_map, block_output, bus). None (the default)
                passes no extra arguments.
                NOTE(review): |bus| is already supplied by keyword below, so
                also providing it in the tuple presumably conflicts --
                confirm against pseudomodem_context.py.
        @param kwargs: Keyword arguments passed through unchanged to
                CellularTestEnvironment.__init__().

        """
        super(CellularPseudoMMTestEnvironment, self).__init__(**kwargs)
        # None cannot be star-expanded; treat the default as "no extra
        # positional arguments" instead of failing with a TypeError.
        if pseudomm_args is None:
            pseudomm_args = ()
        self._context_managers.append(
                pseudomodem_context.PseudoModemManagerContext(
                        True, bus=self.bus, *pseudomm_args))
342
343
class CellularWardModemTestEnvironment(CellularTestEnvironment):
    """Setup and verify cellular ward modem test environment. """
    def __init__(self, wardmodem_modem=None, **kwargs):
        """
        @param wardmodem_modem: Customized ward modem to use instead of the
                default implementation, see wardmodem.py. None (the default)
                requests no particular modem.
        @param kwargs: Keyword arguments passed through unchanged to
                CellularTestEnvironment.__init__().

        """
        super(CellularWardModemTestEnvironment, self).__init__(**kwargs)
        # Only pass --modem when a modem was actually named; otherwise the
        # argument list would contain a literal None as the option's value.
        if wardmodem_modem is None:
            wardmodem_args = []
        else:
            wardmodem_args = ['--modem', wardmodem_modem]
        self._context_managers.append(
                wardmodem.WardModemContext(args=wardmodem_args))
355