firmware_test.py revision 004a8cd96f2c9bc228ca9d58428e88d36fc36307
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast
6import ctypes
7import logging
8import os
9import re
10import time
11import uuid
12
13from autotest_lib.client.bin import utils
14from autotest_lib.client.common_lib import error
15from autotest_lib.server import test
16from autotest_lib.server.cros import vboot_constants as vboot
17from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig
18from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
19from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
20from autotest_lib.server.cros.faft.utils.mode_switcher import ModeSwitcher
21from autotest_lib.server.cros.servo import chrome_ec
22
23
24class ConnectionError(Exception):
25    """Raised on an error of connecting DUT."""
26    pass
27
28
29class FAFTBase(test.test):
30    """The base class of FAFT classes.
31
32    It launches the FAFTClient on DUT, such that the test can access its
33    firmware functions and interfaces. It also provides some methods to
34    handle the reboot mechanism, in order to ensure FAFTClient is still
35    connected after reboot.
36    """
37    def initialize(self, host):
38        """Create a FAFTClient object and install the dependency."""
39        self.servo = host.servo
40        self.servo.initialize_dut()
41        self._client = host
42        self.faft_client = RPCProxy(host)
43        self.lockfile = '/var/tmp/faft/lock'
44
45    def wait_for_client(self, timeout=100):
46        """Wait for the client to come back online.
47
48        New remote processes will be launched if their used flags are enabled.
49
50        @param timeout: Time in seconds to wait for the client SSH daemon to
51                        come up.
52        @raise ConnectionError: Failed to connect DUT.
53        """
54        if not self._client.wait_up(timeout):
55            raise ConnectionError()
56        # Check the FAFT client is avaiable.
57        self.faft_client.system.is_available()
58
59    def wait_for_client_offline(self, timeout=60, orig_boot_id=None):
60        """Wait for the client to come offline.
61
62        @param timeout: Time in seconds to wait the client to come offline.
63        @param orig_boot_id: A string containing the original boot id.
64        @raise ConnectionError: Failed to connect DUT.
65        """
66        # When running against panther, we see that sometimes
67        # ping_wait_down() does not work correctly. There needs to
68        # be some investigation to the root cause.
69        # If we sleep for 120s before running get_boot_id(), it
70        # does succeed. But if we change this to ping_wait_down()
71        # there are implications on the wait time when running
72        # commands at the fw screens.
73        if not self._client.ping_wait_down(timeout):
74            if orig_boot_id and self._client.get_boot_id() != orig_boot_id:
75                logging.warn('Reboot done very quickly.')
76                return
77            raise ConnectionError()
78
79
80class FirmwareTest(FAFTBase):
81    """
82    Base class that sets up helper objects/functions for firmware tests.
83
84    TODO: add documentaion as the FAFT rework progresses.
85    """
86    version = 1
87
88    # Mapping of partition number of kernel and rootfs.
89    KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
90    ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
91    OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
92    OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}
93
94    CHROMEOS_MAGIC = "CHROMEOS"
95    CORRUPTED_MAGIC = "CORRUPTD"
96
97    _SERVOD_LOG = '/var/log/servod.log'
98
99    _ROOTFS_PARTITION_NUMBER = 3
100
101    _backup_firmware_sha = ()
102    _backup_kernel_sha = dict()
103    _backup_cgpt_attr = dict()
104    _backup_gbb_flags = None
105    _backup_dev_mode = None
106
107    # Class level variable, keep track the states of one time setup.
108    # This variable is preserved across tests which inherit this class.
109    _global_setup_done = {
110        'gbb_flags': False,
111        'reimage': False,
112        'usb_check': False,
113    }
114
115    @classmethod
116    def check_setup_done(cls, label):
117        """Check if the given setup is done.
118
119        @param label: The label of the setup.
120        """
121        return cls._global_setup_done[label]
122
123    @classmethod
124    def mark_setup_done(cls, label):
125        """Mark the given setup done.
126
127        @param label: The label of the setup.
128        """
129        cls._global_setup_done[label] = True
130
131    @classmethod
132    def unmark_setup_done(cls, label):
133        """Mark the given setup not done.
134
135        @param label: The label of the setup.
136        """
137        cls._global_setup_done[label] = False
138
139    def initialize(self, host, cmdline_args, ec_wp=None):
140        super(FirmwareTest, self).initialize(host)
141        self.run_id = str(uuid.uuid4())
142        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
143        # Parse arguments from command line
144        args = {}
145        self.power_control = host.POWER_CONTROL_RPM
146        for arg in cmdline_args:
147            match = re.search("^(\w+)=(.+)", arg)
148            if match:
149                args[match.group(1)] = match.group(2)
150        if 'power_control' in args:
151            self.power_control = args['power_control']
152            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
153                raise error.TestError('Valid values for --args=power_control '
154                                      'are %s. But you entered wrong argument '
155                                      'as "%s".'
156                                       % (host.POWER_CONTROL_VALID_ARGS,
157                                       self.power_control))
158
159        self.faft_config = FAFTConfig(
160                self.faft_client.system.get_platform_name())
161        self.checkers = FAFTCheckers(self)
162        self.switcher = ModeSwitcher(self)
163
164        if self.faft_config.chrome_ec:
165            self.ec = chrome_ec.ChromeEC(self.servo)
166
167        self._setup_uart_capture()
168        self._setup_servo_log()
169        self._record_system_info()
170        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
171        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
172        if self.fw_vboot2:
173            self.faft_client.system.set_fw_try_next('A')
174            if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B':
175                logging.info('mainfw_act is B. rebooting to set it A')
176                self.switcher.mode_aware_reboot()
177        self._setup_gbb_flags()
178        self._stop_service('update-engine')
179        self._create_faft_lockfile()
180        self._setup_ec_write_protect(ec_wp)
181        # See chromium:239034 regarding needing this sync.
182        self.blocking_sync()
183        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
184
185    def cleanup(self):
186        """Autotest cleanup function."""
187        # Unset state checker in case it's set by subclass
188        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
189        try:
190            self.faft_client.system.is_available()
191        except:
192            # Remote is not responding. Revive DUT so that subsequent tests
193            # don't fail.
194            self._restore_routine_from_timeout()
195        self.switcher.restore_mode()
196        self._restore_ec_write_protect()
197        self._restore_gbb_flags()
198        self._start_service('update-engine')
199        self._remove_faft_lockfile()
200        self._record_servo_log()
201        self._record_faft_client_log()
202        self._cleanup_uart_capture()
203        super(FirmwareTest, self).cleanup()
204        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
205
206    def _record_system_info(self):
207        """Record some critical system info to the attr keyval.
208
209        This info is used by generate_test_report later.
210        """
211        self.write_attr_keyval({
212            'fw_version': self.faft_client.ec.get_version(),
213            'hwid': self.faft_client.system.get_crossystem_value('hwid'),
214            'fwid': self.faft_client.system.get_crossystem_value('fwid'),
215        })
216
217    def invalidate_firmware_setup(self):
218        """Invalidate all firmware related setup state.
219
220        This method is called when the firmware is re-flashed. It resets all
221        firmware related setup states so that the next test setup properly
222        again.
223        """
224        self.unmark_setup_done('gbb_flags')
225
226    def _retrieve_recovery_reason_from_trap(self):
227        """Try to retrieve the recovery reason from a trapped recovery screen.
228
229        @return: The recovery_reason, 0 if any error.
230        """
231        recovery_reason = 0
232        logging.info('Try to retrieve recovery reason...')
233        if self.servo.get_usbkey_direction() == 'dut':
234            self.switcher.bypass_rec_mode()
235        else:
236            self.servo.switch_usbkey('dut')
237
238        try:
239            self.wait_for_client()
240            lines = self.faft_client.system.run_shell_command_get_output(
241                        'crossystem recovery_reason')
242            recovery_reason = int(lines[0])
243            logging.info('Got the recovery reason %d.', recovery_reason)
244        except ConnectionError:
245            logging.error('Failed to get the recovery reason due to connection '
246                          'error.')
247        return recovery_reason
248
249    def _reset_client(self):
250        """Reset client to a workable state.
251
252        This method is called when the client is not responsive. It may be
253        caused by the following cases:
254          - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
255          - corrupted firmware;
256          - corrutped OS image.
257        """
258        # DUT may halt on a firmware screen. Try cold reboot.
259        logging.info('Try cold reboot...')
260        self.switcher.mode_aware_reboot(reboot_type='cold',
261                                        sync_before_boot=False,
262                                        wait_for_dut_up=False)
263        self.wait_for_client_offline()
264        self.switcher.bypass_dev_mode()
265        try:
266            self.wait_for_client()
267            return
268        except ConnectionError:
269            logging.warn('Cold reboot doesn\'t help, still connection error.')
270
271        # DUT may be broken by a corrupted firmware. Restore firmware.
272        # We assume the recovery boot still works fine. Since the recovery
273        # code is in RO region and all FAFT tests don't change the RO region
274        # except GBB.
275        if self.is_firmware_saved():
276            self._ensure_client_in_recovery()
277            logging.info('Try restore the original firmware...')
278            if self.is_firmware_changed():
279                try:
280                    self.restore_firmware()
281                    return
282                except ConnectionError:
283                    logging.warn('Restoring firmware doesn\'t help, still '
284                                 'connection error.')
285
286        # Perhaps it's kernel that's broken. Let's try restoring it.
287        if self.is_kernel_saved():
288            self._ensure_client_in_recovery()
289            logging.info('Try restore the original kernel...')
290            if self.is_kernel_changed():
291                try:
292                    self.restore_kernel()
293                    return
294                except ConnectionError:
295                    logging.warn('Restoring kernel doesn\'t help, still '
296                                 'connection error.')
297
298        # DUT may be broken by a corrupted OS image. Restore OS image.
299        self._ensure_client_in_recovery()
300        logging.info('Try restore the OS image...')
301        self.faft_client.system.run_shell_command('chromeos-install --yes')
302        self.switcher.mode_aware_reboot(wait_for_dut_up=False)
303        self.wait_for_client_offline()
304        self.switcher.bypass_dev_mode()
305        try:
306            self.wait_for_client()
307            logging.info('Successfully restore OS image.')
308            return
309        except ConnectionError:
310            logging.warn('Restoring OS image doesn\'t help, still connection '
311                         'error.')
312
313    def _ensure_client_in_recovery(self):
314        """Ensure client in recovery boot; reboot into it if necessary.
315
316        @raise TestError: if failed to boot the USB image.
317        """
318        logging.info('Try boot into USB image...')
319        self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
320                                     wait_for_dut_up=False)
321        self.servo.switch_usbkey('host')
322        self.switcher.bypass_rec_mode()
323        try:
324            self.wait_for_client()
325        except ConnectionError:
326            raise error.TestError('Failed to boot the USB image.')
327
328    def _restore_routine_from_timeout(self):
329        """A routine to try to restore the system from a timeout error.
330
331        This method is called when FAFT failed to connect DUT after reboot.
332
333        @raise TestFail: This exception is already raised, with a decription
334                         why it failed.
335        """
336        # DUT is disconnected. Capture the UART output for debug.
337        self._record_uart_capture()
338
339        # TODO(waihong@chromium.org): Implement replugging the Ethernet to
340        # identify if it is a network flaky.
341
342        recovery_reason = self._retrieve_recovery_reason_from_trap()
343
344        # Reset client to a workable state.
345        self._reset_client()
346
347        # Raise the proper TestFail exception.
348        if recovery_reason:
349            raise error.TestFail('Trapped in the recovery screen (reason: %d) '
350                                 'and timed out' % recovery_reason)
351        else:
352            raise error.TestFail('Timed out waiting for DUT reboot')
353
354    def assert_test_image_in_usb_disk(self, usb_dev=None, install_shim=False):
355        """Assert an USB disk plugged-in on servo and a test image inside.
356
357        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
358                        If None, it is detected automatically.
359        @param install_shim: True to verify an install shim instead of a test
360                             image.
361        @raise TestError: if USB disk not detected or not a test (install shim)
362                          image.
363        """
364        if self.check_setup_done('usb_check'):
365            return
366        if usb_dev:
367            assert self.servo.get_usbkey_direction() == 'host'
368        else:
369            self.servo.switch_usbkey('host')
370            usb_dev = self.servo.probe_host_usb_dev()
371            if not usb_dev:
372                raise error.TestError(
373                        'An USB disk should be plugged in the servo board.')
374
375        rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
376        logging.info('usb dev is %s', usb_dev)
377        tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
378        self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
379
380        try:
381            if install_shim:
382                dir_list = self.servo.system_output('ls -a %s' %
383                                                    os.path.join(tmpd, 'root'))
384                if '.factory_installer' not in dir_list:
385                    raise error.TestError(
386                        'USB stick in servo is not a factory install shim')
387            else:
388                usb_lsb = self.servo.system_output('cat %s' %
389                    os.path.join(tmpd, 'etc/lsb-release'))
390                logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
391                dut_lsb = '\n'.join(self.faft_client.system.
392                    run_shell_command_get_output('cat /etc/lsb-release'))
393                logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
394                if not re.search(r'RELEASE_DESCRIPTION=.*(T|t)est', usb_lsb):
395                    raise error.TestError('USB stick in servo is no test image')
396                usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
397                dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
398                if usb_board != dut_board:
399                    raise error.TestError('USB stick in servo contains a %s '
400                        'image, but DUT is a %s' % (usb_board, dut_board))
401        finally:
402            for cmd in ('umount %s' % rootfs, 'sync', 'rm -rf %s' % tmpd):
403                self.servo.system(cmd)
404
405        self.mark_setup_done('usb_check')
406
407    def setup_usbkey(self, usbkey, host=None, install_shim=False):
408        """Setup the USB disk for the test.
409
410        It checks the setup of USB disk and a valid ChromeOS test image inside.
411        It also muxes the USB disk to either the host or DUT by request.
412
413        @param usbkey: True if the USB disk is required for the test, False if
414                       not required.
415        @param host: Optional, True to mux the USB disk to host, False to mux it
416                    to DUT, default to do nothing.
417        @param install_shim: True to verify an install shim instead of a test
418                             image.
419        """
420        if usbkey:
421            self.assert_test_image_in_usb_disk(install_shim=install_shim)
422        elif host is None:
423            # USB disk is not required for the test. Better to mux it to host.
424            host = True
425
426        if host is True:
427            self.servo.switch_usbkey('host')
428        elif host is False:
429            self.servo.switch_usbkey('dut')
430
431    def get_usbdisk_path_on_dut(self):
432        """Get the path of the USB disk device plugged-in the servo on DUT.
433
434        Returns:
435          A string representing USB disk path, like '/dev/sdb', or None if
436          no USB disk is found.
437        """
438        cmd = 'ls -d /dev/s*[a-z]'
439        original_value = self.servo.get_usbkey_direction()
440
441        # Make the dut unable to see the USB disk.
442        self.servo.switch_usbkey('off')
443        no_usb_set = set(
444            self.faft_client.system.run_shell_command_get_output(cmd))
445
446        # Make the dut able to see the USB disk.
447        self.servo.switch_usbkey('dut')
448        time.sleep(self.faft_config.between_usb_plug)
449        has_usb_set = set(
450            self.faft_client.system.run_shell_command_get_output(cmd))
451
452        # Back to its original value.
453        if original_value != self.servo.get_usbkey_direction():
454            self.servo.switch_usbkey(original_value)
455
456        diff_set = has_usb_set - no_usb_set
457        if len(diff_set) == 1:
458            return diff_set.pop()
459        else:
460            return None
461
462    def _create_faft_lockfile(self):
463        """Creates the FAFT lockfile."""
464        logging.info('Creating FAFT lockfile...')
465        command = 'touch %s' % (self.lockfile)
466        self.faft_client.system.run_shell_command(command)
467
468    def _remove_faft_lockfile(self):
469        """Removes the FAFT lockfile."""
470        logging.info('Removing FAFT lockfile...')
471        command = 'rm -f %s' % (self.lockfile)
472        self.faft_client.system.run_shell_command(command)
473
474    def _stop_service(self, service):
475        """Stops a upstart service on the client.
476
477        @param service: The name of the upstart service.
478        """
479        logging.info('Stopping %s...', service)
480        command = 'status %s | grep stop || stop %s' % (service, service)
481        self.faft_client.system.run_shell_command(command)
482
483    def _start_service(self, service):
484        """Starts a upstart service on the client.
485
486        @param service: The name of the upstart service.
487        """
488        logging.info('Starting %s...', service)
489        command = 'status %s | grep start || start %s' % (service, service)
490        self.faft_client.system.run_shell_command(command)
491
492    def _write_gbb_flags(self, new_flags):
493        """Write the GBB flags to the current firmware.
494
495        @param new_flags: The flags to write.
496        """
497        gbb_flags = self.faft_client.bios.get_gbb_flags()
498        if gbb_flags == new_flags:
499            return
500        logging.info('Changing GBB flags from 0x%x to 0x%x.',
501                     gbb_flags, new_flags)
502        self.faft_client.system.run_shell_command(
503                '/usr/share/vboot/bin/set_gbb_flags.sh 0x%x' % new_flags)
504        self.faft_client.bios.reload()
505        # If changing FORCE_DEV_SWITCH_ON flag, reboot to get a clear state
506        if ((gbb_flags ^ new_flags) & vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON):
507            self.switcher.mode_aware_reboot()
508
509    def clear_set_gbb_flags(self, clear_mask, set_mask):
510        """Clear and set the GBB flags in the current flashrom.
511
512        @param clear_mask: A mask of flags to be cleared.
513        @param set_mask: A mask of flags to be set.
514        """
515        gbb_flags = self.faft_client.bios.get_gbb_flags()
516        new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
517        self._write_gbb_flags(new_flags)
518
519    def check_ec_capability(self, required_cap=None, suppress_warning=False):
520        """Check if current platform has required EC capabilities.
521
522        @param required_cap: A list containing required EC capabilities. Pass in
523                             None to only check for presence of Chrome EC.
524        @param suppress_warning: True to suppress any warning messages.
525        @return: True if requirements are met. Otherwise, False.
526        """
527        if not self.faft_config.chrome_ec:
528            if not suppress_warning:
529                logging.warn('Requires Chrome EC to run this test.')
530            return False
531
532        if not required_cap:
533            return True
534
535        for cap in required_cap:
536            if cap not in self.faft_config.ec_capability:
537                if not suppress_warning:
538                    logging.warn('Requires EC capability "%s" to run this '
539                                 'test.', cap)
540                return False
541
542        return True
543
544    def check_root_part_on_non_recovery(self, part):
545        """Check the partition number of root device and on normal/dev boot.
546
547        @param part: A string of partition number, e.g.'3'.
548        @return: True if the root device matched and on normal/dev boot;
549                 otherwise, False.
550        """
551        return self.checkers.root_part_checker(part) and \
552                self.checkers.crossystem_checker({
553                    'mainfw_type': ('normal', 'developer'),
554                })
555
556    def _join_part(self, dev, part):
557        """Return a concatenated string of device and partition number.
558
559        @param dev: A string of device, e.g.'/dev/sda'.
560        @param part: A string of partition number, e.g.'3'.
561        @return: A concatenated string of device and partition number,
562                 e.g.'/dev/sda3'.
563
564        >>> seq = FirmwareTest()
565        >>> seq._join_part('/dev/sda', '3')
566        '/dev/sda3'
567        >>> seq._join_part('/dev/mmcblk0', '2')
568        '/dev/mmcblk0p2'
569        """
570        if 'mmcblk' in dev:
571            return dev + 'p' + part
572        else:
573            return dev + part
574
575    def copy_kernel_and_rootfs(self, from_part, to_part):
576        """Copy kernel and rootfs from from_part to to_part.
577
578        @param from_part: A string of partition number to be copied from.
579        @param to_part: A string of partition number to be copied to.
580        """
581        root_dev = self.faft_client.system.get_root_dev()
582        logging.info('Copying kernel from %s to %s. Please wait...',
583                     from_part, to_part)
584        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
585                (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
586                 self._join_part(root_dev, self.KERNEL_MAP[to_part])))
587        logging.info('Copying rootfs from %s to %s. Please wait...',
588                     from_part, to_part)
589        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
590                (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
591                 self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
592
593    def ensure_kernel_boot(self, part):
594        """Ensure the request kernel boot.
595
596        If not, it duplicates the current kernel to the requested kernel
597        and sets the requested higher priority to ensure it boot.
598
599        @param part: A string of kernel partition number or 'a'/'b'.
600        """
601        if not self.checkers.root_part_checker(part):
602            if self.faft_client.kernel.diff_a_b():
603                self.copy_kernel_and_rootfs(
604                        from_part=self.OTHER_KERNEL_MAP[part],
605                        to_part=part)
606            self.reset_and_prioritize_kernel(part)
607
608    def set_hardware_write_protect(self, enable):
609        """Set hardware write protect pin.
610
611        @param enable: True if asserting write protect pin. Otherwise, False.
612        """
613        self.servo.set('fw_wp_vref', self.faft_config.wp_voltage)
614        self.servo.set('fw_wp_en', 'on')
615        self.servo.set('fw_wp', 'on' if enable else 'off')
616
617    def set_ec_write_protect_and_reboot(self, enable):
618        """Set EC write protect status and reboot to take effect.
619
620        The write protect state is only activated if both hardware write
621        protect pin is asserted and software write protect flag is set.
622        This method asserts/deasserts hardware write protect pin first, and
623        set corresponding EC software write protect flag.
624
625        If the device uses non-Chrome EC, set the software write protect via
626        flashrom.
627
628        If the device uses Chrome EC, a reboot is required for write protect
629        to take effect. Since the software write protect flag cannot be unset
630        if hardware write protect pin is asserted, we need to deasserted the
631        pin first if we are deactivating write protect. Similarly, a reboot
632        is required before we can modify the software flag.
633
634        @param enable: True if activating EC write protect. Otherwise, False.
635        """
636        self.set_hardware_write_protect(enable)
637        if self.faft_config.chrome_ec:
638            self.set_chrome_ec_write_protect_and_reboot(enable)
639        else:
640            self.faft_client.ec.set_write_protect(enable)
641            self.switcher.mode_aware_reboot()
642
643    def set_chrome_ec_write_protect_and_reboot(self, enable):
644        """Set Chrome EC write protect status and reboot to take effect.
645
646        @param enable: True if activating EC write protect. Otherwise, False.
647        """
648        if enable:
649            # Set write protect flag and reboot to take effect.
650            self.ec.set_flash_write_protect(enable)
651            self.sync_and_ec_reboot()
652        else:
653            # Reboot after deasserting hardware write protect pin to deactivate
654            # write protect. And then remove software write protect flag.
655            self.sync_and_ec_reboot()
656            self.ec.set_flash_write_protect(enable)
657
658    def _setup_ec_write_protect(self, ec_wp):
659        """Setup for EC write-protection.
660
661        It makes sure the EC in the requested write-protection state. If not, it
662        flips the state. Flipping the write-protection requires DUT reboot.
663
664        @param ec_wp: True to request EC write-protected; False to request EC
665                      not write-protected; None to do nothing.
666        """
667        if ec_wp is None:
668            self._old_ec_wp = None
669            return
670        self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'})
671        if ec_wp != self._old_ec_wp:
672            logging.info('The test required EC is %swrite-protected. Reboot '
673                         'and flip the state.', '' if ec_wp else 'not ')
674            self.switcher.mode_aware_reboot(
675                    'custom',
676                     lambda:self.set_ec_write_protect_and_reboot(ec_wp))
677
678    def _restore_ec_write_protect(self):
679        """Restore the original EC write-protection."""
680        if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None):
681            return
682        if not self.checkers.crossystem_checker(
683                {'wpsw_boot': '1' if self._old_ec_wp else '0'}):
684            logging.info('Restore original EC write protection and reboot.')
685            self.switcher.mode_aware_reboot(
686                    'custom',
687                    lambda:self.set_ec_write_protect_and_reboot(
688                            self._old_ec_wp))
689
690    def _setup_uart_capture(self):
691        """Setup the CPU/EC/PD UART capture."""
692        self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
693        self.servo.set('cpu_uart_capture', 'on')
694        self.ec_uart_file = None
695        self.usbpd_uart_file = None
696        if self.faft_config.chrome_ec:
697            try:
698                self.servo.set('ec_uart_capture', 'on')
699                self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
700            except error.TestFail as e:
701                if 'No control named' in str(e):
702                    logging.warn('The servod is too old that ec_uart_capture '
703                                 'not supported.')
704            # Log separate PD console if supported
705            if self.check_ec_capability(['usbpd_uart'], suppress_warning=True):
706                try:
707                    self.servo.set('usbpd_uart_capture', 'on')
708                    self.usbpd_uart_file = os.path.join(self.resultsdir,
709                                                        'usbpd_uart.txt')
710                except error.TestFail as e:
711                    if 'No control named' in str(e):
712                        logging.warn('The servod is too old that '
713                                     'usbpd_uart_capture is not supported.')
714        else:
715            logging.info('Not a Google EC, cannot capture ec console output.')
716
717    def _record_uart_capture(self):
718        """Record the CPU/EC/PD UART output stream to files."""
719        if self.cpu_uart_file:
720            with open(self.cpu_uart_file, 'a') as f:
721                f.write(ast.literal_eval(self.servo.get('cpu_uart_stream')))
722        if self.ec_uart_file and self.faft_config.chrome_ec:
723            with open(self.ec_uart_file, 'a') as f:
724                f.write(ast.literal_eval(self.servo.get('ec_uart_stream')))
725        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
726            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
727            with open(self.usbpd_uart_file, 'a') as f:
728                f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream')))
729
730    def _cleanup_uart_capture(self):
731        """Cleanup the CPU/EC/PD UART capture."""
732        # Flush the remaining UART output.
733        self._record_uart_capture()
734        self.servo.set('cpu_uart_capture', 'off')
735        if self.ec_uart_file and self.faft_config.chrome_ec:
736            self.servo.set('ec_uart_capture', 'off')
737        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
738            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
739            self.servo.set('usbpd_uart_capture', 'off')
740
741    def _fetch_servo_log(self):
742        """Fetch the servo log."""
743        cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2)
744        servo_log = self.servo.system_output(cmd)
745        return None if servo_log == 'NOTFOUND' else servo_log
746
747    def _setup_servo_log(self):
748        """Setup the servo log capturing."""
749        self.servo_log_original_len = -1
750        if self.servo.is_localhost():
751            # No servo log recorded when servod runs locally.
752            return
753
754        servo_log = self._fetch_servo_log()
755        if servo_log:
756            self.servo_log_original_len = len(servo_log)
757        else:
758            logging.warn('Servo log file not found.')
759
760    def _record_servo_log(self):
761        """Record the servo log to the results directory."""
762        if self.servo_log_original_len != -1:
763            servo_log = self._fetch_servo_log()
764            servo_log_file = os.path.join(self.resultsdir, 'servod.log')
765            with open(servo_log_file, 'a') as f:
766                f.write(servo_log[self.servo_log_original_len:])
767
768    def _record_faft_client_log(self):
769        """Record the faft client log to the results directory."""
770        client_log = self.faft_client.system.dump_log(True)
771        client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
772        with open(client_log_file, 'w') as f:
773            f.write(client_log)
774
775    def _setup_gbb_flags(self):
776        """Setup the GBB flags for FAFT test."""
777        if self.faft_config.gbb_version < 1.1:
778            logging.info('Skip modifying GBB on versions older than 1.1.')
779            return
780
781        if self.check_setup_done('gbb_flags'):
782            return
783
784        self._backup_gbb_flags = self.faft_client.bios.get_gbb_flags()
785
786        logging.info('Set proper GBB flags for test.')
787        self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY |
788                                 vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
789                                 vboot.GBB_FLAG_FORCE_DEV_BOOT_USB |
790                                 vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK,
791                                 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM |
792                                 vboot.GBB_FLAG_FAFT_KEY_OVERIDE)
793        self.mark_setup_done('gbb_flags')
794
795    def drop_backup_gbb_flags(self):
796        """Drops the backup GBB flags.
797
798        This can be used when a test intends to permanently change GBB flags.
799        """
800        self._backup_gbb_flags = None
801
802    def _restore_gbb_flags(self):
803        """Restore GBB flags to their original state."""
804        if not self._backup_gbb_flags:
805            return
806        self._write_gbb_flags(self._backup_gbb_flags)
807        self.unmark_setup_done('gbb_flags')
808
809    def setup_tried_fwb(self, tried_fwb):
810        """Setup for fw B tried state.
811
812        It makes sure the system in the requested fw B tried state. If not, it
813        tries to do so.
814
815        @param tried_fwb: True if requested in tried_fwb=1;
816                          False if tried_fwb=0.
817        """
818        if tried_fwb:
819            if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
820                logging.info(
821                    'Firmware is not booted with tried_fwb. Reboot into it.')
822                self.faft_client.system.set_try_fw_b()
823        else:
824            if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
825                logging.info(
826                    'Firmware is booted with tried_fwb. Reboot to clear.')
827
828    def power_on(self):
829        """Switch DUT AC power on."""
830        self._client.power_on(self.power_control)
831
832    def power_off(self):
833        """Switch DUT AC power off."""
834        self._client.power_off(self.power_control)
835
836    def power_cycle(self):
837        """Power cycle DUT AC power."""
838        self._client.power_cycle(self.power_control)
839
840    def setup_rw_boot(self, section='a'):
841        """Make sure firmware is in RW-boot mode.
842
843        If the given firmware section is in RO-boot mode, turn off the RO-boot
844        flag and reboot DUT into RW-boot mode.
845
846        @param section: A firmware section, either 'a' or 'b'.
847        """
848        flags = self.faft_client.bios.get_preamble_flags(section)
849        if flags & vboot.PREAMBLE_USE_RO_NORMAL:
850            flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
851            self.faft_client.bios.set_preamble_flags(section, flags)
852            self.switcher.mode_aware_reboot()
853
854    def setup_kernel(self, part):
855        """Setup for kernel test.
856
857        It makes sure both kernel A and B bootable and the current boot is
858        the requested kernel part.
859
860        @param part: A string of kernel partition number or 'a'/'b'.
861        """
862        self.ensure_kernel_boot(part)
863        logging.info('Checking the integrity of kernel B and rootfs B...')
864        if (self.faft_client.kernel.diff_a_b() or
865                not self.faft_client.rootfs.verify_rootfs('B')):
866            logging.info('Copying kernel and rootfs from A to B...')
867            self.copy_kernel_and_rootfs(from_part=part,
868                                        to_part=self.OTHER_KERNEL_MAP[part])
869        self.reset_and_prioritize_kernel(part)
870
871    def reset_and_prioritize_kernel(self, part):
872        """Make the requested partition highest priority.
873
874        This function also reset kerenl A and B to bootable.
875
876        @param part: A string of partition number to be prioritized.
877        """
878        root_dev = self.faft_client.system.get_root_dev()
879        # Reset kernel A and B to bootable.
880        self.faft_client.system.run_shell_command(
881            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
882        self.faft_client.system.run_shell_command(
883            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
884        # Set kernel part highest priority.
885        self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
886                (self.KERNEL_MAP[part], root_dev))
887
888    def blocking_sync(self):
889        """Run a blocking sync command."""
890        # The double calls to sync fakes a blocking call
891        # since the first call returns before the flush
892        # is complete, but the second will wait for the
893        # first to finish.
894        self.faft_client.system.run_shell_command('sync')
895        self.faft_client.system.run_shell_command('sync')
896
897        # sync only sends SYNCHRONIZE_CACHE but doesn't
898        # check the status. For mmc devices, use `mmc
899        # status get` command to send an empty command to
900        # wait for the disk to be available again.  For
901        # other devices, hdparm sends TUR to check if
902        # a device is ready for transfer operation.
903        root_dev = self.faft_client.system.get_root_dev()
904        if 'mmcblk' in root_dev:
905            self.faft_client.system.run_shell_command('mmc status get %s' %
906                                                      root_dev)
907        else:
908            self.faft_client.system.run_shell_command('hdparm -f %s' % root_dev)
909
910    def wait_for_kernel_up(self):
911        """
912        Helper function that waits for the device to boot up to kernel.
913        """
914        logging.info("-[FAFT]-[ start wait_for_kernel_up ]---")
915        # Wait for the system to respond to ping before attempting ssh
916        if not self._client.ping_wait_up(90):
917            logging.warning("-[FAFT]-[ system did not respond to ping ]")
918        try:
919            self.wait_for_client()
920            # Stop update-engine as it may change firmware/kernel.
921            self._stop_service('update-engine')
922        except ConnectionError:
923            logging.error('wait_for_client() timed out.')
924            self._restore_routine_from_timeout()
925        logging.info("-[FAFT]-[ end wait_for_kernel_up ]-----")
926
927    def sync_and_ec_reboot(self, flags=''):
928        """Request the client sync and do a EC triggered reboot.
929
930        @param flags: Optional, a space-separated string of flags passed to EC
931                      reboot command, including:
932                          default: EC soft reboot;
933                          'hard': EC cold/hard reboot.
934        """
935        self.blocking_sync()
936        self.ec.reboot(flags)
937        time.sleep(self.faft_config.ec_boot_to_console)
938        self.check_lid_and_power_on()
939
940    def reboot_with_factory_install_shim(self):
941        """Request reboot with factory install shim to reset TPM.
942
943        Factory install shim requires dev mode enabled. So this method switches
944        firmware to dev mode first and reboot. The client uses factory install
945        shim to reset TPM values.
946        """
947        # Unplug USB first to avoid the complicated USB autoboot cases.
948        is_dev = self.checkers.crossystem_checker({'devsw_boot': '1'})
949        if not is_dev:
950            self.switcher.reboot_to_mode(to_mode='dev', wait_for_dut_up=False)
951        self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False)
952        self.switcher.bypass_rec_mode()
953        time.sleep(self.faft_config.install_shim_done)
954        self.switcher.mode_aware_reboot()
955
956    def full_power_off_and_on(self):
957        """Shutdown the device by pressing power button and power on again."""
958        boot_id = self.get_bootid()
959        # Press power button to trigger Chrome OS normal shutdown process.
960        # We use a customized delay since the normal-press 1.2s is not enough.
961        self.servo.power_key(self.faft_config.hold_pwr_button)
962        # device can take 44-51 seconds to restart,
963        # add buffer from the default timeout of 60 seconds.
964        self.wait_for_client_offline(timeout=100, orig_boot_id=boot_id)
965        time.sleep(self.faft_config.shutdown)
966        # Short press power button to boot DUT again.
967        self.servo.power_short_press()
968
969    def check_lid_and_power_on(self):
970        """
971        On devices with EC software sync, system powers on after EC reboots if
972        lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
973        This method checks lid switch state and presses power button if
974        necessary.
975        """
976        if self.servo.get("lid_open") == "no":
977            time.sleep(self.faft_config.software_sync)
978            self.servo.power_short_press()
979
980    def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
981        """Modify the kernel header magic in USB stick.
982
983        The kernel header magic is the first 8-byte of kernel partition.
984        We modify it to make it fail on kernel verification check.
985
986        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
987        @param from_magic: A string of magic which we change it from.
988        @param to_magic: A string of magic which we change it to.
989        @raise TestError: if failed to change magic.
990        """
991        assert len(from_magic) == 8
992        assert len(to_magic) == 8
993        # USB image only contains one kernel.
994        kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
995        read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
996        current_magic = self.servo.system_output(read_cmd)
997        if current_magic == to_magic:
998            logging.info("The kernel magic is already %s.", current_magic)
999            return
1000        if current_magic != from_magic:
1001            raise error.TestError("Invalid kernel image on USB: wrong magic.")
1002
1003        logging.info('Modify the kernel magic in USB, from %s to %s.',
1004                     from_magic, to_magic)
1005        write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
1006                     " 2>/dev/null" % (to_magic, kernel_part))
1007        self.servo.system(write_cmd)
1008
1009        if self.servo.system_output(read_cmd) != to_magic:
1010            raise error.TestError("Failed to write new magic.")
1011
1012    def corrupt_usb_kernel(self, usb_dev):
1013        """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
1014
1015        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1016        """
1017        self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
1018                                self.CORRUPTED_MAGIC)
1019
1020    def restore_usb_kernel(self, usb_dev):
1021        """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
1022
1023        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1024        """
1025        self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
1026                                self.CHROMEOS_MAGIC)
1027
1028    def _call_action(self, action_tuple, check_status=False):
1029        """Call the action function with/without arguments.
1030
1031        @param action_tuple: A function, or a tuple (function, args, error_msg),
1032                             in which, args and error_msg are optional. args is
1033                             either a value or a tuple if multiple arguments.
1034                             This can also be a list containing multiple
1035                             function or tuple. In this case, these actions are
1036                             called in sequence.
1037        @param check_status: Check the return value of action function. If not
1038                             succeed, raises a TestFail exception.
1039        @return: The result value of the action function.
1040        @raise TestError: An error when the action function is not callable.
1041        @raise TestFail: When check_status=True, action function not succeed.
1042        """
1043        if isinstance(action_tuple, list):
1044            return all([self._call_action(action, check_status=check_status)
1045                        for action in action_tuple])
1046
1047        action = action_tuple
1048        args = ()
1049        error_msg = 'Not succeed'
1050        if isinstance(action_tuple, tuple):
1051            action = action_tuple[0]
1052            if len(action_tuple) >= 2:
1053                args = action_tuple[1]
1054                if not isinstance(args, tuple):
1055                    args = (args,)
1056            if len(action_tuple) >= 3:
1057                error_msg = action_tuple[2]
1058
1059        if action is None:
1060            return
1061
1062        if not callable(action):
1063            raise error.TestError('action is not callable!')
1064
1065        info_msg = 'calling %s' % str(action)
1066        if args:
1067            info_msg += ' with args %s' % str(args)
1068        logging.info(info_msg)
1069        ret = action(*args)
1070
1071        if check_status and not ret:
1072            raise error.TestFail('%s: %s returning %s' %
1073                                 (error_msg, info_msg, str(ret)))
1074        return ret
1075
1076    def run_shutdown_process(self, shutdown_action, pre_power_action=None,
1077            post_power_action=None, shutdown_timeout=None):
1078        """Run shutdown_action(), which makes DUT shutdown, and power it on.
1079
1080        @param shutdown_action: function which makes DUT shutdown, like
1081                                pressing power key.
1082        @param pre_power_action: function which is called before next power on.
1083        @param post_power_action: function which is called after next power on.
1084        @param shutdown_timeout: a timeout to confirm DUT shutdown.
1085        @raise TestFail: if the shutdown_action() failed to turn DUT off.
1086        """
1087        self._call_action(shutdown_action)
1088        logging.info('Wait to ensure DUT shut down...')
1089        try:
1090            if shutdown_timeout is None:
1091                shutdown_timeout = self.faft_config.shutdown_timeout
1092            self.wait_for_client(timeout=shutdown_timeout)
1093            raise error.TestFail(
1094                    'Should shut the device down after calling %s.' %
1095                    str(shutdown_action))
1096        except ConnectionError:
1097            logging.info(
1098                'DUT is surely shutdown. We are going to power it on again...')
1099
1100        if pre_power_action:
1101            self._call_action(pre_power_action)
1102        self.servo.power_short_press()
1103        if post_power_action:
1104            self._call_action(post_power_action)
1105
1106    def get_bootid(self, retry=3):
1107        """
1108        Return the bootid.
1109        """
1110        boot_id = None
1111        while retry:
1112            try:
1113                boot_id = self._client.get_boot_id()
1114                break
1115            except error.AutoservRunError:
1116                retry -= 1
1117                if retry:
1118                    logging.info('Retry to get boot_id...')
1119                else:
1120                    logging.warning('Failed to get boot_id.')
1121        logging.info('boot_id: %s', boot_id)
1122        return boot_id
1123
1124    def check_state(self, func):
1125        """
1126        Wrapper around _call_action with check_status set to True. This is a
1127        helper function to be used by tests and is currently implemented by
1128        calling _call_action with check_status=True.
1129
1130        TODO: This function's arguments need to be made more stringent. And
1131        its functionality should be moved over to check functions directly in
1132        the future.
1133
1134        @param func: A function, or a tuple (function, args, error_msg),
1135                             in which, args and error_msg are optional. args is
1136                             either a value or a tuple if multiple arguments.
1137                             This can also be a list containing multiple
1138                             function or tuple. In this case, these actions are
1139                             called in sequence.
1140        @return: The result value of the action function.
1141        @raise TestFail: If the function does notsucceed.
1142        """
1143        logging.info("-[FAFT]-[ start stepstate_checker ]----------")
1144        self._call_action(func, check_status=True)
1145        logging.info("-[FAFT]-[ end state_checker ]----------------")
1146
1147    def get_current_firmware_sha(self):
1148        """Get current firmware sha of body and vblock.
1149
1150        @return: Current firmware sha follows the order (
1151                 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha)
1152        """
1153        current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'),
1154                                self.faft_client.bios.get_body_sha('a'),
1155                                self.faft_client.bios.get_sig_sha('b'),
1156                                self.faft_client.bios.get_body_sha('b'))
1157        if not all(current_firmware_sha):
1158            raise error.TestError('Failed to get firmware sha.')
1159        return current_firmware_sha
1160
1161    def is_firmware_changed(self):
1162        """Check if the current firmware changed, by comparing its SHA.
1163
1164        @return: True if it is changed, otherwise Flase.
1165        """
1166        # Device may not be rebooted after test.
1167        self.faft_client.bios.reload()
1168
1169        current_sha = self.get_current_firmware_sha()
1170
1171        if current_sha == self._backup_firmware_sha:
1172            return False
1173        else:
1174            corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0])
1175            corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1])
1176            corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2])
1177            corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3])
1178            logging.info("Firmware changed:")
1179            logging.info('VBOOTA is changed: %s', corrupt_VBOOTA)
1180            logging.info('VBOOTB is changed: %s', corrupt_VBOOTB)
1181            logging.info('FVMAIN is changed: %s', corrupt_FVMAIN)
1182            logging.info('FVMAINB is changed: %s', corrupt_FVMAINB)
1183            return True
1184
1185    def backup_firmware(self, suffix='.original'):
1186        """Backup firmware to file, and then send it to host.
1187
1188        @param suffix: a string appended to backup file name
1189        """
1190        remote_temp_dir = self.faft_client.system.create_temp_dir()
1191        self.faft_client.bios.dump_whole(os.path.join(remote_temp_dir, 'bios'))
1192        self._client.get_file(os.path.join(remote_temp_dir, 'bios'),
1193                              os.path.join(self.resultsdir, 'bios' + suffix))
1194
1195        self._backup_firmware_sha = self.get_current_firmware_sha()
1196        logging.info('Backup firmware stored in %s with suffix %s',
1197            self.resultsdir, suffix)
1198
1199    def is_firmware_saved(self):
1200        """Check if a firmware saved (called backup_firmware before).
1201
1202        @return: True if the firmware is backuped; otherwise False.
1203        """
1204        return self._backup_firmware_sha != ()
1205
1206    def clear_saved_firmware(self):
1207        """Clear the firmware saved by the method backup_firmware."""
1208        self._backup_firmware_sha = ()
1209
1210    def restore_firmware(self, suffix='.original'):
1211        """Restore firmware from host in resultsdir.
1212
1213        @param suffix: a string appended to backup file name
1214        """
1215        if not self.is_firmware_changed():
1216            return
1217
1218        # Backup current corrupted firmware.
1219        self.backup_firmware(suffix='.corrupt')
1220
1221        # Restore firmware.
1222        remote_temp_dir = self.faft_client.system.create_temp_dir()
1223        self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
1224                               os.path.join(remote_temp_dir, 'bios'))
1225
1226        self.faft_client.bios.write_whole(
1227            os.path.join(remote_temp_dir, 'bios'))
1228        self.switcher.mode_aware_reboot()
1229        logging.info('Successfully restore firmware.')
1230
1231    def setup_firmwareupdate_shellball(self, shellball=None):
1232        """Deside a shellball to use in firmware update test.
1233
1234        Check if there is a given shellball, and it is a shell script. Then,
1235        send it to the remote host. Otherwise, use
1236        /usr/sbin/chromeos-firmwareupdate.
1237
1238        @param shellball: path of a shellball or default to None.
1239
1240        @return: Path of shellball in remote host. If use default shellball,
1241                 reutrn None.
1242        """
1243        updater_path = None
1244        if shellball:
1245            # Determine the firmware file is a shellball or a raw binary.
1246            is_shellball = (utils.system_output("file %s" % shellball).find(
1247                    "shell script") != -1)
1248            if is_shellball:
1249                logging.info('Device will update firmware with shellball %s',
1250                             shellball)
1251                temp_dir = self.faft_client.system.create_temp_dir(
1252                            'shellball_')
1253                temp_shellball = os.path.join(temp_dir, 'updater.sh')
1254                self._client.send_file(shellball, temp_shellball)
1255                updater_path = temp_shellball
1256            else:
1257                raise error.TestFail(
1258                    'The given shellball is not a shell script.')
1259            return updater_path
1260
1261    def is_kernel_changed(self):
1262        """Check if the current kernel is changed, by comparing its SHA1 hash.
1263
1264        @return: True if it is changed; otherwise, False.
1265        """
1266        changed = False
1267        for p in ('A', 'B'):
1268            backup_sha = self._backup_kernel_sha.get(p, None)
1269            current_sha = self.faft_client.kernel.get_sha(p)
1270            if backup_sha != current_sha:
1271                changed = True
1272                logging.info('Kernel %s is changed', p)
1273        return changed
1274
1275    def backup_kernel(self, suffix='.original'):
1276        """Backup kernel to files, and the send them to host.
1277
1278        @param suffix: a string appended to backup file name.
1279        """
1280        remote_temp_dir = self.faft_client.system.create_temp_dir()
1281        for p in ('A', 'B'):
1282            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1283            self.faft_client.kernel.dump(p, remote_path)
1284            self._client.get_file(
1285                    remote_path,
1286                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
1287            self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
1288        logging.info('Backup kernel stored in %s with suffix %s',
1289            self.resultsdir, suffix)
1290
1291    def is_kernel_saved(self):
1292        """Check if kernel images are saved (backup_kernel called before).
1293
1294        @return: True if the kernel is saved; otherwise, False.
1295        """
1296        return len(self._backup_kernel_sha) != 0
1297
1298    def clear_saved_kernel(self):
1299        """Clear the kernel saved by backup_kernel()."""
1300        self._backup_kernel_sha = dict()
1301
1302    def restore_kernel(self, suffix='.original'):
1303        """Restore kernel from host in resultsdir.
1304
1305        @param suffix: a string appended to backup file name.
1306        """
1307        if not self.is_kernel_changed():
1308            return
1309
1310        # Backup current corrupted kernel.
1311        self.backup_kernel(suffix='.corrupt')
1312
1313        # Restore kernel.
1314        remote_temp_dir = self.faft_client.system.create_temp_dir()
1315        for p in ('A', 'B'):
1316            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1317            self._client.send_file(
1318                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
1319                    remote_path)
1320            self.faft_client.kernel.write(p, remote_path)
1321
1322        self.switcher.mode_aware_reboot()
1323        logging.info('Successfully restored kernel.')
1324
1325    def backup_cgpt_attributes(self):
1326        """Backup CGPT partition table attributes."""
1327        self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
1328
1329    def restore_cgpt_attributes(self):
1330        """Restore CGPT partition table attributes."""
1331        current_table = self.faft_client.cgpt.get_attributes()
1332        if current_table == self._backup_cgpt_attr:
1333            return
1334        logging.info('CGPT table is changed. Original: %r. Current: %r.',
1335                     self._backup_cgpt_attr,
1336                     current_table)
1337        self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr)
1338
1339        self.switcher.mode_aware_reboot()
1340        logging.info('Successfully restored CGPT table.')
1341
1342    def try_fwb(self, count=0):
1343        """set to try booting FWB count # times
1344
1345        Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
1346        vboot2
1347
1348        @param count: an integer specifying value to program into
1349                      fwb_tries(vb1)/fw_try_next(vb2)
1350        """
1351        if self.fw_vboot2:
1352            self.faft_client.system.set_fw_try_next('B', count)
1353        else:
1354            # vboot1: we need to boot into fwb at least once
1355            if not count:
1356                count = count + 1
1357            self.faft_client.system.set_try_fw_b(count)
1358
1359