1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5
6import json
7import os
8import time
9
10import selenium
11
12from autotest_lib.client.bin import utils
13from extension_pages import e2e_test_utils
14from extension_pages import options
15
16
17class TestUtils(object):
18    """Contains all the helper functions for Chrome mirroring automation."""
19
20    short_wait_secs = 3
21    step_timeout_secs = 60
22    cpu_fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq',
23                  'softirq', 'steal', 'guest', 'guest_nice']
24    cpu_idle_fields = ['idle', 'iowait']
25
26    def __init__(self):
27        """Constructor"""
28
29
30    def set_mirroring_options(self, driver, extension_id, settings):
31        """Apply all the settings given by the user to the option page.
32
33        @param driver: The chromedriver instance of the test
34        @param extension_id: The id of the Cast extension
35        @param settings: The settings and information about the test
36        """
37        options_page = options.OptionsPage(driver, extension_id)
38        options_page.open_hidden_options_menu()
39        time.sleep(self.short_wait_secs)
40        for key in settings.keys():
41            options_page.set_value(key, settings[key])
42
43
44    def start_v2_mirroring_test_utils(
45            self, driver, extension_id, receiver_ip, url, fullscreen,
46            udp_proxy_server=None, network_profile=None):
47        """Use test util page to start mirroring session on specific device.
48
49        @param driver: The chromedriver instance
50        @param extension_id: The id of the Cast extension
51        @param receiver_ip: The ip of the Eureka dongle to launch the activity
52        @param url: The URL to navigate to
53        @param fullscreen: click the fullscreen button or not
54        @param udp_proxy_server: the address of udp proxy server,
55            it should be a http address, http://<ip>:<port>
56        @param network_profile: the network profile,
57            it should be one of wifi, bad and evil.
58        @return True if the function finishes
59        """
60        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
61                driver, extension_id)
62        time.sleep(self.short_wait_secs)
63        tab_handles = driver.window_handles
64        e2e_test_utils_page.receiver_ip_or_name_v2_text_box().set_value(
65                receiver_ip)
66        e2e_test_utils_page.url_to_open_v2_text_box().set_value(url)
67        if udp_proxy_server:
68            e2e_test_utils_page.udp_proxy_server_text_box().set_value(
69                    udp_proxy_server)
70        if network_profile:
71            e2e_test_utils_page.network_profile_text_box().set_value(
72                    network_profile)
73        time.sleep(self.short_wait_secs)
74        e2e_test_utils_page.open_then_mirror_v2_button().click()
75        time.sleep(self.short_wait_secs)
76        all_handles = driver.window_handles
77        video_handle = [x for x in all_handles if x not in tab_handles].pop()
78        driver.switch_to_window(video_handle)
79        self.navigate_to_test_url(driver, url, fullscreen)
80        return True
81
82
83    def stop_v2_mirroring_test_utils(self, driver, extension_id):
84      """Use test util page to stop a mirroring session on a specific device.
85
86      @param driver: The chromedriver instance
87      @param extension_id: The id of the Cast extension
88      @param activity_id: The id of the mirroring activity
89      """
90      e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(driver,
91                                                            extension_id)
92      e2e_test_utils_page.go_to_page()
93      time.sleep(self.short_wait_secs)
94      e2e_test_utils_page.stop_v2_mirroring_button().click()
95
96
97    def start_v2_mirroring_sdk(self, driver, device_ip, url, extension_id):
98        """Use SDK to start a mirroring session on a specific device.
99
100        @param driver: The chromedriver instance
101        @param device_ip: The IP of the Eureka device
102        @param url: The URL to navigate to
103        @param extension_id: The id of the Cast extension
104        @return True if the function finishes
105        @raise RuntimeError for timeouts
106        """
107        self.set_auto_testing_ip(driver, extension_id, device_ip)
108        self.nagviate(driver, url, False)
109        time.sleep(self.short_wait_secs)
110        driver.execute_script('loadScript()')
111        self._wait_for_result(
112                lambda: driver.execute_script('return isSuccessful'),
113                'Timeout when initiating mirroring... ...')
114        driver.execute_script('startV2Mirroring()')
115        self._wait_for_result(
116                lambda: driver.execute_script('return isSuccessful'),
117                'Timeout when triggering mirroring... ...')
118        return True
119
120
121    def stop_v2_mirroring_sdk(self, driver, activity_id=None):
122        """Use SDK to stop the mirroring activity in Chrome.
123
124        @param driver: The chromedriver instance
125        @param activity_id: The id of the mirroring activity
126        @raise RuntimeError for timeouts
127        """
128        driver.execute_script('stopV2Mirroring()')
129        self._wait_for_result(
130                lambda: driver.execute_script('return isSuccessful'),
131                self.step_timeout_secs)
132
133
134    def set_auto_testing_ip(self, driver, extension_id, device_ip):
135        """Set the auto testing IP on the extension page.
136
137        @param driver: The chromedriver instance
138        @param extension_id: The id of the Cast extension
139        @param device_ip: The IP of the device to test against
140        """
141        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
142                driver, extension_id)
143        e2e_test_utils_page.execute_script(
144                'localStorage["AutoTestingIp"] = "%s";' % device_ip)
145
146
147    def upload_v2_mirroring_logs(self, driver, extension_id):
148        """Upload v2 mirroring logs for the latest mirroring session.
149
150        @param driver: The chromedriver instance of the browser
151        @param extension_id: The extension ID of the Cast extension
152        @return The report id in crash staging server.
153        @raises RuntimeError if an error occurred during uploading
154        """
155        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
156                driver, extension_id)
157        e2e_test_utils_page.go_to_page()
158        time.sleep(self.short_wait_secs)
159        e2e_test_utils_page.upload_v2_mirroring_logs_button().click()
160        report_id = self._wait_for_result(
161            e2e_test_utils_page.v2_mirroring_logs_scroll_box().get_value,
162            'Failed to get v2 mirroring logs')
163        if 'Failed to upload logs' in report_id:
164          raise RuntimeError('Failed to get v2 mirroring logs')
165        return report_id
166
167
168    def get_chrome_version(self, driver):
169        """Return the Chrome version that is being used for running test.
170
171        @param driver: The chromedriver instance
172        @return The Chrome version
173        """
174        get_chrome_version_js = 'return window.navigator.appVersion;'
175        app_version = driver.execute_script(get_chrome_version_js)
176        for item in app_version.split():
177           if 'Chrome/' in item:
178              return item.split('/')[1]
179        return None
180
181
182    def get_chrome_revision(self, driver):
183        """Return Chrome revision number that is being used for running test.
184
185        @param driver: The chromedriver instance
186        @return The Chrome revision number
187        """
188        get_chrome_revision_js = ('return document.getElementById("version").'
189                                  'getElementsByTagName("span")[2].innerHTML;')
190        driver.get('chrome://version')
191        return driver.execute_script(get_chrome_revision_js)
192
193
194    def get_extension_id_from_flag(self, extra_flags):
195        """Gets the extension ID based on the whitelisted extension id flag.
196
197        @param extra_flags: A string which contains all the extra chrome flags
198        @return The ID of the extension. Return None if nothing is found.
199        """
200        extra_flags_list = extra_flags.split()
201        for flag in extra_flags_list:
202            if 'whitelisted-extension-id=' in flag:
203                return flag.split('=')[1]
204        return None
205
206
207    def navigate_to_test_url(self, driver, url, fullscreen):
208        """Navigate to a given URL. Click fullscreen button if needed.
209
210        @param driver: The chromedriver instance
211        @param url: The URL of the site to navigate to
212        @param fullscreen: True and the video will play in full screen mode.
213                           Otherwise, set to False
214        """
215        driver.get(url)
216        driver.refresh()
217        if fullscreen:
218          self.request_full_screen(driver)
219
220
221    def request_full_screen(self, driver):
222        """Request full screen.
223
224        @param driver: The chromedriver instance
225        """
226        try:
227            time.sleep(self.short_wait_secs)
228            driver.find_element_by_id('fsbutton').click()
229        except selenium.common.exceptions.NoSuchElementException as error_message:
230            print 'Full screen button is not found. ' + str(error_message)
231
232
233    def set_focus_tab(self, driver, tab_handle):
234      """Set the focus on a tab.
235
236      @param driver: The chromedriver instance
237      @param tab_handle: The chrome driver handle of the tab
238      """
239      driver.switch_to_window(tab_handle)
240      driver.get_screenshot_as_base64()
241
242
243    def block_setup_dialog(self, driver, extension_id):
244        """Tab cast through the extension.
245
246        @param driver: A chromedriver instance that has the extension loaded.
247        @param extension_id: Id of the extension to use.
248        """
249        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
250                driver, extension_id)
251        e2e_test_utils_page.go_to_page()
252        time.sleep(self.short_wait_secs)
253        driver.execute_script(
254            'localStorage["blockChromekeySetupAutoLaunchOnInstall"] = "true"')
255
256
257    def close_popup_tabs(self, driver):
258        """Close any popup windows the extension might open by default.
259
260        Since we're going to handle the extension ourselves all we need is
261        the main browser window with a single tab. The safest way to handle
262        the popup however, is to close the currently active tab, so we don't
263        mess with chromedrivers ui debugger.
264
265        @param driver: Chromedriver instance.
266        @raises Exception If you close the tab associated with
267            the ui debugger.
268        """
269        # TODO: There are several, albeit hacky ways, to handle this popup
270        # that might need to change with different versions of the extension
271        # until the core issue is resolved. See crbug.com/338399.
272        current_tab_handle = driver.current_window_handle
273        for handle in driver.window_handles:
274            if current_tab_handle != handle:
275                try:
276                    time.sleep(self.short_wait_secs)
277                    driver.switch_to_window(handle)
278                    driver.close()
279                except:
280                    pass
281        driver.switch_to_window(current_tab_handle)
282
283
284    def output_dict_to_file(self, dictionary, file_name,
285                            path=None, sort_keys=False):
286        """Output a dictionary into a file.
287
288        @param dictionary: The dictionary to be output as JSON
289        @param file_name: The name of the file that is being output
290        @param path: The path of the file. The default is None
291        @param sort_keys: Sort dictionary by keys when output. False by default
292        """
293        if path is None:
294            path = os.path.abspath(os.path.dirname(__file__))
295        # if json file exists, read the existing one and append to it
296        json_file = os.path.join(path, file_name)
297        if os.path.isfile(json_file) and dictionary:
298            with open(json_file, 'r') as existing_json_data:
299                json_data = json.load(existing_json_data)
300            dictionary = dict(json_data.items() + dictionary.items())
301        output_json = json.dumps(dictionary, sort_keys=sort_keys)
302        with open(json_file, 'w') as file_handler:
303            file_handler.write(output_json)
304
305
306    def compute_cpu_utilization(self, cpu_dict):
307        """Generate the upper/lower bound and the average CPU consumption.
308
309        @param cpu_dict: The dictionary that contains CPU usage every sec.
310        @returns A dict that contains upper/lower bound and average cpu usage.
311        """
312        cpu_bound = {}
313        cpu_usage = sorted(cpu_dict.values())
314        cpu_bound['lower_bound'] = (
315                '%.2f' % cpu_usage[int(len(cpu_usage) * 10.0 / 100.0)])
316        cpu_bound['upper_bound'] = (
317                '%.2f' % cpu_usage[int(len(cpu_usage) * 90.0 / 100.0)])
318        cpu_bound['average'] = '%.2f' % (sum(cpu_usage) / float(len(cpu_usage)))
319        return cpu_bound
320
321
322    def cpu_usage_interval(self, duration, interval=1):
323        """Get the CPU usage over a period of time based on interval.
324
325        @param duration: The duration of getting the CPU usage.
326        @param interval: The interval to check the CPU usage. Default is 1 sec.
327        @return A dict that contains CPU usage over different time intervals.
328        """
329        current_time = 0
330        cpu_usage = {}
331        while current_time < duration:
332            pre_times = self._get_system_times()
333            time.sleep(interval)
334            post_times = self._get_system_times()
335            cpu_usage[current_time] = self._get_avg_cpu_usage(
336                    pre_times, post_times)
337            current_time += interval
338        return cpu_usage
339
340
341    def _get_avg_cpu_usage(self, pre_times, post_times):
342        """Calculate the average CPU usage of two different periods of time.
343
344        @param pre_times: The CPU usage information of the start point.
345        @param post_times: The CPU usage information of the end point.
346        @return Average CPU usage over a time period.
347        """
348        diff_times = {}
349        for field in self.cpu_fields:
350            diff_times[field] = post_times[field] - pre_times[field]
351
352        idle_time = sum(diff_times[field] for field in self.cpu_idle_fields)
353        total_time = sum(diff_times[field] for field in self.cpu_fields)
354        return float(total_time - idle_time) / total_time * 100.0
355
356
357    def _get_system_times(self):
358        """Get the CPU information from the system times.
359
360        @return An list with CPU usage of different processes.
361        """
362        proc_stat = utils.read_file('/proc/stat')
363        for line in proc_stat.split('\n'):
364            if line.startswith('cpu '):
365                times = line[4:].strip().split(' ')
366                times = [int(jiffies) for jiffies in times]
367                return dict(zip(self.cpu_fields, times))
368
369
370    def _wait_for_result(self, get_result, error_message):
371        """Wait for the result.
372
373        @param get_result: the function to get result.
374        @param error_message: the error message in the exception
375            if it is failed to get result.
376        @return The result.
377        @raises RuntimeError if it is failed to get result within
378            self.step_timeout_secs.
379        """
380        start = time.time()
381        while (((time.time() - start) < self.step_timeout_secs)
382               and not get_result()):
383            time.sleep(self.step_timeout_secs/10.0)
384        if not get_result():
385            raise RuntimeError(error_message)
386        return get_result()
387