# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


import json
import os
import time
import urllib2

import selenium
# `import selenium` alone does not load the `common.exceptions` subpackage;
# import it explicitly so request_full_screen() does not depend on some other
# module having imported it first.
import selenium.common.exceptions

from autotest_lib.client.bin import utils
from extension_pages import e2e_test_utils
from extension_pages import options


class TestUtils(object):
    """Contains all the helper functions for Chrome mirroring automation."""

    # Base delay (seconds) used for the fixed sleeps in this class.
    short_wait_secs = 3
    step_timeout_secs = 60
    # URL template of the receiver info page; formatted with the receiver IP.
    device_info = 'http://%s:8008/setup/eureka_info'
    # Names given, in order, to the counters on the aggregate 'cpu' line of
    # /proc/stat (see _get_system_times).
    cpu_fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq',
                  'softirq', 'steal', 'guest', 'guest_nice']
    # Subset of cpu_fields that is counted as idle time.
    cpu_idle_fields = ['idle', 'iowait']

    def __init__(self):
        """Constructor"""


    def _connect_extension(self, driver, extension_id):
        """Connects to extension.

        Navigates to the extension's e2e test utils page and opens a runtime
        connection to the extension from there.

        @param driver: The webdriver instance.
        @param extension_id: The id of Media Router extension.
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.go_to_page()
        e2e_test_utils_page.execute_script(
                'chrome.runtime.connect("%s")' % extension_id)


    def start_mirroring_media_router(
            self, driver, extension_id, receiver_ip, url, fullscreen,
            udp_proxy_server=None, network_profile=None):
        """Starts a mirroring session using Media Router extension.

        The device name is looked up from the receiver, the extension's
        e2eTestService is started, and a tab mirroring session is requested.
        The driver is then switched to the newly opened window and pointed at
        the test URL.

        @param driver: The webdriver instance.
        @param extension_id: The id of Media Router extension.
        @param receiver_ip: The IP of the receiver to launch the activity.
        @param url: The URL to navigate to.
        @param fullscreen: Click the fullscreen button or not.
        @param udp_proxy_server: Address of udp proxy server,
            in http address format, http://<ip>:<port>.
        @param network_profile: Network profile to use.
        @return True if the function finishes.
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        # Remember the windows that exist now so the one opened by the
        # mirroring request can be identified afterwards.
        tab_handles = driver.window_handles

        device_name = self._get_device_name(receiver_ip)
        self._connect_extension(driver, extension_id)
        e2e_test_utils_page.execute_script(
                'chrome.extension.getBackgroundPage().e2eTestService.start()')
        time.sleep(self.short_wait_secs * 3)  # Wait for discovery

        self._connect_extension(driver, extension_id)
        # The proxy variant is only used when both optional values are given.
        if network_profile and udp_proxy_server:
            e2e_test_utils_page.execute_script(
                    'chrome.extension.getBackgroundPage().e2eTestService.'
                    'mirrorTabViaCastStreaming("%s", "%s", "%s", "%s")' % (
                            device_name, url,
                            udp_proxy_server, network_profile))
        else:
            e2e_test_utils_page.execute_script(
                    'chrome.extension.getBackgroundPage().e2eTestService.'
                    'mirrorTabViaCastStreaming("%s", "%s")' % (
                            device_name, url))
        time.sleep(self.short_wait_secs)  # Wait for mirroring to start
        all_handles = driver.window_handles
        # Raises IndexError if no new window appeared.
        video_handle = [x for x in all_handles if x not in tab_handles].pop()
        driver.switch_to_window(video_handle)
        self.navigate_to_test_url(driver, url, fullscreen)
        return True


    def stop_mirroring_media_router(self, driver, extension_id):
        """Stops a mirroring session on a device using Media Router extension.

        @param driver: The webdriver instance.
        @param extension_id: The id of Media Router extension.
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.go_to_page()
        self._connect_extension(driver, extension_id)
        e2e_test_utils_page.execute_script(
                ('chrome.extension.getBackgroundPage()'
                 '.e2eTestService.stopMirroring()'))
        # Wait for receiver to go back to home screen
        time.sleep(self.short_wait_secs)


    def enable_automatic_send_usage(self, driver):
        """Enables automatic send usage statistics and crash reports in Chrome.

        @param driver: The webdriver instance of the browser.
        """
        driver.get('chrome://settings-frame')
        driver.find_element_by_id('advanced-settings-expander').click()
        usage_check_box = driver.find_element_by_id('metrics-reporting-enabled')
        utils.poll_for_condition(
                usage_check_box.is_displayed, timeout=10, sleep_interval=1,
                desc='Wait for automatic send usage checkbox.')
        # Only click when currently unchecked, so the call never disables it.
        if not usage_check_box.is_selected():
            usage_check_box.click()


    def set_local_storage_mr_mirroring(
            self, driver, extension_id, frame_rate=60):
        """Enables extension fine log in Chrome for debugging.

        Also stores a max frame rate override for mirroring, then reloads the
        extension.

        @param driver: The webdriver instance of the browser.
        @param extension_id: The extension ID of Media Router extension.
        @param frame_rate: The capture frame rate of Media Router extension.
        """
        scripts = ('localStorage["debug.console"] = true;'
                   'localStorage["debug.logs"] = "fine";')
        scripts += ('localStorage["mr.mirror.Settings.Overrides"] ='
                    ' \'{"maxFrameRate":%s}\';') % str(frame_rate)
        self._execute_script_reload_extension(
                driver, extension_id, scripts)


    def upload_mirroring_logs_media_router(self, driver, extension_id):
        """Uploads MR mirroring logs for the latest mirroring session.

        Polls the extension page's localStorage every short_wait_secs seconds,
        for up to 90 seconds, until a log id shows up.

        @param driver: The webdriver instance of the browser.
        @param extension_id: The extension ID of Media Router extension.
        @return The report id in crash staging server or
                empty if there is nothing.
        """
        wait_time = 0
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.go_to_page()
        while wait_time < 90:
            report_id = e2e_test_utils_page.execute_script(
                    ('return localStorage["e2eTestService'
                     '.castStreamingMirrorLogId"]'))
            if report_id:
                # Return right away; no need to sleep once the id is there.
                return report_id
            time.sleep(self.short_wait_secs)
            wait_time += self.short_wait_secs
        return ''


    def get_chrome_version(self, driver):
        """Returns the Chrome version that is being used for running test.

        @param driver: The chromedriver instance.
        @return The Chrome version, or None if the browser's appVersion string
                has no 'Chrome/' token.
        """
        get_chrome_version_js = 'return window.navigator.appVersion;'
        app_version = driver.execute_script(get_chrome_version_js)
        for item in app_version.split():
            if 'Chrome/' in item:
                return item.split('/')[1]
        return None


    def get_chrome_revision(self, driver):
        """Returns Chrome revision number that is being used for running test.

        Navigates the driver to chrome://version and reads the third <span>
        inside the element with id "version".

        @param driver: The chromedriver instance.
        @return The Chrome revision number.
        """
        get_chrome_revision_js = ('return document.getElementById("version").'
                                  'getElementsByTagName("span")[2].innerHTML;')
        driver.get('chrome://version')
        return driver.execute_script(get_chrome_revision_js)


    def get_extension_id_from_flag(self, extra_flags):
        """Gets the extension ID based on the whitelisted extension id flag.

        @param extra_flags: A string which contains all the extra chrome flags.
        @return The ID of the extension. Return None if nothing is found.
        """
        extra_flags_list = extra_flags.split()
        for flag in extra_flags_list:
            if 'whitelisted-extension-id=' in flag:
                return flag.split('=')[1]
        return None


    def navigate_to_test_url(self, driver, url, fullscreen):
        """Navigates to a given URL. Click fullscreen button if needed.

        @param driver: The chromedriver instance.
        @param url: The URL of the site to navigate to.
        @param fullscreen: True and the video will play in full screen mode.
                           Otherwise, set to False.
        """
        driver.get(url)
        driver.refresh()
        if fullscreen:
            self.request_full_screen(driver)


    def request_full_screen(self, driver):
        """Requests full screen.

        Clicks the element with id 'fsbutton'. A missing button is reported
        on stdout and otherwise ignored.

        @param driver: The chromedriver instance.
        """
        try:
            driver.find_element_by_id('fsbutton').click()
        except selenium.common.exceptions.NoSuchElementException as err_msg:
            # Single-argument parenthesised form prints identically under the
            # Python 2 print statement.
            print('Full screen button is not found. ' + str(err_msg))


    def set_focus_tab(self, driver, tab_handle):
        """Sets the focus on a tab.

        @param driver: The chromedriver instance.
        @param tab_handle: The chrome driver handle of the tab.
        """
        driver.switch_to_window(tab_handle)
        # The screenshot result is discarded.
        # NOTE(review): presumably taken only to bring the tab to the
        # foreground - confirm.
        driver.get_screenshot_as_base64()


    def output_dict_to_file(self, dictionary, file_name,
                            path=None, sort_keys=False):
        """Outputs a dictionary into a file.

        If the file already exists and dictionary is non-empty, the existing
        JSON content is loaded and merged first; keys in dictionary win over
        keys already in the file. An empty dictionary skips the merge, so the
        file is overwritten with the empty value.

        @param dictionary: The dictionary to be output as JSON.
        @param file_name: The name of the file that is being output.
        @param path: The path of the file. The default is None, which means
                     the directory containing this module.
        @param sort_keys: Sort dictionary by keys when output. False by default.
        """
        if path is None:
            path = os.path.abspath(os.path.dirname(__file__))
        # if json file exists, read the existing one and append to it
        json_file = os.path.join(path, file_name)
        if os.path.isfile(json_file) and dictionary:
            with open(json_file, 'r') as existing_json_data:
                json_data = json.load(existing_json_data)
            merged = dict(json_data)
            merged.update(dictionary)
            dictionary = merged
        output_json = json.dumps(dictionary, sort_keys=sort_keys)
        with open(json_file, 'w') as file_handler:
            file_handler.write(output_json)


    def compute_cpu_utilization(self, cpu_dict):
        """Generates the upper/lower bound and the average CPU consumption.

        The bounds are the samples found at 10% and 90% of the way through
        the sorted sample list. An empty cpu_dict raises IndexError.

        @param cpu_dict: The dictionary that contains CPU usage every sec.
        @returns A dict that contains upper/lower bound and average cpu usage,
                 each formatted as a string with two decimals.
        """
        cpu_bound = {}
        cpu_usage = sorted(cpu_dict.values())
        cpu_bound['lower_bound'] = (
                '%.2f' % cpu_usage[int(len(cpu_usage) * 10.0 / 100.0)])
        cpu_bound['upper_bound'] = (
                '%.2f' % cpu_usage[int(len(cpu_usage) * 90.0 / 100.0)])
        cpu_bound['average'] = '%.2f' % (sum(cpu_usage) / float(len(cpu_usage)))
        return cpu_bound


    def cpu_usage_interval(self, duration, interval=1):
        """Gets the CPU usage over a period of time based on interval.

        @param duration: The duration of getting the CPU usage.
        @param interval: The interval to check the CPU usage. Default is 1 sec.
        @return A dict that contains CPU usage over different time intervals,
                keyed by the elapsed time at the start of each sample.
        @raises ValueError: if samples are requested with a non-positive
                interval.
        """
        if duration > 0 and interval <= 0:
            # The loop below advances by `interval`; it could never finish.
            raise ValueError('interval must be positive, got %r' % (interval,))
        current_time = 0
        cpu_usage = {}
        while current_time < duration:
            pre_times = self._get_system_times()
            time.sleep(interval)
            post_times = self._get_system_times()
            cpu_usage[current_time] = self._get_avg_cpu_usage(
                    pre_times, post_times)
            current_time += interval
        return cpu_usage


    def _execute_script_reload_extension(self, driver, extension_id, script):
        """Executes javascript in the extension and reload it afterwards.

        @param driver: The chromedriver instance.
        @param extension_id: The id of Media Router extension.
        @param script: JavaScript to be executed.
        """
        script += 'chrome.runtime.reload();'
        # NOTE(review): current_handle is never used afterwards and the driver
        # is left on the new tab - confirm whether switching back was intended.
        current_handle = driver.current_window_handle
        new_tab_handle = self._open_new_tab(driver)
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.execute_script(script)
        driver.switch_to_window(new_tab_handle)


    def _get_avg_cpu_usage(self, pre_times, post_times):
        """Calculates the average CPU usage of two different periods of time.

        @param pre_times: The CPU usage information of the start point.
        @param post_times: The CPU usage information of the end point.
        @return Average CPU usage over a time period, as a percentage.
                0.0 if no time elapsed between the two samples.
        """
        diff_times = {}
        for field in self.cpu_fields:
            diff_times[field] = post_times[field] - pre_times[field]

        idle_time = sum(diff_times[field] for field in self.cpu_idle_fields)
        total_time = sum(diff_times[field] for field in self.cpu_fields)
        if not total_time:
            # Identical samples: avoid dividing by zero.
            return 0.0
        return float(total_time - idle_time) / total_time * 100.0


    def _get_device_name(self, device_ip):
        """Gets all the Chromecast information through eureka_info page.

        @param device_ip: string, the IP address the device.
        @return Name of the device, or None if the response has no 'name' key.
        """
        connection = urllib2.urlopen(self.device_info % device_ip)
        try:
            response = connection.read()
        finally:
            # Release the HTTP connection even if the read fails.
            connection.close()
        return json.loads(response).get('name')


    def _get_system_times(self):
        """Gets the CPU information from the system times.

        Reads /proc/stat and parses the aggregate 'cpu ' line.

        @return A dict mapping each name in cpu_fields to its counter value.
                None is returned implicitly if no 'cpu ' line is found.
        """
        proc_stat = utils.read_file('/proc/stat')
        for line in proc_stat.split('\n'):
            if line.startswith('cpu '):
                # split() with no argument tolerates any run of whitespace
                # between the counters.
                times = [int(jiffies) for jiffies in line[4:].split()]
                return dict(zip(self.cpu_fields, times))


    def _open_new_tab(self, driver):
        """Opens a new tab in Chrome.

        @param driver: The webdriver instance of the browser.
        @return Handle of the new tab.
        """
        current_handles = driver.window_handles
        driver.execute_script('window.open("chrome://newtab")')
        utils.poll_for_condition(
                lambda: len(driver.window_handles) > len(current_handles),
                timeout=10, sleep_interval=1, desc='Wait for new tab to open.')
        return list(set(driver.window_handles) - set(current_handles))[0]