1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import time
7import re
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices
11from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
12from autotest_lib.server.cros.cfm import cfm_base_test
13
14class enterprise_CFM_PTZStress(cfm_base_test.CfmBaseTest):
15    """
16    Executes the following tests on CFM devices:
17
18       1. Enroll the device and join a meeting.
19       2. During meeting PTZ the camera according to the control file.
20    Verify the following functionalities:
21
22       1. Camera is enumerated.
23       2. Verify PTZ signal is sent to the camera.
24    """
25    version = 1
26
27    def check_camera_enumeration(self, camera_name):
28        """
29        Checks if there is any camera connected to the DUT.
30            If so, return the USB bus number the camera is on.
31        @param camera_name the camera's name under test
32        @returns The USB bus number the camera is on, if there is only
33            1 camera connected, false otherwise.
34        """
35        collector = usb_device_collector.UsbDeviceCollector(self._host)
36        camera = collector.get_devices_by_spec(camera_name)
37        if len(camera) == 1:
38            bus_number = camera[0].bus
39            logging.info('Camera enumerated: {} on bus {}'.
40                format(camera_name,bus_number))
41            return bus_number
42        raise error.TestFail('Camera failed to enumerate')
43
44
45    def dump_usbmon_traffic(self, bus, usb_trace_path):
46        """
47        Start usbmon with specified bus and dump the traffic to file
48        @param bus bus number the camera is on
49        @param usb_trace_path the USB traces file path
50        """
51        cmd = ('cat /sys/kernel/debug/usb/usbmon/{}u > {} &'.
52            format(bus, usb_trace_path))
53        try:
54            self._host.run(cmd, ignore_status = True)
55        except Exception as e:
56            logging.info('Fail to run cmd {}. Error: {}'.
57                format(cmd, str(e)))
58        logging.info('Usbmon traffic dumped to {}'.format(usb_trace_path))
59
60
61    def check_usbmon_traffic(self, usb_trace_path):
62        """
63        Check traces
64        @param usb_trace_path the USB traces file path
65        """
66        cmd = ('cat {} & '.format(usb_trace_path))
67        try:
68            traces = self._host.run_output(cmd, ignore_status = True)
69            if re.search('C Ii', traces) and re.search('S Ii', traces):
70                logging.info('PTZ signal verified')
71            else:
72                raise error.TestFail('PTZ signal did not go through')
73        except Exception as e:
74            logging.info('Fail to run cmd {}. Error: {}'.format(cmd, str(e)))
75
76
77    def clean_usb_traces_file(self, usb_trace_path):
78        """
79        Clean traces file
80        @param usb_trace_path the USB traces file path
81        """
82        cmd = ('rm {}'.format(usb_trace_path))
83        try:
84            self._host.run(cmd, ignore_status = True)
85        except Exception as e:
86            raise error.TestFail('Fail to run cmd {}. Error: {}'.format(cmd, str(e)))
87        logging.info('Cleaned up traces in {}'.format(usb_trace_path))
88
89
90    def run_once(self, host, test_config, ptz_motion_sequence):
91        """Runs the test."""
92        self.cfm_facade.wait_for_meetings_telemetry_commands()
93        for loop_no in xrange(1, test_config['repeat'] + 1):
94            logging.info('Test Loop : {}'.format(loop_no))
95            bus = self.check_camera_enumeration(test_config['camera'])
96            self.cfm_facade.start_meeting_session()
97            self.dump_usbmon_traffic(bus, test_config['usb_trace_path'])
98            for motion in ptz_motion_sequence:
99                self.cfm_facade.move_camera(motion)
100                time.sleep(test_config['motion_duration'])
101            self.check_usbmon_traffic(test_config['usb_trace_path'])
102            self.cfm_facade.end_meeting_session()
103            self.clean_usb_traces_file(test_config['usb_trace_path'])
104