firmware_test.py revision 1bacc9661de528d838294d0c517905f4ded7f632
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import ast 6import ctypes 7import logging 8import os 9import re 10import time 11import uuid 12 13from autotest_lib.client.bin import utils 14from autotest_lib.client.common_lib import error 15from autotest_lib.server import autotest 16from autotest_lib.server import test 17from autotest_lib.server.cros import vboot_constants as vboot 18from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig 19from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy 20from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers 21from autotest_lib.server.cros.servo import chrome_ec 22 23 24class ConnectionError(Exception): 25 """Raised on an error of connecting DUT.""" 26 pass 27 28 29class FAFTBase(test.test): 30 """The base class of FAFT classes. 31 32 It launches the FAFTClient on DUT, such that the test can access its 33 firmware functions and interfaces. It also provides some methods to 34 handle the reboot mechanism, in order to ensure FAFTClient is still 35 connected after reboot. 36 """ 37 def initialize(self, host): 38 """Create a FAFTClient object and install the dependency.""" 39 self.servo = host.servo 40 self.servo.initialize_dut() 41 self._client = host 42 self._autotest_client = autotest.Autotest(self._client) 43 self._autotest_client.install() 44 self.faft_client = RPCProxy(host) 45 46 def wait_for_client(self, install_deps=False, timeout=100): 47 """Wait for the client to come back online. 48 49 New remote processes will be launched if their used flags are enabled. 50 51 @param install_deps: If True, install Autotest dependency when ready. 52 @param timeout: Time in seconds to wait for the client SSH daemon to 53 come up. 54 @raise ConnectionError: Failed to connect DUT. 55 """ 56 if not self._client.wait_up(timeout): 57 raise ConnectionError() 58 if install_deps: 59 self._autotest_client.install() 60 # Check the FAFT client is avaiable. 61 self.faft_client.system.is_available() 62 63 def wait_for_client_offline(self, timeout=60, orig_boot_id=None): 64 """Wait for the client to come offline. 65 66 @param timeout: Time in seconds to wait the client to come offline. 67 @param orig_boot_id: A string containing the original boot id. 68 @raise ConnectionError: Failed to connect DUT. 69 """ 70 # When running against panther, we see that sometimes 71 # ping_wait_down() does not work correctly. There needs to 72 # be some investigation to the root cause. 73 # If we sleep for 120s before running get_boot_id(), it 74 # does succeed. But if we change this to ping_wait_down() 75 # there are implications on the wait time when running 76 # commands at the fw screens. 77 if not self._client.ping_wait_down(timeout): 78 if orig_boot_id and self._client.get_boot_id() != orig_boot_id: 79 logging.warn('Reboot done very quickly.') 80 return 81 raise ConnectionError() 82 83 84class FirmwareTest(FAFTBase): 85 """ 86 Base class that sets up helper objects/functions for firmware tests. 87 88 TODO: add documentaion as the FAFT rework progresses. 89 """ 90 version = 1 91 92 # Mapping of partition number of kernel and rootfs. 93 KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'} 94 ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'} 95 OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'} 96 OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'} 97 98 CHROMEOS_MAGIC = "CHROMEOS" 99 CORRUPTED_MAGIC = "CORRUPTD" 100 101 _SERVOD_LOG = '/var/log/servod.log' 102 103 _ROOTFS_PARTITION_NUMBER = 3 104 105 _backup_firmware_sha = () 106 _backup_kernel_sha = dict() 107 _backup_cgpt_attr = dict() 108 _backup_gbb_flags = None 109 _backup_dev_mode = None 110 111 # Class level variable, keep track the states of one time setup. 112 # This variable is preserved across tests which inherit this class. 113 _global_setup_done = { 114 'gbb_flags': False, 115 'reimage': False, 116 'usb_check': False, 117 } 118 119 @classmethod 120 def check_setup_done(cls, label): 121 """Check if the given setup is done. 122 123 @param label: The label of the setup. 124 """ 125 return cls._global_setup_done[label] 126 127 @classmethod 128 def mark_setup_done(cls, label): 129 """Mark the given setup done. 130 131 @param label: The label of the setup. 132 """ 133 cls._global_setup_done[label] = True 134 135 @classmethod 136 def unmark_setup_done(cls, label): 137 """Mark the given setup not done. 138 139 @param label: The label of the setup. 140 """ 141 cls._global_setup_done[label] = False 142 143 def initialize(self, host, cmdline_args, ec_wp=None): 144 super(FirmwareTest, self).initialize(host) 145 self.run_id = str(uuid.uuid4()) 146 logging.info('FirmwareTest initialize begin (id=%s)', self.run_id) 147 # Parse arguments from command line 148 args = {} 149 self.power_control = host.POWER_CONTROL_RPM 150 for arg in cmdline_args: 151 match = re.search("^(\w+)=(.+)", arg) 152 if match: 153 args[match.group(1)] = match.group(2) 154 if 'power_control' in args: 155 self.power_control = args['power_control'] 156 if self.power_control not in host.POWER_CONTROL_VALID_ARGS: 157 raise error.TestError('Valid values for --args=power_control ' 158 'are %s. But you entered wrong argument ' 159 'as "%s".' 160 % (host.POWER_CONTROL_VALID_ARGS, 161 self.power_control)) 162 163 self.faft_config = FAFTConfig( 164 self.faft_client.system.get_platform_name()) 165 self.checkers = FAFTCheckers(self, self.faft_client) 166 167 if self.faft_config.chrome_ec: 168 self.ec = chrome_ec.ChromeEC(self.servo) 169 170 self._setup_uart_capture() 171 self._setup_servo_log() 172 self._record_system_info() 173 self._setup_gbb_flags() 174 self._stop_service('update-engine') 175 self._setup_ec_write_protect(ec_wp) 176 self.fw_vboot2 = self.faft_client.system.get_fw_vboot2() 177 logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1) 178 # See chromium:239034 regarding needing this sync. 179 self.blocking_sync() 180 logging.info('FirmwareTest initialize done (id=%s)', self.run_id) 181 182 def cleanup(self): 183 """Autotest cleanup function.""" 184 # Unset state checker in case it's set by subclass 185 logging.info('FirmwareTest cleaning up (id=%s)', self.run_id) 186 try: 187 self.faft_client.system.is_available() 188 except: 189 # Remote is not responding. Revive DUT so that subsequent tests 190 # don't fail. 191 self._restore_routine_from_timeout() 192 self._restore_dev_mode() 193 self._restore_ec_write_protect() 194 self._restore_gbb_flags() 195 self._start_service('update-engine') 196 self._record_servo_log() 197 self._record_faft_client_log() 198 self._cleanup_uart_capture() 199 super(FirmwareTest, self).cleanup() 200 logging.info('FirmwareTest cleanup done (id=%s)', self.run_id) 201 202 def _record_system_info(self): 203 """Record some critical system info to the attr keyval. 204 205 This info is used by generate_test_report and local_dash later. 206 """ 207 self.write_attr_keyval({ 208 'fw_version': self.faft_client.ec.get_version(), 209 'hwid': self.faft_client.system.get_crossystem_value('hwid'), 210 'fwid': self.faft_client.system.get_crossystem_value('fwid'), 211 }) 212 213 def invalidate_firmware_setup(self): 214 """Invalidate all firmware related setup state. 215 216 This method is called when the firmware is re-flashed. It resets all 217 firmware related setup states so that the next test setup properly 218 again. 219 """ 220 self.unmark_setup_done('gbb_flags') 221 222 def _retrieve_recovery_reason_from_trap(self): 223 """Try to retrieve the recovery reason from a trapped recovery screen. 224 225 @return: The recovery_reason, 0 if any error. 226 """ 227 recovery_reason = 0 228 logging.info('Try to retrieve recovery reason...') 229 if self.servo.get_usbkey_direction() == 'dut': 230 self.wait_fw_screen_and_plug_usb() 231 else: 232 self.servo.switch_usbkey('dut') 233 234 try: 235 self.wait_for_client(install_deps=True) 236 lines = self.faft_client.system.run_shell_command_get_output( 237 'crossystem recovery_reason') 238 recovery_reason = int(lines[0]) 239 logging.info('Got the recovery reason %d.', recovery_reason) 240 except ConnectionError: 241 logging.error('Failed to get the recovery reason due to connection ' 242 'error.') 243 return recovery_reason 244 245 def _reset_client(self): 246 """Reset client to a workable state. 247 248 This method is called when the client is not responsive. It may be 249 caused by the following cases: 250 - halt on a firmware screen without timeout, e.g. REC_INSERT screen; 251 - corrupted firmware; 252 - corrutped OS image. 253 """ 254 # DUT may halt on a firmware screen. Try cold reboot. 255 logging.info('Try cold reboot...') 256 self.reboot_cold_trigger() 257 self.wait_for_client_offline() 258 self.wait_dev_screen_and_ctrl_d() 259 try: 260 self.wait_for_client() 261 return 262 except ConnectionError: 263 logging.warn('Cold reboot doesn\'t help, still connection error.') 264 265 # DUT may be broken by a corrupted firmware. Restore firmware. 266 # We assume the recovery boot still works fine. Since the recovery 267 # code is in RO region and all FAFT tests don't change the RO region 268 # except GBB. 269 if self.is_firmware_saved(): 270 self._ensure_client_in_recovery() 271 logging.info('Try restore the original firmware...') 272 if self.is_firmware_changed(): 273 try: 274 self.restore_firmware() 275 return 276 except ConnectionError: 277 logging.warn('Restoring firmware doesn\'t help, still ' 278 'connection error.') 279 280 # Perhaps it's kernel that's broken. Let's try restoring it. 281 if self.is_kernel_saved(): 282 self._ensure_client_in_recovery() 283 logging.info('Try restore the original kernel...') 284 if self.is_kernel_changed(): 285 try: 286 self.restore_kernel() 287 return 288 except ConnectionError: 289 logging.warn('Restoring kernel doesn\'t help, still ' 290 'connection error.') 291 292 # DUT may be broken by a corrupted OS image. Restore OS image. 293 self._ensure_client_in_recovery() 294 logging.info('Try restore the OS image...') 295 self.faft_client.system.run_shell_command('chromeos-install --yes') 296 self.sync_and_warm_reboot() 297 self.wait_for_client_offline() 298 self.wait_dev_screen_and_ctrl_d() 299 try: 300 self.wait_for_client(install_deps=True) 301 logging.info('Successfully restore OS image.') 302 return 303 except ConnectionError: 304 logging.warn('Restoring OS image doesn\'t help, still connection ' 305 'error.') 306 307 def _ensure_client_in_recovery(self): 308 """Ensure client in recovery boot; reboot into it if necessary. 309 310 @raise TestError: if failed to boot the USB image. 311 """ 312 logging.info('Try boot into USB image...') 313 self.servo.switch_usbkey('host') 314 self.enable_rec_mode_and_reboot() 315 self.wait_fw_screen_and_plug_usb() 316 try: 317 self.wait_for_client(install_deps=True) 318 except ConnectionError: 319 raise error.TestError('Failed to boot the USB image.') 320 321 def _restore_routine_from_timeout(self): 322 """A routine to try to restore the system from a timeout error. 323 324 This method is called when FAFT failed to connect DUT after reboot. 325 326 @raise TestFail: This exception is already raised, with a decription 327 why it failed. 328 """ 329 # DUT is disconnected. Capture the UART output for debug. 330 self._record_uart_capture() 331 332 # TODO(waihong@chromium.org): Implement replugging the Ethernet to 333 # identify if it is a network flaky. 334 335 recovery_reason = self._retrieve_recovery_reason_from_trap() 336 337 # Reset client to a workable state. 338 self._reset_client() 339 340 # Raise the proper TestFail exception. 341 if recovery_reason: 342 raise error.TestFail('Trapped in the recovery screen (reason: %d) ' 343 'and timed out' % recovery_reason) 344 else: 345 raise error.TestFail('Timed out waiting for DUT reboot') 346 347 def assert_test_image_in_usb_disk(self, usb_dev=None, install_shim=False): 348 """Assert an USB disk plugged-in on servo and a test image inside. 349 350 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 351 If None, it is detected automatically. 352 @param install_shim: True to verify an install shim instead of a test 353 image. 354 @raise TestError: if USB disk not detected or not a test (install shim) 355 image. 356 """ 357 if self.check_setup_done('usb_check'): 358 return 359 if usb_dev: 360 assert self.servo.get_usbkey_direction() == 'host' 361 else: 362 self.servo.switch_usbkey('host') 363 usb_dev = self.servo.probe_host_usb_dev() 364 if not usb_dev: 365 raise error.TestError( 366 'An USB disk should be plugged in the servo board.') 367 368 rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER) 369 logging.info('usb dev is %s', usb_dev) 370 tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX') 371 self.servo.system('mount -o ro %s %s' % (rootfs, tmpd)) 372 373 if install_shim: 374 dir_list = self.servo.system_output('ls -a %s' % 375 os.path.join(tmpd, 'root')) 376 check_passed = '.factory_installer' in dir_list 377 else: 378 check_passed = self.servo.system_output( 379 'grep -i "CHROMEOS_RELEASE_DESCRIPTION=.*test" %s' % 380 os.path.join(tmpd, 'etc/lsb-release'), 381 ignore_status=True) 382 for cmd in ('umount %s' % rootfs, 'sync', 'rm -rf %s' % tmpd): 383 self.servo.system(cmd) 384 385 if not check_passed: 386 raise error.TestError( 387 'No Chrome OS %s found on the USB flash plugged into servo' % 388 ('install shim' if install_shim else 'test')) 389 390 self.mark_setup_done('usb_check') 391 392 def setup_usbkey(self, usbkey, host=None, install_shim=False): 393 """Setup the USB disk for the test. 394 395 It checks the setup of USB disk and a valid ChromeOS test image inside. 396 It also muxes the USB disk to either the host or DUT by request. 397 398 @param usbkey: True if the USB disk is required for the test, False if 399 not required. 400 @param host: Optional, True to mux the USB disk to host, False to mux it 401 to DUT, default to do nothing. 402 @param install_shim: True to verify an install shim instead of a test 403 image. 404 """ 405 if usbkey: 406 self.assert_test_image_in_usb_disk(install_shim=install_shim) 407 elif host is None: 408 # USB disk is not required for the test. Better to mux it to host. 409 host = True 410 411 if host is True: 412 self.servo.switch_usbkey('host') 413 elif host is False: 414 self.servo.switch_usbkey('dut') 415 416 def get_usbdisk_path_on_dut(self): 417 """Get the path of the USB disk device plugged-in the servo on DUT. 418 419 Returns: 420 A string representing USB disk path, like '/dev/sdb', or None if 421 no USB disk is found. 422 """ 423 cmd = 'ls -d /dev/s*[a-z]' 424 original_value = self.servo.get_usbkey_direction() 425 426 # Make the dut unable to see the USB disk. 427 self.servo.switch_usbkey('off') 428 no_usb_set = set( 429 self.faft_client.system.run_shell_command_get_output(cmd)) 430 431 # Make the dut able to see the USB disk. 432 self.servo.switch_usbkey('dut') 433 time.sleep(self.faft_config.between_usb_plug) 434 has_usb_set = set( 435 self.faft_client.system.run_shell_command_get_output(cmd)) 436 437 # Back to its original value. 438 if original_value != self.servo.get_usbkey_direction(): 439 self.servo.switch_usbkey(original_value) 440 441 diff_set = has_usb_set - no_usb_set 442 if len(diff_set) == 1: 443 return diff_set.pop() 444 else: 445 return None 446 447 def _stop_service(self, service): 448 """Stops a upstart service on the client. 449 450 @param service: The name of the upstart service. 451 """ 452 logging.info('Stopping %s...', service) 453 command = 'status %s | grep stop || stop %s' % (service, service) 454 self.faft_client.system.run_shell_command(command) 455 456 def _start_service(self, service): 457 """Starts a upstart service on the client. 458 459 @param service: The name of the upstart service. 460 """ 461 logging.info('Starting %s...', service) 462 command = 'status %s | grep start || start %s' % (service, service) 463 self.faft_client.system.run_shell_command(command) 464 465 def _write_gbb_flags(self, new_flags): 466 """Write the GBB flags to the current firmware. 467 468 @param new_flags: The flags to write. 469 """ 470 gbb_flags = self.faft_client.bios.get_gbb_flags() 471 if gbb_flags == new_flags: 472 return 473 logging.info('Changing GBB flags from 0x%x to 0x%x.', 474 gbb_flags, new_flags) 475 self.faft_client.system.run_shell_command( 476 '/usr/share/vboot/bin/set_gbb_flags.sh 0x%x' % new_flags) 477 self.faft_client.bios.reload() 478 # If changing FORCE_DEV_SWITCH_ON flag, reboot to get a clear state 479 if ((gbb_flags ^ new_flags) & vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON): 480 self.reboot_warm_trigger() 481 self.wait_dev_screen_and_ctrl_d() 482 self.wait_for_kernel_up() 483 484 def clear_set_gbb_flags(self, clear_mask, set_mask): 485 """Clear and set the GBB flags in the current flashrom. 486 487 @param clear_mask: A mask of flags to be cleared. 488 @param set_mask: A mask of flags to be set. 489 """ 490 gbb_flags = self.faft_client.bios.get_gbb_flags() 491 new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask 492 self._write_gbb_flags(new_flags) 493 494 def check_ec_capability(self, required_cap=None, suppress_warning=False): 495 """Check if current platform has required EC capabilities. 496 497 @param required_cap: A list containing required EC capabilities. Pass in 498 None to only check for presence of Chrome EC. 499 @param suppress_warning: True to suppress any warning messages. 500 @return: True if requirements are met. Otherwise, False. 501 """ 502 if not self.faft_config.chrome_ec: 503 if not suppress_warning: 504 logging.warn('Requires Chrome EC to run this test.') 505 return False 506 507 if not required_cap: 508 return True 509 510 for cap in required_cap: 511 if cap not in self.faft_config.ec_capability: 512 if not suppress_warning: 513 logging.warn('Requires EC capability "%s" to run this ' 514 'test.', cap) 515 return False 516 517 return True 518 519 def check_root_part_on_non_recovery(self, part): 520 """Check the partition number of root device and on normal/dev boot. 521 522 @param part: A string of partition number, e.g.'3'. 523 @return: True if the root device matched and on normal/dev boot; 524 otherwise, False. 525 """ 526 return self.checkers.root_part_checker(part) and \ 527 self.checkers.crossystem_checker({ 528 'mainfw_type': ('normal', 'developer'), 529 }) 530 531 def _join_part(self, dev, part): 532 """Return a concatenated string of device and partition number. 533 534 @param dev: A string of device, e.g.'/dev/sda'. 535 @param part: A string of partition number, e.g.'3'. 536 @return: A concatenated string of device and partition number, 537 e.g.'/dev/sda3'. 538 539 >>> seq = FirmwareTest() 540 >>> seq._join_part('/dev/sda', '3') 541 '/dev/sda3' 542 >>> seq._join_part('/dev/mmcblk0', '2') 543 '/dev/mmcblk0p2' 544 """ 545 if 'mmcblk' in dev: 546 return dev + 'p' + part 547 else: 548 return dev + part 549 550 def copy_kernel_and_rootfs(self, from_part, to_part): 551 """Copy kernel and rootfs from from_part to to_part. 552 553 @param from_part: A string of partition number to be copied from. 554 @param to_part: A string of partition number to be copied to. 555 """ 556 root_dev = self.faft_client.system.get_root_dev() 557 logging.info('Copying kernel from %s to %s. Please wait...', 558 from_part, to_part) 559 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 560 (self._join_part(root_dev, self.KERNEL_MAP[from_part]), 561 self._join_part(root_dev, self.KERNEL_MAP[to_part]))) 562 logging.info('Copying rootfs from %s to %s. Please wait...', 563 from_part, to_part) 564 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 565 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]), 566 self._join_part(root_dev, self.ROOTFS_MAP[to_part]))) 567 568 def ensure_kernel_boot(self, part): 569 """Ensure the request kernel boot. 570 571 If not, it duplicates the current kernel to the requested kernel 572 and sets the requested higher priority to ensure it boot. 573 574 @param part: A string of kernel partition number or 'a'/'b'. 575 """ 576 if not self.checkers.root_part_checker(part): 577 if self.faft_client.kernel.diff_a_b(): 578 self.copy_kernel_and_rootfs( 579 from_part=self.OTHER_KERNEL_MAP[part], 580 to_part=part) 581 self.reset_and_prioritize_kernel(part) 582 583 def set_hardware_write_protect(self, enable): 584 """Set hardware write protect pin. 585 586 @param enable: True if asserting write protect pin. Otherwise, False. 587 """ 588 self.servo.set('fw_wp_vref', self.faft_config.wp_voltage) 589 self.servo.set('fw_wp_en', 'on') 590 self.servo.set('fw_wp', 'on' if enable else 'off') 591 592 def set_ec_write_protect_and_reboot(self, enable): 593 """Set EC write protect status and reboot to take effect. 594 595 The write protect state is only activated if both hardware write 596 protect pin is asserted and software write protect flag is set. 597 This method asserts/deasserts hardware write protect pin first, and 598 set corresponding EC software write protect flag. 599 600 If the device uses non-Chrome EC, set the software write protect via 601 flashrom. 602 603 If the device uses Chrome EC, a reboot is required for write protect 604 to take effect. Since the software write protect flag cannot be unset 605 if hardware write protect pin is asserted, we need to deasserted the 606 pin first if we are deactivating write protect. Similarly, a reboot 607 is required before we can modify the software flag. 608 609 @param enable: True if activating EC write protect. Otherwise, False. 610 """ 611 self.set_hardware_write_protect(enable) 612 if self.faft_config.chrome_ec: 613 self.set_chrome_ec_write_protect_and_reboot(enable) 614 else: 615 self.faft_client.ec.set_write_protect(enable) 616 self.sync_and_warm_reboot() 617 618 def set_chrome_ec_write_protect_and_reboot(self, enable): 619 """Set Chrome EC write protect status and reboot to take effect. 620 621 @param enable: True if activating EC write protect. Otherwise, False. 622 """ 623 if enable: 624 # Set write protect flag and reboot to take effect. 625 self.ec.set_flash_write_protect(enable) 626 self.sync_and_ec_reboot() 627 else: 628 # Reboot after deasserting hardware write protect pin to deactivate 629 # write protect. And then remove software write protect flag. 630 self.sync_and_ec_reboot() 631 self.ec.set_flash_write_protect(enable) 632 633 def _setup_ec_write_protect(self, ec_wp): 634 """Setup for EC write-protection. 635 636 It makes sure the EC in the requested write-protection state. If not, it 637 flips the state. Flipping the write-protection requires DUT reboot. 638 639 @param ec_wp: True to request EC write-protected; False to request EC 640 not write-protected; None to do nothing. 641 """ 642 if ec_wp is None: 643 self._old_ec_wp = None 644 return 645 self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'}) 646 if ec_wp != self._old_ec_wp: 647 logging.info('The test required EC is %swrite-protected. Reboot ' 648 'and flip the state.', '' if ec_wp else 'not ') 649 self.do_reboot_action((self.set_ec_write_protect_and_reboot, ec_wp)) 650 self.wait_dev_screen_and_ctrl_d() 651 652 def _restore_ec_write_protect(self): 653 """Restore the original EC write-protection.""" 654 if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None): 655 return 656 if not self.checkers.crossystem_checker( 657 {'wpsw_boot': '1' if self._old_ec_wp else '0'}): 658 logging.info('Restore original EC write protection and reboot.') 659 self.do_reboot_action((self.set_ec_write_protect_and_reboot, 660 self._old_ec_wp)) 661 self.wait_dev_screen_and_ctrl_d() 662 663 def press_ctrl_d(self, press_secs=''): 664 """Send Ctrl-D key to DUT. 665 666 @param press_secs : Str. Time to press key. 667 """ 668 self.servo.ctrl_d(press_secs) 669 670 def press_ctrl_u(self): 671 """Send Ctrl-U key to DUT. 672 673 @raise TestError: if a non-Chrome EC device or no Ctrl-U command given 674 on a no-build-in-keyboard device. 675 """ 676 if not self.faft_config.has_keyboard: 677 self.servo.ctrl_u() 678 elif self.check_ec_capability(['keyboard'], suppress_warning=True): 679 self.ec.key_down('<ctrl_l>') 680 self.ec.key_down('u') 681 self.ec.key_up('u') 682 self.ec.key_up('<ctrl_l>') 683 elif self.faft_config.has_keyboard: 684 raise error.TestError( 685 "Can't send Ctrl-U to DUT without using Chrome EC.") 686 else: 687 raise error.TestError( 688 "Should specify the ctrl_u_cmd argument.") 689 690 def press_enter(self, press_secs=''): 691 """Send Enter key to DUT. 692 693 @param press_secs: Seconds of holding the key. 694 """ 695 self.servo.enter_key(press_secs) 696 697 def wait_dev_screen_and_ctrl_d(self): 698 """Wait for firmware warning screen and press Ctrl-D.""" 699 time.sleep(self.faft_config.dev_screen) 700 self.press_ctrl_d() 701 702 def wait_fw_screen_and_ctrl_d(self): 703 """Wait for firmware warning screen and press Ctrl-D.""" 704 time.sleep(self.faft_config.firmware_screen) 705 self.press_ctrl_d() 706 707 def wait_fw_screen_and_ctrl_u(self): 708 """Wait for firmware warning screen and press Ctrl-U.""" 709 time.sleep(self.faft_config.firmware_screen) 710 self.press_ctrl_u() 711 712 def wait_fw_screen_and_trigger_recovery(self, need_dev_transition=False): 713 """Wait for firmware warning screen and trigger recovery boot. 714 715 @param need_dev_transition: True when needs dev mode transition, only 716 for Alex/ZGB. 717 """ 718 time.sleep(self.faft_config.firmware_screen) 719 720 # Pressing Enter for too long triggers a second key press. 721 # Let's press it without delay 722 self.press_enter(press_secs=0) 723 724 # For Alex/ZGB, there is a dev warning screen in text mode. 725 # Skip it by pressing Ctrl-D. 726 if need_dev_transition: 727 time.sleep(self.faft_config.legacy_text_screen) 728 self.press_ctrl_d() 729 730 def wait_fw_screen_and_unplug_usb(self): 731 """Wait for firmware warning screen and then unplug the servo USB.""" 732 time.sleep(self.faft_config.load_usb) 733 self.servo.switch_usbkey('host') 734 time.sleep(self.faft_config.between_usb_plug) 735 736 def wait_fw_screen_and_plug_usb(self): 737 """Wait for firmware warning screen and then unplug and plug the USB.""" 738 self.wait_fw_screen_and_unplug_usb() 739 self.servo.switch_usbkey('dut') 740 741 def wait_fw_screen_and_press_power(self): 742 """Wait for firmware warning screen and press power button.""" 743 time.sleep(self.faft_config.firmware_screen) 744 # While the firmware screen, the power button probing loop sleeps 745 # 0.25 second on every scan. Use the normal delay (1.2 second) for 746 # power press. 747 self.servo.power_normal_press() 748 749 def wait_longer_fw_screen_and_press_power(self): 750 """Wait for firmware screen without timeout and press power button.""" 751 time.sleep(self.faft_config.dev_screen_timeout) 752 self.wait_fw_screen_and_press_power() 753 754 def wait_fw_screen_and_close_lid(self): 755 """Wait for firmware warning screen and close lid.""" 756 time.sleep(self.faft_config.firmware_screen) 757 self.servo.lid_close() 758 759 def wait_longer_fw_screen_and_close_lid(self): 760 """Wait for firmware screen without timeout and close lid.""" 761 time.sleep(self.faft_config.firmware_screen) 762 self.wait_fw_screen_and_close_lid() 763 764 def _setup_uart_capture(self): 765 """Setup the CPU/EC UART capture.""" 766 self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt') 767 self.servo.set('cpu_uart_capture', 'on') 768 self.ec_uart_file = None 769 if self.faft_config.chrome_ec: 770 try: 771 self.servo.set('ec_uart_capture', 'on') 772 self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt') 773 except error.TestFail as e: 774 if 'No control named' in str(e): 775 logging.warn('The servod is too old that ec_uart_capture ' 776 'not supported.') 777 else: 778 logging.info('Not a Google EC, cannot capture ec console output.') 779 780 def _record_uart_capture(self): 781 """Record the CPU/EC UART output stream to files.""" 782 if self.cpu_uart_file: 783 with open(self.cpu_uart_file, 'a') as f: 784 f.write(ast.literal_eval(self.servo.get('cpu_uart_stream'))) 785 if self.ec_uart_file and self.faft_config.chrome_ec: 786 with open(self.ec_uart_file, 'a') as f: 787 f.write(ast.literal_eval(self.servo.get('ec_uart_stream'))) 788 789 def _cleanup_uart_capture(self): 790 """Cleanup the CPU/EC UART capture.""" 791 # Flush the remaining UART output. 792 self._record_uart_capture() 793 self.servo.set('cpu_uart_capture', 'off') 794 if self.ec_uart_file and self.faft_config.chrome_ec: 795 self.servo.set('ec_uart_capture', 'off') 796 797 def _fetch_servo_log(self): 798 """Fetch the servo log.""" 799 cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2) 800 servo_log = self.servo.system_output(cmd) 801 return None if servo_log == 'NOTFOUND' else servo_log 802 803 def _setup_servo_log(self): 804 """Setup the servo log capturing.""" 805 self.servo_log_original_len = -1 806 if self.servo.is_localhost(): 807 # No servo log recorded when servod runs locally. 808 return 809 810 servo_log = self._fetch_servo_log() 811 if servo_log: 812 self.servo_log_original_len = len(servo_log) 813 else: 814 logging.warn('Servo log file not found.') 815 816 def _record_servo_log(self): 817 """Record the servo log to the results directory.""" 818 if self.servo_log_original_len != -1: 819 servo_log = self._fetch_servo_log() 820 servo_log_file = os.path.join(self.resultsdir, 'servod.log') 821 with open(servo_log_file, 'a') as f: 822 f.write(servo_log[self.servo_log_original_len:]) 823 824 def _record_faft_client_log(self): 825 """Record the faft client log to the results directory.""" 826 client_log = self.faft_client.system.dump_log(True) 827 client_log_file = os.path.join(self.resultsdir, 'faft_client.log') 828 with open(client_log_file, 'w') as f: 829 f.write(client_log) 830 831 def _setup_gbb_flags(self): 832 """Setup the GBB flags for FAFT test.""" 833 if self.faft_config.gbb_version < 1.1: 834 logging.info('Skip modifying GBB on versions older than 1.1.') 835 return 836 837 if self.check_setup_done('gbb_flags'): 838 return 839 840 self._backup_gbb_flags = self.faft_client.bios.get_gbb_flags() 841 842 logging.info('Set proper GBB flags for test.') 843 self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY | 844 vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON | 845 vboot.GBB_FLAG_FORCE_DEV_BOOT_USB | 846 vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK, 847 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM | 848 vboot.GBB_FLAG_FAFT_KEY_OVERIDE) 849 self.mark_setup_done('gbb_flags') 850 851 def drop_backup_gbb_flags(self): 852 """Drops the backup GBB flags. 853 854 This can be used when a test intends to permanently change GBB flags. 855 """ 856 self._backup_gbb_flags = None 857 858 def _restore_gbb_flags(self): 859 """Restore GBB flags to their original state.""" 860 if not self._backup_gbb_flags: 861 return 862 self._write_gbb_flags(self._backup_gbb_flags) 863 self.unmark_setup_done('gbb_flags') 864 865 def setup_tried_fwb(self, tried_fwb): 866 """Setup for fw B tried state. 867 868 It makes sure the system in the requested fw B tried state. If not, it 869 tries to do so. 870 871 @param tried_fwb: True if requested in tried_fwb=1; 872 False if tried_fwb=0. 873 """ 874 if tried_fwb: 875 if not self.checkers.crossystem_checker({'tried_fwb': '1'}): 876 logging.info( 877 'Firmware is not booted with tried_fwb. Reboot into it.') 878 self.faft_client.system.set_try_fw_b() 879 else: 880 if not self.checkers.crossystem_checker({'tried_fwb': '0'}): 881 logging.info( 882 'Firmware is booted with tried_fwb. Reboot to clear.') 883 884 def power_on(self): 885 """Switch DUT AC power on.""" 886 self._client.power_on(self.power_control) 887 888 def power_off(self): 889 """Switch DUT AC power off.""" 890 self._client.power_off(self.power_control) 891 892 def power_cycle(self): 893 """Power cycle DUT AC power.""" 894 self._client.power_cycle(self.power_control) 895 896 def enable_rec_mode_and_reboot(self): 897 """Switch to rec mode and reboot. 898 899 This method emulates the behavior of the old physical recovery switch, 900 i.e. switch ON + reboot + switch OFF, and the new keyboard controlled 901 recovery mode, i.e. just press Power + Esc + Refresh. 902 """ 903 psc = self.servo.get_power_state_controller() 904 psc.power_off() 905 psc.power_on(psc.REC_ON) 906 907 def enable_dev_mode_and_reboot(self): 908 """Switch to developer mode and reboot.""" 909 if self.faft_config.keyboard_dev: 910 self.enable_keyboard_dev_mode() 911 else: 912 self.servo.enable_development_mode() 913 self.faft_client.system.run_shell_command( 914 'chromeos-firmwareupdate --mode todev && reboot') 915 916 def enable_normal_mode_and_reboot(self): 917 """Switch to normal mode and reboot.""" 918 if self.faft_config.keyboard_dev: 919 self.disable_keyboard_dev_mode() 920 else: 921 self.servo.disable_development_mode() 922 self.faft_client.system.run_shell_command( 923 'chromeos-firmwareupdate --mode tonormal && reboot') 924 925 def wait_fw_screen_and_switch_keyboard_dev_mode(self, dev): 926 """Wait for firmware screen and then switch into or out of dev mode. 927 928 @param dev: True if switching into dev mode. Otherwise, False. 929 """ 930 time.sleep(self.faft_config.firmware_screen) 931 if dev: 932 self.press_ctrl_d() 933 time.sleep(self.faft_config.confirm_screen) 934 if self.faft_config.rec_button_dev_switch: 935 logging.info('RECOVERY button pressed to switch to dev mode') 936 self.servo.set('rec_mode', 'on') 937 time.sleep(self.faft_config.hold_cold_reset) 938 self.servo.set('rec_mode', 'off') 939 else: 940 logging.info('ENTER pressed to switch to dev mode') 941 self.press_enter() 942 else: 943 self.press_enter() 944 time.sleep(self.faft_config.confirm_screen) 945 self.press_enter() 946 947 def enable_keyboard_dev_mode(self): 948 """Enable keyboard controlled developer mode""" 949 logging.info("Enabling keyboard controlled developer mode") 950 # Plug out USB disk for preventing recovery boot without warning 951 self.servo.switch_usbkey('host') 952 # Rebooting EC with rec mode on. Should power on AP. 953 self.enable_rec_mode_and_reboot() 954 self.wait_for_client_offline() 955 self.wait_fw_screen_and_switch_keyboard_dev_mode(dev=True) 956 957 # TODO (crosbug.com/p/16231) remove this conditional completely if/when 958 # issue is resolved. 959 if self.faft_config.platform == 'Parrot': 960 self.wait_for_client_offline() 961 self.reboot_cold_trigger() 962 963 def disable_keyboard_dev_mode(self): 964 """Disable keyboard controlled developer mode""" 965 logging.info("Disabling keyboard controlled developer mode") 966 if (not self.faft_config.chrome_ec and 967 not self.faft_config.broken_rec_mode): 968 self.servo.disable_recovery_mode() 969 self.reboot_cold_trigger() 970 self.wait_for_client_offline() 971 self.wait_fw_screen_and_switch_keyboard_dev_mode(dev=False) 972 973 def setup_dev_mode(self, dev_mode): 974 """Setup for development mode. 975 976 It makes sure the system in the requested normal/dev mode. If not, it 977 tries to do so. 978 979 @param dev_mode: True if requested in dev mode; False if normal mode. 980 """ 981 if dev_mode: 982 if (not self.faft_config.keyboard_dev and 983 not self.checkers.crossystem_checker({'devsw_cur': '1'})): 984 logging.info('Dev switch is not on. Now switch it on.') 985 self.servo.enable_development_mode() 986 if not self.checkers.crossystem_checker({'devsw_boot': '1', 987 'mainfw_type': 'developer'}): 988 logging.info('System is not in dev mode. Reboot into it.') 989 if self._backup_dev_mode is None: 990 self._backup_dev_mode = False 991 if self.faft_config.keyboard_dev: 992 self.faft_client.system.run_shell_command( 993 'chromeos-firmwareupdate --mode todev && reboot') 994 self.do_reboot_action(self.enable_keyboard_dev_mode) 995 self.wait_dev_screen_and_ctrl_d() 996 else: 997 if (not self.faft_config.keyboard_dev and 998 not self.checkers.crossystem_checker({'devsw_cur': '0'})): 999 logging.info('Dev switch is not off. Now switch it off.') 1000 self.servo.disable_development_mode() 1001 if not self.checkers.crossystem_checker({'devsw_boot': '0', 1002 'mainfw_type': 'normal'}): 1003 logging.info('System is not in normal mode. Reboot into it.') 1004 if self._backup_dev_mode is None: 1005 self._backup_dev_mode = True 1006 if self.faft_config.keyboard_dev: 1007 self.faft_client.system.run_shell_command( 1008 'chromeos-firmwareupdate --mode tonormal && reboot') 1009 self.do_reboot_action(self.disable_keyboard_dev_mode) 1010 1011 def _restore_dev_mode(self): 1012 """Restores original dev mode status if it has changed.""" 1013 if self._backup_dev_mode is not None: 1014 self.setup_dev_mode(self._backup_dev_mode) 1015 self._backup_dev_mode = None 1016 1017 def setup_rw_boot(self, section='a'): 1018 """Make sure firmware is in RW-boot mode. 1019 1020 If the given firmware section is in RO-boot mode, turn off the RO-boot 1021 flag and reboot DUT into RW-boot mode. 1022 1023 @param section: A firmware section, either 'a' or 'b'. 1024 """ 1025 flags = self.faft_client.bios.get_preamble_flags(section) 1026 if flags & vboot.PREAMBLE_USE_RO_NORMAL: 1027 flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL 1028 self.faft_client.bios.set_preamble_flags(section, flags) 1029 self.reboot_warm() 1030 1031 def setup_kernel(self, part): 1032 """Setup for kernel test. 1033 1034 It makes sure both kernel A and B bootable and the current boot is 1035 the requested kernel part. 1036 1037 @param part: A string of kernel partition number or 'a'/'b'. 1038 """ 1039 self.ensure_kernel_boot(part) 1040 logging.info('Checking the integrity of kernel B and rootfs B...') 1041 if (self.faft_client.kernel.diff_a_b() or 1042 not self.faft_client.rootfs.verify_rootfs('B')): 1043 logging.info('Copying kernel and rootfs from A to B...') 1044 self.copy_kernel_and_rootfs(from_part=part, 1045 to_part=self.OTHER_KERNEL_MAP[part]) 1046 self.reset_and_prioritize_kernel(part) 1047 1048 def reset_and_prioritize_kernel(self, part): 1049 """Make the requested partition highest priority. 1050 1051 This function also reset kerenl A and B to bootable. 1052 1053 @param part: A string of partition number to be prioritized. 1054 """ 1055 root_dev = self.faft_client.system.get_root_dev() 1056 # Reset kernel A and B to bootable. 1057 self.faft_client.system.run_shell_command( 1058 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev)) 1059 self.faft_client.system.run_shell_command( 1060 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev)) 1061 # Set kernel part highest priority. 1062 self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' % 1063 (self.KERNEL_MAP[part], root_dev)) 1064 1065 def blocking_sync(self): 1066 """Run a blocking sync command.""" 1067 # The double calls to sync fakes a blocking call 1068 # since the first call returns before the flush 1069 # is complete, but the second will wait for the 1070 # first to finish. 1071 self.faft_client.system.run_shell_command('sync') 1072 self.faft_client.system.run_shell_command('sync') 1073 1074 1075 ################################################ 1076 # Reboot APIs 1077 1078 def reboot_warm(self, sync_before_boot=True, 1079 wait_for_dut_up=True, ctrl_d=False): 1080 """ 1081 Perform a warm reboot. 1082 1083 This is the highest level function that most users will need. 1084 It performs a sync, triggers a reboot and waits for kernel to boot. 1085 1086 @param sync_before_boot: bool, sync to disk before booting. 1087 @param wait_for_dut_up: bool, wait for dut to boot before returning. 1088 @param ctrl_d: bool, press ctrl-D at dev screen. 1089 """ 1090 if sync_before_boot: 1091 self.blocking_sync() 1092 self.reboot_warm_trigger() 1093 if ctrl_d: 1094 self.wait_dev_screen_and_ctrl_d() 1095 if wait_for_dut_up: 1096 self.wait_for_client_offline() 1097 self.wait_for_kernel_up() 1098 1099 def reboot_cold(self, sync_before_boot=True, 1100 wait_for_dut_up=True, ctrl_d=False): 1101 """ 1102 Perform a cold reboot. 1103 1104 This is the highest level function that most users will need. 1105 It performs a sync, triggers a reboot and waits for kernel to boot. 1106 1107 @param sync_before_boot: bool, sync to disk before booting. 1108 @param wait_for_dut_up: bool, wait for dut to boot before returning. 1109 @param ctrl_d: bool, press ctrl-D at dev screen. 1110 """ 1111 if sync_before_boot: 1112 self.blocking_sync() 1113 self.reboot_cold_trigger() 1114 if ctrl_d: 1115 self.wait_dev_screen_and_ctrl_d() 1116 if wait_for_dut_up: 1117 self.wait_for_client_offline() 1118 self.wait_for_kernel_up() 1119 1120 def do_reboot_action(self, func): 1121 """ 1122 Helper function that wraps the reboot function so that we check if the 1123 DUT went down. 1124 1125 @param func: function to trigger the reboot. 1126 """ 1127 logging.info("-[FAFT]-[ start do_reboot_action ]----------") 1128 boot_id = self.get_bootid() 1129 self._call_action(func) 1130 self.wait_for_client_offline(orig_boot_id=boot_id) 1131 logging.info("-[FAFT]-[ end do_reboot_action ]------------") 1132 1133 def wait_for_kernel_up(self, install_deps=False): 1134 """ 1135 Helper function that waits for the device to boot up to kernel. 1136 1137 @param install_deps: bool, install deps after boot. 1138 """ 1139 logging.info("-[FAFT]-[ start wait_for_kernel_up ]---") 1140 try: 1141 logging.info("Installing deps after boot : %s", install_deps) 1142 self.wait_for_client(install_deps=install_deps) 1143 # Stop update-engine as it may change firmware/kernel. 1144 self._stop_service('update-engine') 1145 except ConnectionError: 1146 logging.error('wait_for_client() timed out.') 1147 self._restore_routine_from_timeout() 1148 logging.info("-[FAFT]-[ end wait_for_kernel_up ]-----") 1149 1150 def reboot_warm_trigger(self): 1151 """Request a warm reboot. 1152 1153 A wrapper for underlying servo warm reset. 1154 """ 1155 # Use cold reset if the warm reset is broken. 1156 if self.faft_config.broken_warm_reset: 1157 logging.info('broken_warm_reset is True. Cold rebooting instead.') 1158 self.reboot_cold_trigger() 1159 else: 1160 self.servo.get_power_state_controller().warm_reset() 1161 1162 def reboot_cold_trigger(self): 1163 """Request a cold reboot. 1164 1165 A wrapper for underlying servo cold reset. 1166 """ 1167 self.servo.get_power_state_controller().reset() 1168 1169 def sync_and_warm_reboot(self): 1170 """Request the client sync and do a warm reboot. 1171 1172 This is the default reboot action on FAFT. 1173 """ 1174 self.blocking_sync() 1175 self.reboot_warm_trigger() 1176 1177 def sync_and_cold_reboot(self): 1178 """Request the client sync and do a cold reboot. 1179 1180 This reboot action is used to reset EC for recovery mode. 1181 """ 1182 self.blocking_sync() 1183 self.reboot_cold_trigger() 1184 1185 def sync_and_ec_reboot(self, flags=''): 1186 """Request the client sync and do a EC triggered reboot. 1187 1188 @param flags: Optional, a space-separated string of flags passed to EC 1189 reboot command, including: 1190 default: EC soft reboot; 1191 'hard': EC cold/hard reboot. 1192 """ 1193 self.blocking_sync() 1194 self.ec.reboot(flags) 1195 time.sleep(self.faft_config.ec_boot_to_console) 1196 self.check_lid_and_power_on() 1197 1198 def reboot_with_factory_install_shim(self): 1199 """Request reboot with factory install shim to reset TPM. 1200 1201 Factory install shim requires dev mode enabled. So this method switches 1202 firmware to dev mode first and reboot. The client uses factory install 1203 shim to reset TPM values. 1204 """ 1205 # Unplug USB first to avoid the complicated USB autoboot cases. 1206 self.servo.switch_usbkey('host') 1207 is_dev = self.checkers.crossystem_checker({'devsw_boot': '1'}) 1208 if not is_dev: 1209 self.enable_dev_mode_and_reboot() 1210 self.enable_rec_mode_and_reboot() 1211 self.wait_fw_screen_and_plug_usb() 1212 time.sleep(self.faft_config.install_shim_done) 1213 self.reboot_warm_trigger() 1214 1215 def full_power_off_and_on(self): 1216 """Shutdown the device by pressing power button and power on again.""" 1217 # Press power button to trigger Chrome OS normal shutdown process. 1218 # We use a customized delay since the normal-press 1.2s is not enough. 1219 self.servo.power_key(self.faft_config.hold_pwr_button) 1220 time.sleep(self.faft_config.shutdown) 1221 # Short press power button to boot DUT again. 1222 self.servo.power_short_press() 1223 1224 def check_lid_and_power_on(self): 1225 """ 1226 On devices with EC software sync, system powers on after EC reboots if 1227 lid is open. Otherwise, the EC shuts down CPU after about 3 seconds. 1228 This method checks lid switch state and presses power button if 1229 necessary. 1230 """ 1231 if self.servo.get("lid_open") == "no": 1232 time.sleep(self.faft_config.software_sync) 1233 self.servo.power_short_press() 1234 1235 def _modify_usb_kernel(self, usb_dev, from_magic, to_magic): 1236 """Modify the kernel header magic in USB stick. 1237 1238 The kernel header magic is the first 8-byte of kernel partition. 1239 We modify it to make it fail on kernel verification check. 1240 1241 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1242 @param from_magic: A string of magic which we change it from. 1243 @param to_magic: A string of magic which we change it to. 1244 @raise TestError: if failed to change magic. 1245 """ 1246 assert len(from_magic) == 8 1247 assert len(to_magic) == 8 1248 # USB image only contains one kernel. 1249 kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a']) 1250 read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part 1251 current_magic = self.servo.system_output(read_cmd) 1252 if current_magic == to_magic: 1253 logging.info("The kernel magic is already %s.", current_magic) 1254 return 1255 if current_magic != from_magic: 1256 raise error.TestError("Invalid kernel image on USB: wrong magic.") 1257 1258 logging.info('Modify the kernel magic in USB, from %s to %s.', 1259 from_magic, to_magic) 1260 write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc " 1261 " 2>/dev/null" % (to_magic, kernel_part)) 1262 self.servo.system(write_cmd) 1263 1264 if self.servo.system_output(read_cmd) != to_magic: 1265 raise error.TestError("Failed to write new magic.") 1266 1267 def corrupt_usb_kernel(self, usb_dev): 1268 """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD. 1269 1270 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1271 """ 1272 self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC, 1273 self.CORRUPTED_MAGIC) 1274 1275 def restore_usb_kernel(self, usb_dev): 1276 """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS. 1277 1278 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1279 """ 1280 self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC, 1281 self.CHROMEOS_MAGIC) 1282 1283 def _call_action(self, action_tuple, check_status=False): 1284 """Call the action function with/without arguments. 1285 1286 @param action_tuple: A function, or a tuple (function, args, error_msg), 1287 in which, args and error_msg are optional. args is 1288 either a value or a tuple if multiple arguments. 1289 This can also be a list containing multiple 1290 function or tuple. In this case, these actions are 1291 called in sequence. 1292 @param check_status: Check the return value of action function. If not 1293 succeed, raises a TestFail exception. 1294 @return: The result value of the action function. 1295 @raise TestError: An error when the action function is not callable. 1296 @raise TestFail: When check_status=True, action function not succeed. 1297 """ 1298 if isinstance(action_tuple, list): 1299 return all([self._call_action(action, check_status=check_status) 1300 for action in action_tuple]) 1301 1302 action = action_tuple 1303 args = () 1304 error_msg = 'Not succeed' 1305 if isinstance(action_tuple, tuple): 1306 action = action_tuple[0] 1307 if len(action_tuple) >= 2: 1308 args = action_tuple[1] 1309 if not isinstance(args, tuple): 1310 args = (args,) 1311 if len(action_tuple) >= 3: 1312 error_msg = action_tuple[2] 1313 1314 if action is None: 1315 return 1316 1317 if not callable(action): 1318 raise error.TestError('action is not callable!') 1319 1320 info_msg = 'calling %s' % str(action) 1321 if args: 1322 info_msg += ' with args %s' % str(args) 1323 logging.info(info_msg) 1324 ret = action(*args) 1325 1326 if check_status and not ret: 1327 raise error.TestFail('%s: %s returning %s' % 1328 (error_msg, info_msg, str(ret))) 1329 return ret 1330 1331 def run_shutdown_process(self, shutdown_action, pre_power_action=None, 1332 post_power_action=None, shutdown_timeout=None): 1333 """Run shutdown_action(), which makes DUT shutdown, and power it on. 1334 1335 @param shutdown_action: function which makes DUT shutdown, like 1336 pressing power key. 1337 @param pre_power_action: function which is called before next power on. 1338 @param post_power_action: function which is called after next power on. 1339 @param shutdown_timeout: a timeout to confirm DUT shutdown. 1340 @raise TestFail: if the shutdown_action() failed to turn DUT off. 1341 """ 1342 self._call_action(shutdown_action) 1343 logging.info('Wait to ensure DUT shut down...') 1344 try: 1345 if shutdown_timeout is None: 1346 shutdown_timeout = self.faft_config.shutdown_timeout 1347 self.wait_for_client(timeout=shutdown_timeout) 1348 raise error.TestFail( 1349 'Should shut the device down after calling %s.' % 1350 str(shutdown_action)) 1351 except ConnectionError: 1352 logging.info( 1353 'DUT is surely shutdown. We are going to power it on again...') 1354 1355 if pre_power_action: 1356 self._call_action(pre_power_action) 1357 self.servo.power_short_press() 1358 if post_power_action: 1359 self._call_action(post_power_action) 1360 1361 def get_bootid(self, retry=3): 1362 """ 1363 Return the bootid. 1364 """ 1365 boot_id = None 1366 while retry: 1367 try: 1368 boot_id = self._client.get_boot_id() 1369 break 1370 except error.AutoservRunError: 1371 retry -= 1 1372 if retry: 1373 logging.info('Retry to get boot_id...') 1374 else: 1375 logging.warning('Failed to get boot_id.') 1376 logging.info('boot_id: %s', boot_id) 1377 return boot_id 1378 1379 def check_state(self, func): 1380 """ 1381 Wrapper around _call_action with check_status set to True. This is a 1382 helper function to be used by tests and is currently implemented by 1383 calling _call_action with check_status=True. 1384 1385 TODO: This function's arguments need to be made more stringent. And 1386 its functionality should be moved over to check functions directly in 1387 the future. 1388 1389 @param func: A function, or a tuple (function, args, error_msg), 1390 in which, args and error_msg are optional. args is 1391 either a value or a tuple if multiple arguments. 1392 This can also be a list containing multiple 1393 function or tuple. In this case, these actions are 1394 called in sequence. 1395 @return: The result value of the action function. 1396 @raise TestFail: If the function does notsucceed. 1397 """ 1398 logging.info("-[FAFT]-[ start stepstate_checker ]----------") 1399 self._call_action(func, check_status=True) 1400 logging.info("-[FAFT]-[ end state_checker ]----------------") 1401 1402 def get_current_firmware_sha(self): 1403 """Get current firmware sha of body and vblock. 1404 1405 @return: Current firmware sha follows the order ( 1406 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha) 1407 """ 1408 current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'), 1409 self.faft_client.bios.get_body_sha('a'), 1410 self.faft_client.bios.get_sig_sha('b'), 1411 self.faft_client.bios.get_body_sha('b')) 1412 if not all(current_firmware_sha): 1413 raise error.TestError('Failed to get firmware sha.') 1414 return current_firmware_sha 1415 1416 def is_firmware_changed(self): 1417 """Check if the current firmware changed, by comparing its SHA. 1418 1419 @return: True if it is changed, otherwise Flase. 1420 """ 1421 # Device may not be rebooted after test. 1422 self.faft_client.bios.reload() 1423 1424 current_sha = self.get_current_firmware_sha() 1425 1426 if current_sha == self._backup_firmware_sha: 1427 return False 1428 else: 1429 corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0]) 1430 corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1]) 1431 corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2]) 1432 corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3]) 1433 logging.info("Firmware changed:") 1434 logging.info('VBOOTA is changed: %s', corrupt_VBOOTA) 1435 logging.info('VBOOTB is changed: %s', corrupt_VBOOTB) 1436 logging.info('FVMAIN is changed: %s', corrupt_FVMAIN) 1437 logging.info('FVMAINB is changed: %s', corrupt_FVMAINB) 1438 return True 1439 1440 def backup_firmware(self, suffix='.original'): 1441 """Backup firmware to file, and then send it to host. 1442 1443 @param suffix: a string appended to backup file name 1444 """ 1445 remote_temp_dir = self.faft_client.system.create_temp_dir() 1446 self.faft_client.bios.dump_whole(os.path.join(remote_temp_dir, 'bios')) 1447 self._client.get_file(os.path.join(remote_temp_dir, 'bios'), 1448 os.path.join(self.resultsdir, 'bios' + suffix)) 1449 1450 self._backup_firmware_sha = self.get_current_firmware_sha() 1451 logging.info('Backup firmware stored in %s with suffix %s', 1452 self.resultsdir, suffix) 1453 1454 def is_firmware_saved(self): 1455 """Check if a firmware saved (called backup_firmware before). 1456 1457 @return: True if the firmware is backuped; otherwise False. 1458 """ 1459 return self._backup_firmware_sha != () 1460 1461 def clear_saved_firmware(self): 1462 """Clear the firmware saved by the method backup_firmware.""" 1463 self._backup_firmware_sha = () 1464 1465 def restore_firmware(self, suffix='.original'): 1466 """Restore firmware from host in resultsdir. 1467 1468 @param suffix: a string appended to backup file name 1469 """ 1470 if not self.is_firmware_changed(): 1471 return 1472 1473 # Backup current corrupted firmware. 1474 self.backup_firmware(suffix='.corrupt') 1475 1476 # Restore firmware. 1477 remote_temp_dir = self.faft_client.system.create_temp_dir() 1478 self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix), 1479 os.path.join(remote_temp_dir, 'bios')) 1480 1481 self.faft_client.bios.write_whole( 1482 os.path.join(remote_temp_dir, 'bios')) 1483 self.sync_and_warm_reboot() 1484 self.wait_for_client_offline() 1485 self.wait_dev_screen_and_ctrl_d() 1486 self.wait_for_client() 1487 1488 logging.info('Successfully restore firmware.') 1489 1490 def setup_firmwareupdate_shellball(self, shellball=None): 1491 """Deside a shellball to use in firmware update test. 1492 1493 Check if there is a given shellball, and it is a shell script. Then, 1494 send it to the remote host. Otherwise, use 1495 /usr/sbin/chromeos-firmwareupdate. 1496 1497 @param shellball: path of a shellball or default to None. 1498 1499 @return: Path of shellball in remote host. If use default shellball, 1500 reutrn None. 1501 """ 1502 updater_path = None 1503 if shellball: 1504 # Determine the firmware file is a shellball or a raw binary. 1505 is_shellball = (utils.system_output("file %s" % shellball).find( 1506 "shell script") != -1) 1507 if is_shellball: 1508 logging.info('Device will update firmware with shellball %s', 1509 shellball) 1510 temp_dir = self.faft_client.system.create_temp_dir( 1511 'shellball_') 1512 temp_shellball = os.path.join(temp_dir, 'updater.sh') 1513 self._client.send_file(shellball, temp_shellball) 1514 updater_path = temp_shellball 1515 else: 1516 raise error.TestFail( 1517 'The given shellball is not a shell script.') 1518 return updater_path 1519 1520 def is_kernel_changed(self): 1521 """Check if the current kernel is changed, by comparing its SHA1 hash. 1522 1523 @return: True if it is changed; otherwise, False. 1524 """ 1525 changed = False 1526 for p in ('A', 'B'): 1527 backup_sha = self._backup_kernel_sha.get(p, None) 1528 current_sha = self.faft_client.kernel.get_sha(p) 1529 if backup_sha != current_sha: 1530 changed = True 1531 logging.info('Kernel %s is changed', p) 1532 return changed 1533 1534 def backup_kernel(self, suffix='.original'): 1535 """Backup kernel to files, and the send them to host. 1536 1537 @param suffix: a string appended to backup file name. 1538 """ 1539 remote_temp_dir = self.faft_client.system.create_temp_dir() 1540 for p in ('A', 'B'): 1541 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1542 self.faft_client.kernel.dump(p, remote_path) 1543 self._client.get_file( 1544 remote_path, 1545 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix))) 1546 self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p) 1547 logging.info('Backup kernel stored in %s with suffix %s', 1548 self.resultsdir, suffix) 1549 1550 def is_kernel_saved(self): 1551 """Check if kernel images are saved (backup_kernel called before). 1552 1553 @return: True if the kernel is saved; otherwise, False. 1554 """ 1555 return len(self._backup_kernel_sha) != 0 1556 1557 def clear_saved_kernel(self): 1558 """Clear the kernel saved by backup_kernel().""" 1559 self._backup_kernel_sha = dict() 1560 1561 def restore_kernel(self, suffix='.original'): 1562 """Restore kernel from host in resultsdir. 1563 1564 @param suffix: a string appended to backup file name. 1565 """ 1566 if not self.is_kernel_changed(): 1567 return 1568 1569 # Backup current corrupted kernel. 1570 self.backup_kernel(suffix='.corrupt') 1571 1572 # Restore kernel. 1573 remote_temp_dir = self.faft_client.system.create_temp_dir() 1574 for p in ('A', 'B'): 1575 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1576 self._client.send_file( 1577 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)), 1578 remote_path) 1579 self.faft_client.kernel.write(p, remote_path) 1580 1581 self.sync_and_warm_reboot() 1582 self.wait_for_client_offline() 1583 self.wait_dev_screen_and_ctrl_d() 1584 self.wait_for_client() 1585 1586 logging.info('Successfully restored kernel.') 1587 1588 def backup_cgpt_attributes(self): 1589 """Backup CGPT partition table attributes.""" 1590 self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes() 1591 1592 def restore_cgpt_attributes(self): 1593 """Restore CGPT partition table attributes.""" 1594 current_table = self.faft_client.cgpt.get_attributes() 1595 if current_table == self._backup_cgpt_attr: 1596 return 1597 logging.info('CGPT table is changed. Original: %r. Current: %r.', 1598 self._backup_cgpt_attr, 1599 current_table) 1600 self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr) 1601 1602 self.sync_and_warm_reboot() 1603 self.wait_for_client_offline() 1604 self.wait_dev_screen_and_ctrl_d() 1605 self.wait_for_client() 1606 1607 logging.info('Successfully restored CGPT table.') 1608