import mock
import unittest

from autotest_lib.client.common_lib.cros.cfm.usb import usb_device
from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_spec
from autotest_lib.server.cros.cfm.configurable_test import actions
from autotest_lib.server.cros.cfm.configurable_test import action_context
from autotest_lib.server.cros.cfm.configurable_test import scenario

# Constants to use in case the actual values are irrelevant.
USB_DEVICE_SPEC = usb_device_spec.UsbDeviceSpec(
        'vid', 'pid', 'product', ['iface'])

USB_DEVICE = usb_device.UsbDevice('v', 'p', 'prod', ['if'], 1, 2, 1)


# This is a test module, so per-test docstrings are not required.
# pylint: disable=missing-docstring
class TestActions(unittest.TestCase):
    """
    Tests for the available actions for configurable CFM tests to run.
    """

    def setUp(self):
        # Every collaborator of the action context is replaced by a mock so
        # that each test can inspect the calls an action makes.
        self.host_mock = mock.MagicMock()
        self.cfm_facade_mock = mock.MagicMock()
        self.usb_device_collector_mock = mock.MagicMock()
        self.usb_port_manager_mock = mock.MagicMock()
        self.crash_detector_mock = mock.MagicMock()
        self.context_with_mocks = action_context.ActionContext(
                host=self.host_mock,
                cfm_facade=self.cfm_facade_mock,
                usb_device_collector=self.usb_device_collector_mock,
                usb_port_manager=self.usb_port_manager_mock,
                crash_detector=self.crash_detector_mock)

    def test_assert_file_does_not_contain_no_match(self):
        # Neither pattern occurs in the fake contents, so nothing is raised.
        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
        context = action_context.ActionContext(
                file_contents_collector=FakeCollector('abc\ndef'))
        action.execute(context)

    def test_assert_file_does_not_contain_match(self):
        # 'WW' occurs on the second line of the fake contents.
        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
        context = action_context.ActionContext(
                file_contents_collector=FakeCollector('abc\naWWd'))
        with self.assertRaises(AssertionError):
            action.execute(context)

    def test_assert_file_does_not_contain_regex_match(self):
        # The second pattern only matches 'WWWQQ' when treated as a regex.
        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'W{3}Q+'])
        context = action_context.ActionContext(
                file_contents_collector=FakeCollector('abc\naWWWQQd'))
        with self.assertRaises(AssertionError):
            action.execute(context)

    def test_reboot_dut_no_restart(self):
        action = actions.RebootDut()
        action.execute(self.context_with_mocks)
        self.host_mock.reboot.assert_called_once_with()
        # Without the restart flag the CFM facade must be left untouched.
        self.assertFalse(self.cfm_facade_mock.method_calls)

    def test_reboot_dut_with_restart(self):
        action = actions.RebootDut(restart_chrome_for_cfm=True)
        action.execute(self.context_with_mocks)
        self.host_mock.reboot.assert_called_once_with()
        facade = self.cfm_facade_mock
        facade.restart_chrome_for_cfm.assert_called_once_with()
        facade.wait_for_meetings_telemetry_commands.assert_called_once_with()

    def test_assert_usb_device_collector(self):
        # The predicate accepts anything, so the spec values are irrelevant.
        action = actions.AssertUsbDevices([USB_DEVICE_SPEC], lambda x: True)
        action.execute(self.context_with_mocks)

    def test_assert_usb_device_collector_matching_predicate(self):
        spec = usb_device_spec.UsbDeviceSpec(
                'vid', 'pid', 'product', ['iface'])
        device = usb_device.UsbDevice(
                'v', 'p', 'prod', ['if'], 1, 2, 1)
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[device])
        action = actions.AssertUsbDevices(
                [spec], lambda x: x[0].product_id == 'p')
        action.execute(self.context_with_mocks)

    def test_assert_usb_device_collector_non_matching_predicate(self):
        spec = usb_device_spec.UsbDeviceSpec(
                'vid', 'pid', 'product', ['iface'])
        device = usb_device.UsbDevice(
                'v', 'p', 'prod', ['if'], 1, 2, 1)
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[device])
        # The device's product id is 'p', so this predicate is false.
        action = actions.AssertUsbDevices(
                [spec], lambda x: x[0].product_id == 'r')
        with self.assertRaises(AssertionError):
            action.execute(self.context_with_mocks)

    def test_assert_usb_device_collector_default_predicate(self):
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[USB_DEVICE])  # Default checks list is of size 1
        action = actions.AssertUsbDevices([USB_DEVICE_SPEC])
        action.execute(self.context_with_mocks)

    def test_select_scenario_at_random(self):
        dummy_action1 = DummyAction()
        dummy_action2 = DummyAction()
        scenarios = [scenario.Scenario(dummy_action1),
                     scenario.Scenario(dummy_action2)]
        action = actions.SelectScenarioAtRandom(scenarios, 10)
        action.execute(self.context_with_mocks)
        # Assert that our actions were executed the expected number of times.
        total_executes = (dummy_action1.executed_times
                          + dummy_action2.executed_times)
        self.assertEqual(10, total_executes)

    def test_select_scenario_at_random_str_contains_seed(self):
        action = actions.SelectScenarioAtRandom([], 10, 123)
        # assertIn reports both operands on failure, unlike assertTrue.
        self.assertIn('seed=123', str(action))

    def test_select_scenario_at_random_same_seed_same_actions(self):
        scenario1_action1 = DummyAction()
        scenario1_action2 = DummyAction()
        scenarios1 = [scenario.Scenario(scenario1_action1),
                      scenario.Scenario(scenario1_action2)]
        scenario2_action1 = DummyAction()
        scenario2_action2 = DummyAction()
        scenarios2 = [scenario.Scenario(scenario2_action1),
                      scenario.Scenario(scenario2_action2)]
        # Both selectors get the same run count (100) and the same seed (0).
        action1 = actions.SelectScenarioAtRandom(scenarios1, 100, 0)
        action2 = actions.SelectScenarioAtRandom(scenarios2, 100, 0)
        action1.execute(self.context_with_mocks)
        action2.execute(self.context_with_mocks)
        self.assertEqual(scenario1_action1.executed_times,
                         scenario2_action1.executed_times)
        self.assertEqual(scenario1_action2.executed_times,
                         scenario2_action2.executed_times)

    def test_power_cycle_usb_port(self):
        device = usb_device.UsbDevice(
                'v', 'p', 'prod', ['if'], 1, 2, 1)
        # Successive collector calls see two devices, then one, then two.
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                side_effect=[[device, device], [device], [device, device]])
        action = actions.PowerCycleUsbPort(
                [USB_DEVICE_SPEC], 0, lambda x: [x[0]])
        action.execute(self.context_with_mocks)
        # (1, 2) mirrors the 5th and 6th UsbDevice constructor arguments
        # above; power must be switched off first and then back on.
        self.usb_port_manager_mock.set_port_power.assert_has_calls(
                [mock.call([(1, 2)], False), mock.call([(1, 2)], True)])

    def test_power_cycle_usb_port_device_does_not_turn_off(self):
        # Return the same device all the time - i.e., it does not turn off.
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                return_value=[USB_DEVICE])
        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
        with self.assertRaises(actions.TimeoutError):
            action.execute(self.context_with_mocks)

    def test_power_cycle_usb_port_device_does_not_turn_on(self):
        # Two devices are seen first, then none, then only one of them.
        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
                side_effect=[[USB_DEVICE, USB_DEVICE], [], [USB_DEVICE]])
        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
        with self.assertRaises(actions.TimeoutError):
            action.execute(self.context_with_mocks)

    def test_retry_action_success_after_retry(self):
        # The wrapped action raises only on its first execution.
        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 3, 0)
        action.execute(self.context_with_mocks)

    def test_retry_action_fail_when_no_more_retries(self):
        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 1)
        with self.assertRaises(AssertionError):
            action.execute(self.context_with_mocks)

    def test_assert_no_new_crashes(self):
        action = actions.AssertNoNewCrashes()
        self.crash_detector_mock.get_new_crash_files = mock.Mock(
                return_value=[])
        action.do_execute(self.context_with_mocks)

    def test_assert_no_new_crashes_crash_detected(self):
        action = actions.AssertNoNewCrashes()
        self.crash_detector_mock.get_new_crash_files = mock.Mock(
                return_value=['/a/new/crash/file'])
        with self.assertRaises(AssertionError):
            action.do_execute(self.context_with_mocks)


class FakeCollector(object):
    """File contents collector stub that always returns fixed contents."""

    def __init__(self, contents):
        self.contents = contents

    def collect_file_contents(self, path):
        # The path is ignored; the canned contents are returned for any file.
        return self.contents


class DummyAction(actions.Action):
    """Action that only counts how many times it has been executed."""

    def __init__(self):
        self.executed_times = 0

    def do_execute(self, context):
        self.executed_times += 1


class RaisesFirstTimeAction(actions.Action):
    """Action that raises AssertionError on its first execution only."""

    def __init__(self):
        self.executed = False

    def do_execute(self, context):
        if not self.executed:
            self.executed = True
            raise AssertionError()