1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Code to provide functions for FAFT tests.
6
These can be exposed via an XML-RPC server running on the DUT.
8"""
9
10import functools, os, tempfile
11
12import common
13from autotest_lib.client.cros.faft.utils import (cgpt_handler,
14                                                 os_interface,
15                                                 firmware_check_keys,
16                                                 firmware_updater,
17                                                 flashrom_handler,
18                                                 kernel_handler,
19                                                 rootfs_handler,
20                                                 saft_flashrom_util,
21                                                 tpm_handler,
22                                                )
23
24
def allow_multiple_section_input(image_operator):
    """Decorate a method to support multiple sections.

    The decorated method may be called with either a single section or a
    list/tuple of sections; in the latter case image_operator is invoked
    once per element, in order.

    @param image_operator: Method accepting one section as its argument.
    @return: The wrapping method. It always returns None; any value returned
             by image_operator is discarded.
    """
    @functools.wraps(image_operator)
    def wrapper(self, section):
        """Wrapper method to support multiple sections.

        @param section: A list (or tuple) of sections, or just a section.
        """
        # isinstance (rather than an exact type() comparison) so that
        # subclasses of list/tuple are also expanded element by element.
        # Strings are neither, so a section name is never split up.
        if isinstance(section, (tuple, list)):
            for sec in section:
                image_operator(self, sec)
        else:
            image_operator(self, section)
    return wrapper
42
43
class LazyFlashromHandlerProxy:
    """Stand-in for a FlashromHandler that is created on first use.

    The constructor only remembers its arguments. The real handler is built
    the first time an attribute that the proxy itself does not define is
    looked up, and again after reload() has been called.
    """
    # Class-level defaults, so that __getattr__ never fires for these names.
    _loaded = False
    _obj = None

    def __init__(self, *args, **kargs):
        # Saved verbatim and later forwarded to FlashromHandler.init().
        self._args, self._kargs = args, kargs

    def _load(self):
        """Build the underlying FlashromHandler and mark the proxy loaded."""
        handler = flashrom_handler.FlashromHandler()
        self._obj = handler
        handler.init(*self._args, **self._kargs)
        handler.new_image()
        self._loaded = True

    def __getattr__(self, name):
        # Only reached for attributes missing on the proxy itself; those are
        # forwarded to the (lazily created) real handler.
        if self._loaded:
            return getattr(self._obj, name)
        self._load()
        return getattr(self._obj, name)

    def reload(self):
        """Make the next forwarded attribute access rebuild the handler."""
        self._loaded = False
67
68
class RPCFunctions(object):
    """A class which aggregates some useful functions for firmware testing.

    This class can be exposed via a XMLRPC server such that its functions can
    be accessed remotely. Method naming should fit the naming rule
    '_[categories]_[method_name]' where categories contains system, host, ec,
    bios, kernel, cgpt, tpm, updater and rootfs. Methods should be called by
    'FAFTClient.[categories].[method_name]', because _dispatch will rename
    this name to '_[categories]_[method_name]'.

    Attributes:
        _os_if: An object to encapsulate OS services functions.
        _bios_handler: An object to automate BIOS flashrom testing.
        _ec_handler: An object to automate EC flashrom testing, or None when
                     'mosys ec info' exits with a non-zero status.
        _kernel_handler: An object to provide kernel related actions.
        _log_file: Path of the log file.
        _tpm_handler: An object to control TPM device, or None when
                      _os_if.is_android is true.
        _cgpt_handler: A CgptHandler used by the cgpt methods.
        _rootfs_handler: A RootfsHandler used by the rootfs methods.
        _updater: An object to update firmware.
        _check_keys: A firmwareCheckKeys object used by _system_check_keys.
        _temp_path: Path of a temp directory.
        _keys_path: Path of a directory, keys/, in temp directory.
        _work_path: Path of a directory, work/, in temp directory.
    """
    def __init__(self):
        """Initialize the data attributes of this class.

        Besides creating the handlers this changes the current working
        directory of the process to the FAFT state directory.
        """
        # TODO(waihong): Move the explicit object.init() methods to the
        # objects' constructors (OSInterface, FlashromHandler,
        # KernelHandler, and TpmHandler).
        self._os_if = os_interface.OSInterface()
        # We keep the state of FAFT test in a permanent directory over reboots.
        state_dir = '/var/tmp/faft'
        self._log_file = os.path.join(state_dir, 'faft_client.log')
        self._os_if.init(state_dir, log_file=self._log_file)
        os.chdir(state_dir)

        self._bios_handler = LazyFlashromHandlerProxy(
                                saft_flashrom_util,
                                self._os_if,
                                None,
                                '/usr/share/vboot/devkeys',
                                'bios')

        # The EC handler is only created when mosys reports an EC.
        self._ec_handler = None
        if self._os_if.run_shell_command_get_status('mosys ec info') == 0:
            self._ec_handler = LazyFlashromHandlerProxy(
                                  saft_flashrom_util,
                                  self._os_if,
                                  'ec_root_key.vpubk',
                                  '/usr/share/vboot/devkeys',
                                  'ec')
        else:
            self._os_if.log('No EC is reported by mosys.')

        self._kernel_handler = kernel_handler.KernelHandler()
        self._kernel_handler.init(self._os_if,
                                  dev_key_path='/usr/share/vboot/devkeys',
                                  internal_disk=True)

        # FIXME(waihong): Add back the TPM support.
        if not self._os_if.is_android:
            self._tpm_handler = tpm_handler.TpmHandler()
            self._tpm_handler.init(self._os_if)
        else:
            self._tpm_handler = None

        self._cgpt_handler = cgpt_handler.CgptHandler(self._os_if)

        self._rootfs_handler = rootfs_handler.RootfsHandler()
        self._rootfs_handler.init(self._os_if)

        self._updater = firmware_updater.FirmwareUpdater(self._os_if)
        self._check_keys = firmware_check_keys.firmwareCheckKeys()

        # Initialize temporary directory path
        self._temp_path = '/var/tmp/faft/autest'
        self._keys_path = os.path.join(self._temp_path, 'keys')
        self._work_path = os.path.join(self._temp_path, 'work')

    def _dispatch(self, method, params):
        """Resolve an RPC method name and call it.

        This _dispatch method handles string conversion specially: because
        the allow_dotted_names option is turned off, any string conversion
        like str(FAFTClient.method), i.e. FAFTClient.method.__str__, would
        otherwise fail via an XML RPC call.

        @param method: The requested method name, either
                       '[category].[method_name]' or a plain attribute name,
                       optionally suffixed with '.__str__'.
        @param params: A sequence of positional arguments for the method.
        @return: str() of the resolved method for a '.__str__' request;
                 otherwise the return value of the called method.
        @raise Exception: If the name does not resolve to an attribute.
        """
        is_str = method.endswith('.__str__')
        if is_str:
            method = method.rsplit('.', 1)[0]

        categories = ('system', 'host', 'bios', 'ec', 'kernel',
                      'tpm', 'cgpt', 'updater', 'rootfs')
        # Split once at the first dot. A bare category name (no dot at all)
        # is treated as a plain attribute name, so that it ends up in the
        # "not supported" error below instead of raising IndexError.
        category, dot, name = method.partition('.')
        try:
            if dot and category in categories:
                func = getattr(self, '_%s_%s' % (category, name))
            else:
                func = getattr(self, method)
        except AttributeError:
            raise Exception('method "%s" is not supported' % method)

        if is_str:
            return str(func)
        else:
            self._os_if.log('Dispatching method %s with args %r' %
                    (func.__name__, params))
            return func(*params)

    def _system_is_available(self):
        """Function for polling the RPC server availability.

        @return: Always True.
        """
        return True

    def _system_has_host(self):
        """Return True if a host is connected to DUT."""
        return self._os_if.has_host()

    def _system_wait_for_client(self, timeout):
        """Wait for the client to come back online.

        @param timeout: Time in seconds to wait for the client SSH daemon to
                        come up.
        @return: True if succeed; otherwise False.
        """
        return self._os_if.wait_for_device(timeout)

    def _system_wait_for_client_offline(self, timeout):
        """Wait for the client to come offline.

        @param timeout: Time in seconds to wait the client to come offline.
        @return: True if succeed; otherwise False.
        """
        return self._os_if.wait_for_no_device(timeout)

    def _system_dump_log(self, remove_log=False):
        """Dump the log file.

        @param remove_log: Remove the log file after dump.
        @return: String of the log file content.
        """
        # Use a context manager so the file is closed before it is
        # (optionally) removed, instead of relying on garbage collection.
        with open(self._log_file) as log_file:
            log = log_file.read()
        if remove_log:
            os.remove(self._log_file)
        return log

    def _system_run_shell_command(self, command):
        """Run shell command.

        @param command: A shell command to be run.
        """
        self._os_if.run_shell_command(command)

    def _system_run_shell_command_get_output(self, command):
        """Run shell command and get its console output.

        @param command: A shell command to be run.
        @return: A list of strings stripped of the newline characters.
        """
        return self._os_if.run_shell_command_get_output(command)

    def _host_run_shell_command(self, command):
        """Run shell command on the host.

        @param command: A shell command to be run.
        """
        self._os_if.run_host_shell_command(command)

    def _host_run_shell_command_get_output(self, command):
        """Run shell command and get its console output on the host.

        @param command: A shell command to be run.
        @return: A list of strings stripped of the newline characters.
        """
        return self._os_if.run_host_shell_command_get_output(command)

    def _host_run_nonblock_shell_command(self, command):
        """Run non-blocking shell command on the host.

        @param command: A shell command to be run.
        @return: Whatever run_host_shell_command returns when called with
                 False as its second argument (documented as none).
        """
        return self._os_if.run_host_shell_command(command, False)

    def _system_software_reboot(self):
        """Request software reboot."""
        self._os_if.run_shell_command('reboot')

    def _system_get_platform_name(self):
        """Get the platform name of the current system.

        @return: A string of the platform name (the last output line).
        @raise Exception: If mosys fails or produces no output at all.
        """
        # 'mosys platform name' sometimes fails. Let's get the verbose output.
        lines = self._os_if.run_shell_command_get_output(
                '(mosys -vvv platform name 2>&1) || echo Failed')
        # An empty output is reported as a failure as well, rather than
        # surfacing as an IndexError from lines[-1].
        if not lines or lines[-1].strip() == 'Failed':
            raise Exception('Failed getting platform name: ' + '\n'.join(lines))
        return lines[-1]

    def _system_get_crossystem_value(self, key):
        """Get crossystem value of the requested key.

        @param key: A crossystem key.
        @return: A string of the requested crossystem value.
        """
        return self._os_if.run_shell_command_get_output(
                'crossystem %s' % key)[0]

    def _system_get_root_dev(self):
        """Get the name of root device without partition number.

        @return: A string of the root device without partition number.
        """
        return self._os_if.get_root_dev()

    def _system_get_root_part(self):
        """Get the name of root device with partition number.

        @return: A string of the root device with partition number.
        """
        return self._os_if.get_root_part()

    def _system_set_try_fw_b(self, count=1):
        """Set 'Try Firmware B' flag in crossystem.

        @param count: # times to try booting into FW B
        """
        self._os_if.cs.fwb_tries = count

    def _system_set_fw_try_next(self, next, count=0):
        """Set fw_try_next to A or B

        @param next: Next FW to reboot to (A or B)
        @param count: # of times to try booting into FW <next>; when it is
                      0 (the default) fw_try_count is left untouched.
        """
        self._os_if.cs.fw_try_next = next
        if count:
            self._os_if.cs.fw_try_count = count

    def _system_get_fw_vboot2(self):
        """Get fw_vboot2.

        @return: True if crossystem fw_vboot2 is '1'; False otherwise,
                 including when reading it raises OSInterfaceError.
        """
        try:
            return self._os_if.cs.fw_vboot2 == '1'
        except os_interface.OSInterfaceError:
            return False

    def _system_request_recovery_boot(self):
        """Request running in recovery mode on the restart."""
        self._os_if.cs.request_recovery()

    def _system_get_dev_boot_usb(self):
        """Get dev_boot_usb value which controls developer mode boot from USB.

        @return: True if enable, False if disable.
        """
        return self._os_if.cs.dev_boot_usb == '1'

    def _system_set_dev_boot_usb(self, value):
        """Set dev_boot_usb value which controls developer mode boot from USB.

        @param value: True to enable, False to disable.
        """
        self._os_if.cs.dev_boot_usb = 1 if value else 0

    def _system_is_removable_device_boot(self):
        """Check the current boot device is removable.

        @return: True: if a removable device boots.
                 False: if a non-removable device boots.
        """
        root_part = self._os_if.get_root_part()
        return self._os_if.is_removable_device(root_part)

    def _system_create_temp_dir(self, prefix='backup_'):
        """Create a temporary directory and return the path.

        @param prefix: Prefix of the name of the created directory.
        @return: Path of the newly created directory.
        """
        return tempfile.mkdtemp(prefix=prefix)

    def _bios_reload(self):
        """Reload the firmware image that may be changed."""
        self._bios_handler.reload()

    def _bios_get_gbb_flags(self):
        """Get the GBB flags.

        @return: An integer of the GBB flags.
        """
        return self._bios_handler.get_gbb_flags()

    def _bios_set_gbb_flags(self, flags):
        """Set the GBB flags.

        @param flags: An integer of the GBB flags.
        """
        self._bios_handler.set_gbb_flags(flags, write_through=True)

    def _bios_get_preamble_flags(self, section):
        """Get the preamble flags of a firmware section.

        @param section: A firmware section, either 'a' or 'b'.
        @return: An integer of the preamble flags.
        """
        return self._bios_handler.get_section_flags(section)

    def _bios_set_preamble_flags(self, section, flags):
        """Set the preamble flags of a firmware section.

        The section's current version is read back and written again
        together with the new flags.

        @param section: A firmware section, either 'a' or 'b'.
        @param flags: An integer of preamble flags.
        """
        version = self._bios_get_version(section)
        self._bios_handler.set_section_version(section, version, flags,
                                               write_through=True)

    def _bios_get_body_sha(self, section):
        """Get SHA1 hash of BIOS RW firmware section.

        @param section: A firmware section, either 'a' or 'b'.
        @return: The value returned by the BIOS handler's get_section_sha().
        """
        return self._bios_handler.get_section_sha(section)

    def _bios_get_sig_sha(self, section):
        """Get SHA1 hash of firmware vblock in section.

        @param section: A firmware section, either 'a' or 'b'.
        """
        return self._bios_handler.get_section_sig_sha(section)

    @allow_multiple_section_input
    def _bios_corrupt_sig(self, section):
        """Corrupt the requested firmware section signature.

        @param section: A firmware section, either 'a' or 'b'.
        """
        self._bios_handler.corrupt_firmware(section)

    @allow_multiple_section_input
    def _bios_restore_sig(self, section):
        """Restore the previously corrupted firmware section signature.

        @param section: A firmware section, either 'a' or 'b'.
        """
        self._bios_handler.restore_firmware(section)

    @allow_multiple_section_input
    def _bios_corrupt_body(self, section):
        """Corrupt the requested firmware section body.

        @param section: A firmware section, either 'a' or 'b'.
        """
        self._bios_handler.corrupt_firmware_body(section)

    @allow_multiple_section_input
    def _bios_restore_body(self, section):
        """Restore the previously corrupted firmware section body.

        @param section: A firmware section, either 'a' or 'b'.
        """
        self._bios_handler.restore_firmware_body(section)

    def __bios_modify_version(self, section, delta):
        """Modify firmware version for the requested section, by adding delta.

        The passed in delta, a positive or a negative number, is added to the
        original firmware version. The section's preamble flags are read
        back and written again unchanged.

        @param section: A firmware section, either 'a' or 'b'.
        @param delta: The number to add to the current version.
        """
        original_version = self._bios_get_version(section)
        new_version = original_version + delta
        flags = self._bios_handler.get_section_flags(section)
        self._os_if.log(
                'Setting firmware section %s version from %d to %d' % (
                section, original_version, new_version))
        self._bios_handler.set_section_version(section, new_version, flags,
                                               write_through=True)

    @allow_multiple_section_input
    def _bios_move_version_backward(self, section):
        """Decrement firmware version for the requested section."""
        self.__bios_modify_version(section, -1)

    @allow_multiple_section_input
    def _bios_move_version_forward(self, section):
        """Increase firmware version for the requested section."""
        self.__bios_modify_version(section, 1)

    def _bios_get_version(self, section):
        """Retrieve firmware version of a section."""
        return self._bios_handler.get_section_version(section)

    def _bios_get_datakey_version(self, section):
        """Return firmware data key version."""
        return self._bios_handler.get_section_datakey_version(section)

    def _bios_get_kernel_subkey_version(self, section):
        """Return kernel subkey version."""
        return self._bios_handler.get_section_kernel_subkey_version(section)

    def _bios_dump_whole(self, bios_path):
        """Dump the current BIOS firmware to a file, specified by bios_path.

        @param bios_path: The path of the BIOS image to be written.
        """
        self._bios_handler.dump_whole(bios_path)

    def _bios_write_whole(self, bios_path):
        """Write the firmware from bios_path to the current system.

        @param bios_path: The path of the source BIOS image.
        """
        self._bios_handler.new_image(bios_path)
        self._bios_handler.write_whole()

    def _ec_get_version(self):
        """Get EC version via mosys.

        @return: A string of the EC version.
        """
        return self._os_if.run_shell_command_get_output(
                'mosys ec info | sed "s/.*| //"')[0]

    def _ec_get_firmware_sha(self):
        """Get SHA1 hash of EC RW firmware section."""
        return self._ec_handler.get_section_sha('rw')

    @allow_multiple_section_input
    def _ec_corrupt_sig(self, section):
        """Corrupt the requested EC section signature.

        @param section: An EC section, either 'a' or 'b'.
        """
        self._ec_handler.corrupt_firmware(section, corrupt_all=True)

    @allow_multiple_section_input
    def _ec_restore_sig(self, section):
        """Restore the previously corrupted EC section signature.

        @param section: An EC section, either 'a' or 'b'.
        """
        self._ec_handler.restore_firmware(section, restore_all=True)

    @allow_multiple_section_input
    def _ec_corrupt_body(self, section):
        """Corrupt the requested EC section body.

        @param section: An EC section, either 'a' or 'b'.
        """
        self._ec_handler.corrupt_firmware_body(section, corrupt_all=True)

    @allow_multiple_section_input
    def _ec_restore_body(self, section):
        """Restore the previously corrupted EC section body.

        @param section: An EC section, either 'a' or 'b'.
        """
        self._ec_handler.restore_firmware_body(section, restore_all=True)

    def _ec_dump_firmware(self, ec_path):
        """Dump the current EC firmware to a file, specified by ec_path.

        @param ec_path: The path of the EC image to be written.
        """
        self._ec_handler.dump_whole(ec_path)

    def _ec_set_write_protect(self, enable):
        """Enable or disable write protect of the EC flash chip.

        @param enable: True if activating EC write protect. Otherwise, False.
        """
        if enable:
            self._ec_handler.enable_write_protect()
        else:
            self._ec_handler.disable_write_protect()

    @allow_multiple_section_input
    def _kernel_corrupt_sig(self, section):
        """Corrupt the requested kernel section.

        @param section: A kernel section, either 'a' or 'b'.
        """
        self._kernel_handler.corrupt_kernel(section)

    @allow_multiple_section_input
    def _kernel_restore_sig(self, section):
        """Restore the requested kernel section (previously corrupted).

        @param section: A kernel section, either 'a' or 'b'.
        """
        self._kernel_handler.restore_kernel(section)

    def __kernel_modify_version(self, section, delta):
        """Modify kernel version for the requested section, by adding delta.

        The passed in delta, a positive or a negative number, is added to the
        original kernel version.

        @param section: A kernel section, either 'a' or 'b'.
        @param delta: The number to add to the current version.
        """
        original_version = self._kernel_handler.get_version(section)
        new_version = original_version + delta
        self._os_if.log(
                'Setting kernel section %s version from %d to %d' % (
                section, original_version, new_version))
        self._kernel_handler.set_version(section, new_version)

    @allow_multiple_section_input
    def _kernel_move_version_backward(self, section):
        """Decrement kernel version for the requested section."""
        self.__kernel_modify_version(section, -1)

    @allow_multiple_section_input
    def _kernel_move_version_forward(self, section):
        """Increase kernel version for the requested section."""
        self.__kernel_modify_version(section, 1)

    def _kernel_get_version(self, section):
        """Return kernel version."""
        return self._kernel_handler.get_version(section)

    def _kernel_get_datakey_version(self, section):
        """Return kernel datakey version."""
        return self._kernel_handler.get_datakey_version(section)

    def _kernel_diff_a_b(self):
        """Compare kernel A with B.

        @return: True: if kernel A is different with B.
                 False: if kernel A is the same as B.
        """
        rootdev = self._os_if.get_root_dev()
        kernel_a = self._os_if.join_part(rootdev, '2')
        kernel_b = self._os_if.join_part(rootdev, '4')

        # The signature (some kind of hash) for the kernel body is stored in
        # the beginning. So compare the first 64KB (including header, preamble,
        # and signature) should be enough to check them identical.
        header_a = self._os_if.read_partition(kernel_a, 0x10000)
        header_b = self._os_if.read_partition(kernel_b, 0x10000)

        return header_a != header_b

    def _kernel_resign_with_keys(self, section, key_path=None):
        """Resign kernel with temporary key.

        @param section: The kernel to resign.
        @param key_path: Passed through to the kernel handler's
                         resign_kernel(); defaults to None.
        """
        self._kernel_handler.resign_kernel(section, key_path)

    def _kernel_dump(self, section, kernel_path):
        """Dump the specified kernel to a file.

        @param section: The kernel to dump. May be A or B.
        @param kernel_path: The path to the kernel image to be written.
        """
        self._kernel_handler.dump_kernel(section, kernel_path)

    def _kernel_write(self, section, kernel_path):
        """Write a kernel image to the specified section.

        @param section: The kernel to write. May be A or B.
        @param kernel_path: The path to the kernel image.
        """
        self._kernel_handler.write_kernel(section, kernel_path)

    def _kernel_get_sha(self, section):
        """Return the SHA1 hash of the specified kernel section."""
        return self._kernel_handler.get_sha(section)

    def _tpm_get_firmware_version(self):
        """Retrieve tpm firmware body version.

        Returns the TPM handler's get_fw_version(). The handler is None
        when _os_if.is_android is true, in which case this call fails.
        """
        return self._tpm_handler.get_fw_version()

    def _tpm_get_firmware_datakey_version(self):
        """Retrieve tpm firmware data key version.

        Returns the TPM handler's get_fw_body_version(). The handler is None
        when _os_if.is_android is true, in which case this call fails.
        """
        # NOTE(review): the handler method name says "body version" while
        # this RPC reports the data key version - confirm against TpmHandler.
        return self._tpm_handler.get_fw_body_version()

    def _cgpt_get_attributes(self):
        """Get kernel attributes.

        @return: A dict with keys 'A' and 'B' holding what the cgpt handler
                 reports for partitions KERN-A and KERN-B of the root device.
        """
        rootdev = self._system_get_root_dev()
        self._cgpt_handler.read_device_info(rootdev)
        return {'A': self._cgpt_handler.get_partition(rootdev, 'KERN-A'),
                'B': self._cgpt_handler.get_partition(rootdev, 'KERN-B')}

    def _cgpt_set_attributes(self, attributes):
        """Set kernel attributes.

        @param attributes: A dict which may contain the keys 'A' and/or 'B',
                           each mapping to a dict of attributes. Only the
                           'priority', 'tries' and 'successful' entries are
                           applied; everything else is ignored.
        """
        rootdev = self._system_get_root_dev()
        allowed = ('priority', 'tries', 'successful')
        for p in ('A', 'B'):
            if p not in attributes:
                continue
            requested = attributes[p]
            attr = {k: requested[k] for k in allowed if k in requested}
            # Nothing to write when none of the allowed keys was supplied.
            if attr:
                self._cgpt_handler.set_partition(rootdev, 'KERN-%s' % p, attr)

    def _updater_cleanup(self):
        """Ask the firmware updater to clean up its temp directory."""
        self._updater.cleanup_temp_dir()

    def _updater_get_fwid(self):
        """Retrieve shellball's fwid.

        @return: Shellball's fwid.
        """
        return self._updater.retrieve_fwid()

    def _updater_resign_firmware(self, version):
        """Resign firmware with version.

        @param version: new version number.
        """
        self._updater.resign_firmware(version)

    def _updater_repack_shellball(self, append):
        """Repack shellball with new fwid.

        @param append: use for new fwid naming.
        """
        self._updater.repack_shellball(append)

    def _updater_run_autoupdate(self, append):
        """Run chromeos-firmwareupdate with autoupdate mode.

        @param append: Passed to the updater as updater_append.
        """
        options = ['--noupdate_ec', '--nocheck_rw_compatible']
        self._updater.run_firmwareupdate(mode='autoupdate',
                                         updater_append=append,
                                         options=options)

    def _updater_run_factory_install(self):
        """Run chromeos-firmwareupdate with factory_install mode."""
        options = ['--noupdate_ec']
        self._updater.run_firmwareupdate(mode='factory_install',
                                         options=options)

    def _updater_run_bootok(self, append):
        """Run chromeos-firmwareupdate with bootok mode.

        @param append: Passed to the updater as updater_append.
        """
        self._updater.run_firmwareupdate(mode='bootok',
                                         updater_append=append)

    def _updater_run_recovery(self):
        """Run chromeos-firmwareupdate with recovery mode."""
        options = ['--noupdate_ec', '--nocheck_rw_compatible']
        self._updater.run_firmwareupdate(mode='recovery',
                                         options=options)

    def _updater_get_temp_path(self):
        """Get updater's temp directory path."""
        return self._updater.get_temp_path()

    def _updater_get_keys_path(self):
        """Get updater's keys directory path."""
        return self._updater.get_keys_path()

    def _updater_get_work_path(self):
        """Get updater's work directory path."""
        return self._updater.get_work_path()

    def _rootfs_verify_rootfs(self, section):
        """Verifies the integrity of the root FS.

        @param section: The rootfs to verify. May be A or B.
        @return: The result of the rootfs handler's verify_rootfs().
        """
        return self._rootfs_handler.verify_rootfs(section)

    def _system_check_keys(self, expected_sequence):
        """Check the keys sequence was as expected.

        @param expected_sequence: A list of expected key sequences.
        @return: The result of the key checker's check_keys().
        """
        return self._check_keys.check_keys(expected_sequence)

    def cleanup(self):
        """Cleanup for the RPC server. Currently nothing."""
        pass
734