1import mock
2import unittest
3
4from autotest_lib.client.common_lib.cros.cfm.usb import usb_device
5from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_spec
6from autotest_lib.server.cros.cfm.configurable_test import actions
7from autotest_lib.server.cros.cfm.configurable_test import action_context
8from autotest_lib.server.cros.cfm.configurable_test import scenario
9
# Shared fixtures for tests in which the concrete field values do not matter.
USB_DEVICE_SPEC = usb_device_spec.UsbDeviceSpec(
        'vid',
        'pid',
        'product',
        ['iface'])

USB_DEVICE = usb_device.UsbDevice(
        'v', 'p', 'prod', ['if'], 1, 2, 1)
15
16
# This is a test module, so pylint's missing-docstring check is disabled.
18# pylint: disable=missing-docstring
class TestActions(unittest.TestCase):
    """
    Tests for the available actions for configurable CFM tests to run.

    Test methods are described with `#` comments instead of docstrings
    because unittest uses a test method's docstring as its description in
    verbose output, which would replace the descriptive method names.
    """

    def setUp(self):
        # Build a context in which every collaborator is a MagicMock, so
        # each test can stub return values and inspect the recorded calls.
        self.host_mock = mock.MagicMock()
        self.cfm_facade_mock = mock.MagicMock()
        self.usb_device_collector_mock = mock.MagicMock()
        self.usb_port_manager_mock = mock.MagicMock()
        self.crash_detector_mock = mock.MagicMock()
        self.context_with_mocks = action_context.ActionContext(
                host=self.host_mock,
                cfm_facade=self.cfm_facade_mock,
                usb_device_collector=self.usb_device_collector_mock,
                usb_port_manager=self.usb_port_manager_mock,
                crash_detector=self.crash_detector_mock)


    def test_assert_file_does_not_contain_no_match(self):
        # Neither forbidden pattern occurs in the contents, so executing
        # the action must not raise.
        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
        context = action_context.ActionContext(
                file_contents_collector=FakeCollector('abc\ndef'))
        action.execute(context)

    def test_assert_file_does_not_contain_match(self):
        # 'WW' occurs in the second line of the contents.
        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
        context = action_context.ActionContext(
                file_contents_collector=FakeCollector('abc\naWWd'))
        with self.assertRaises(AssertionError):
            action.execute(context)

    def test_assert_file_does_not_contain_regex_match(self):
        # The forbidden entries are treated as regular expressions:
        # 'W{3}Q+' matches the 'WWWQQ' part of the contents.
        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'W{3}Q+'])
        context = action_context.ActionContext(
                file_contents_collector=FakeCollector('abc\naWWWQQd'))
        with self.assertRaises(AssertionError):
            action.execute(context)

    def test_reboot_dut_no_restart(self):
        # A default RebootDut reboots the host and must not touch the CFM
        # facade at all.
        action = actions.RebootDut()
        action.execute(self.context_with_mocks)
        self.host_mock.reboot.assert_called_once_with()
        self.assertFalse(self.cfm_facade_mock.method_calls)

    def test_reboot_dut_with_restart(self):
        # With restart_chrome_for_cfm=True the reboot is followed by a
        # Chrome restart and a wait for the telemetry commands.
        action = actions.RebootDut(restart_chrome_for_cfm=True)
        action.execute(self.context_with_mocks)
        self.host_mock.reboot.assert_called_once_with()
        facade = self.cfm_facade_mock
        facade.restart_chrome_for_cfm.assert_called_once_with()
        facade.wait_for_meetings_telemetry_commands.assert_called_once_with()

    def test_assert_usb_device_collector(self):
        # The predicate accepts anything, so the action must pass no
        # matter what the mocked collector returns.
        spec = usb_device_spec.UsbDeviceSpec(
                'vid', 'pid', 'product', ['iface'])
        action = actions.AssertUsbDevices([spec], lambda x: True)
        action.execute(self.context_with_mocks)

    def test_assert_usb_device_collector_matching_predicate(self):
        # The collector returns one device whose product_id is 'p', which
        # is exactly what the predicate checks for.
        spec = usb_device_spec.UsbDeviceSpec(
                'vid', 'pid', 'product', ['iface'])
        device = usb_device.UsbDevice(
                'v', 'p', 'prod', ['if'], 1, 2, 1)
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[device])
        action = actions.AssertUsbDevices(
                [spec], lambda x: x[0].product_id == 'p')
        action.execute(self.context_with_mocks)

    def test_assert_usb_device_collector_non_matching_predicate(self):
        # Same device as above, but the predicate expects product_id 'r'.
        spec = usb_device_spec.UsbDeviceSpec(
                'vid', 'pid', 'product', ['iface'])
        device = usb_device.UsbDevice(
                'v', 'p', 'prod', ['if'], 1, 2, 1)
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[device])
        action = actions.AssertUsbDevices(
                [spec], lambda x: x[0].product_id == 'r')
        with self.assertRaises(AssertionError):
            action.execute(self.context_with_mocks)

    def test_assert_usb_device_collector_default_predicate(self):
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[USB_DEVICE])  # Default checks list is of size 1
        action = actions.AssertUsbDevices([USB_DEVICE_SPEC])
        action.execute(self.context_with_mocks)

    def test_select_scenario_at_random(self):
        dummy_action1 = DummyAction()
        dummy_action2 = DummyAction()
        scenarios = [scenario.Scenario(dummy_action1),
                     scenario.Scenario(dummy_action2)]
        action = actions.SelectScenarioAtRandom(scenarios, 10)
        action.execute(self.context_with_mocks)
        # Assert that our actions were executed the expected number of times.
        total_executes = (dummy_action1.executed_times
                          + dummy_action2.executed_times)
        self.assertEqual(10, total_executes)

    def test_select_scenario_at_random_str_contains_seed(self):
        # The third constructor argument is the seed, and it must show up
        # in the string form of the action.
        action = actions.SelectScenarioAtRandom([], 10, 123)
        self.assertIn('seed=123', str(action))

    def test_select_scenario_at_random_same_seed_same_actions(self):
        # Two independent actions built with the same seed (0) must pick
        # their scenarios identically, so the per-position execution
        # counts have to agree.
        scenario1_action1 = DummyAction()
        scenario1_action2 = DummyAction()
        scenarios1 = [scenario.Scenario(scenario1_action1),
                      scenario.Scenario(scenario1_action2)]
        scenario2_action1 = DummyAction()
        scenario2_action2 = DummyAction()
        scenarios2 = [scenario.Scenario(scenario2_action1),
                      scenario.Scenario(scenario2_action2)]
        action1 = actions.SelectScenarioAtRandom(scenarios1, 100, 0)
        action2 = actions.SelectScenarioAtRandom(scenarios2, 100, 0)
        action1.execute(self.context_with_mocks)
        action2.execute(self.context_with_mocks)
        self.assertEqual(scenario1_action1.executed_times,
                         scenario2_action1.executed_times)
        self.assertEqual(scenario1_action2.executed_times,
                         scenario2_action2.executed_times)

    def test_power_cycle_usb_port(self):
        device = usb_device.UsbDevice(
                'v', 'p', 'prod', ['if'], 1, 2, 1)
        # Successive collector calls report two devices, then one, then
        # two again.
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                side_effect=[[device, device], [device], [device, device]])
        # The third argument narrows the device list down to its first
        # entry, so only (1, 2) is expected to be switched off and on.
        action = actions.PowerCycleUsbPort(
                [USB_DEVICE_SPEC], 0, lambda x: [x[0]])
        action.execute(self.context_with_mocks)
        self.usb_port_manager_mock.set_port_power.assert_has_calls(
                [mock.call([(1, 2)], False), mock.call([(1, 2)], True)])

    def test_power_cycle_usb_port_device_does_not_turn_off(self):
        # Return the same device all the time - i.e., it does not turn off.
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[USB_DEVICE])
        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
        with self.assertRaises(actions.TimeoutError):
            action.execute(self.context_with_mocks)

    def test_power_cycle_usb_port_device_does_not_turn_on(self):
        # Two devices are reported at first, then none, and afterwards only
        # one of them - presumably modelling a device that never comes
        # back after the power cycle (confirm against PowerCycleUsbPort).
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                side_effect=[[USB_DEVICE, USB_DEVICE], [], [USB_DEVICE]])
        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
        with self.assertRaises(actions.TimeoutError):
            action.execute(self.context_with_mocks)

    def test_retry_action_success_after_retry(self):
        # RaisesFirstTimeAction fails only on its first execution, so a
        # retry must make the wrapped action succeed.
        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 3, 0)
        action.execute(self.context_with_mocks)

    def test_retry_action_fail_when_no_more_retries(self):
        # With the second argument set to 1 the first failure is expected
        # to propagate as an AssertionError.
        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 1)
        with self.assertRaises(AssertionError):
            action.execute(self.context_with_mocks)

    def test_assert_no_new_crashes(self):
        # The crash detector reports no new crash files.
        action = actions.AssertNoNewCrashes()
        self.crash_detector_mock.get_new_crash_files = mock.Mock(
                return_value=[])
        action.do_execute(self.context_with_mocks)

    def test_assert_no_new_crashes_crash_detected(self):
        # The crash detector reports one new crash file.
        action = actions.AssertNoNewCrashes()
        self.crash_detector_mock.get_new_crash_files = mock.Mock(
                return_value=['/a/new/crash/file'])
        with self.assertRaises(AssertionError):
            action.do_execute(self.context_with_mocks)
190
191
class FakeCollector(object):
    """
    Stand-in file contents collector that always serves canned contents.
    """

    def __init__(self, contents):
        """
        @param contents: The value handed back for every requested path.
        """
        self.contents = contents

    def collect_file_contents(self, path):
        """
        Returns the canned contents, regardless of the requested path.

        @param path: Ignored.

        @returns The contents passed to the constructor.
        """
        del path  # Intentionally unused; every path yields the same data.
        return self.contents
198
class DummyAction(actions.Action):
    """
    Action that does nothing except count how often it has been executed.
    """

    def __init__(self):
        # Number of do_execute() calls seen so far.
        self.executed_times = 0

    def do_execute(self, context):
        """
        Records one more execution.

        @param context: Ignored.
        """
        self.executed_times = self.executed_times + 1
205
class RaisesFirstTimeAction(actions.Action):
    """
    Action that raises AssertionError the first time it is executed and
    does nothing on every later execution.
    """

    def __init__(self):
        # Flipped to True by the first do_execute() call.
        self.executed = False

    def do_execute(self, context):
        """
        Fails on the first call only.

        @param context: Ignored.

        @raises AssertionError: On the first call.
        """
        if self.executed:
            return
        # Mark as executed before raising so that a retry succeeds.
        self.executed = True
        raise AssertionError()
214
215