firmware_Cr50Update.py revision 70b80cf0fc5ecd653b22aa06503c9462d94e014f
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import time
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib.cros import cr50_utils
11from autotest_lib.server import autotest, test
12from autotest_lib.server.cros import debugd_dev_tools
13from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
14
15
class firmware_Cr50Update(FirmwareTest):
    """
    Verify a dut can update to the given image.

    Copy the new image onto the device and clear the update state to force
    cr50-update to run. The test will fail if Cr50 does not update or if the
    update script encounters any errors.

    @param release_image: the path of the release image file.
    @param dev_image: the path of the dev image file.
    @param old_release_image: optional path of an older release image. It is
                              not used when test is "erase_nvmem".
    @param test: string selecting a test variation. It is compared
                 case-insensitively against ERASE_NVMEM; if it matches, nvmem
                 is erased during rollbacks.
    """
    version = 1
    # Timeout passed to host.run() when usb_updater flashes the inactive
    # partition (presumably seconds -- confirm against host.run()).
    UPDATE_TIMEOUT = 20
    # Value of the 'test' argument that requests erasing nvmem on rollback.
    ERASE_NVMEM = "erase_nvmem"

    # Keys used in the self.images dict and in self.update_order.
    DEV_NAME = "dev_image"
    OLD_RELEASE_NAME = "old_release_image"
    RELEASE_NAME = "release_image"
    ORIGINAL_NAME = "original_image"
    # Number of times cleanup() tries to restore the original image.
    RESTORE_ORIGINAL_TRIES = 3
    # Value restore_original_image() returns when nothing went wrong.
    SUCCESS = 0
    # usb_updater exit status expected after a posted reboot ('-u' option).
    UPSTART_SUCCESS = 1
39
40
41    def initialize(self, host, cmdline_args, release_image, dev_image,
42                   old_release_image="", test=""):
43        """Initialize servo and process the given images"""
44        self.processed_images = False
45
46        super(firmware_Cr50Update, self).initialize(host, cmdline_args)
47        if not hasattr(self, "cr50"):
48            raise error.TestNAError('Test can only be run on devices with '
49                                    'access to the Cr50 console')
50        # Make sure ccd is disabled so it won't interfere with the update
51        self.cr50.ccd_disable()
52
53        self.rootfs_tool = debugd_dev_tools.RootfsVerificationTool()
54        self.rootfs_tool.initialize(host)
55        if not self.rootfs_tool.is_enabled():
56            logging.debug('Removing rootfs verification.')
57            # 'enable' actually disables rootfs verification
58            self.rootfs_tool.enable()
59
60        self.host = host
61        self.test = test.lower()
62
63        # A dict used to store relevant information for each image
64        self.images = {}
65
66        running_rw = cr50_utils.GetRunningVersion(self.host)[1]
67        # Get the original image from the cr50 firmware directory on the dut
68        self.save_original_image(cr50_utils.CR50_FILE)
69
70        # If Cr50 is not running the image from the cr50 firmware directory,
71        # then raise an error, otherwise the test will not be able to restore
72        # the original state during cleanup.
73        if running_rw != self.original_rw:
74            raise error.TestError("Can't determine original Cr50 version. "
75                                  "Running %s, but saved %s." %
76                                  (running_rw, self.original_rw))
77
78        # Process the given images in order of oldest to newest. Get the version
79        # info and add them to the update order
80        self.update_order = []
81        if self.test != self.ERASE_NVMEM and old_release_image:
82            self.add_image_to_update_order(self.OLD_RELEASE_NAME,
83                                           old_release_image)
84        self.add_image_to_update_order(self.RELEASE_NAME, release_image)
85        self.add_image_to_update_order(self.DEV_NAME, dev_image)
86        self.verify_update_order()
87        self.processed_images = True
88        logging.info("Update %s", self.update_order)
89
90        # Update to the dev image
91        self.run_update(self.DEV_NAME)
92
93
94    def restore_original_image(self):
95        """Update to the image that was running at the start of the test.
96
97        Returns SUCCESS if the update was successful or the update error if it
98        failed.
99        """
100        rv = self.SUCCESS
101        _, running_rw, is_dev = self.cr50.get_active_version_info()
102        new_rw = cr50_utils.GetNewestVersion(running_rw, self.original_rw)
103
104        # If Cr50 is running the original image, then no update is needed.
105        if new_rw is None:
106            return rv
107
108        try:
109            # If a rollback is needed, update to the dev image so it can
110            # rollback to the original image.
111            if new_rw != self.original_rw and not is_dev:
112                logging.info("Updating to dev image to enable rollback")
113                self.run_update(self.DEV_NAME, use_usb_update=True)
114
115            logging.info("Updating to the original image %s",
116                         self.original_rw)
117            self.run_update(self.ORIGINAL_NAME, use_usb_update=True)
118        except Exception, e:
119            logging.info("cleanup update from %s to %s failed", running_rw,
120                          self.original_rw)
121            logging.debug(e)
122            rv = e
123        self.cr50.ccd_enable()
124        return rv
125
126
127    def cleanup(self):
128        """Update Cr50 to the image it was running at the start of the test"""
129        logging.warning('rootfs verification is disabled')
130
131        # Make sure keepalive is disabled
132        self.cr50.ccd_enable()
133        self.cr50.send_command("ccd keepalive disable")
134
135        # Restore the original Cr50 image
136        if self.processed_images:
137            for i in xrange(self.RESTORE_ORIGINAL_TRIES):
138                rv = self.restore_original_image()
139                if rv == self.SUCCESS:
140                    logging.info("Successfully restored the original image")
141                    break
142            if rv != self.SUCCESS:
143                raise error.TestError("Could not restore the original image")
144
145        super(firmware_Cr50Update, self).cleanup()
146
147
148    def run_usb_update(self, dest, is_newer):
149        """Run usb_update with the given image.
150
151        If the new image version is newer than the one Cr50 is running, then
152        the upstart option will be used. This will reboot the AP after
153        usb_updater runs, so Cr50 will finish the update and jump to the new
154        image.
155
156        If the new image version is older than the one Cr50 is running, then the
157        best usb_updater can do is flash the image into the inactive partition.
158        To finish th update, the 'rw' command will need to be used to force a
159        rollback.
160
161        @param dest: the image location.
162        @param is_newer: True if the rw version of the update image is newer
163                         than the one Cr50 is running.
164        """
165
166        result = self.host.run("status trunksd")
167        if 'running' in result.stdout:
168            self.host.run("stop trunksd")
169
170        # Enable CCD, so we can detect the Cr50 reboot.
171        self.cr50.ccd_enable()
172        if is_newer:
173            # Using -u usb_updater will post a reboot request but not reboot
174            # immediately.
175            result = self.host.run("usb_updater -s -u %s" % dest,
176                                   ignore_status=True)
177            # After a posted reboot, the usb_update exit code should equal 1.
178            if result.exit_status != self.UPSTART_SUCCESS:
179                logging.debug(result)
180                raise error.TestError("Got unexpected usb_update exit code")
181            # Reset the AP to finish the Cr50 update.
182            self.cr50.send_command("sysrst pulse")
183        else:
184            logging.info("Flashing image into inactive partition")
185            # The image at 'dest' is older than the one Cr50 is running, so
186            # upstart cannot be used. Without -u Cr50 will flash the image into
187            # the inactive partition and reboot immediately.
188            result = self.host.run("usb_updater -s %s" % dest,
189                                   ignore_timeout=True,
190                                   timeout=self.UPDATE_TIMEOUT)
191            logging.info(result)
192
193        # After usb_updater finishes running, Cr50 will reboot. Wait until Cr50
194        # reboots before continuing. Cr50 reboot can be detected by detecting
195        # when CCD stops working.
196        self.cr50.wait_for_ccd_disable()
197
198
199    def finish_rollback(self, image_rw, erase_nvmem):
200        """Rollback to the image in the inactive partition.
201
202        Use the cr50 'rw' command to set the reset counter high enough to
203        trigger a rollback, erase nvmem if requested, and then reboot cr50 to
204        finish the rollback.
205
206        @param image_rw: the rw version of the update image.
207        @param erase_nvmem: True if nvmem needs to be erased during the
208                            rollback.
209        """
210        self.cr50.ccd_enable()
211        inactive_info = self.cr50.get_inactive_version_info()
212        if inactive_info[1] != image_rw:
213            raise error.TestError("Image is not in inactive partition")
214
215        logging.info("Attempting Rollback")
216
217        # Enable CCD keepalive before turning off the DUT. It will be removed
218        # after the Cr50 reboot
219        self.cr50.send_command("ccd keepalive enable")
220
221        # Shutdown the device so it won't react to any of the following commands
222        self.ec.reboot("ap-off")
223
224        # CCD may disapppear after resetting the EC. If it does, re-enable it.
225        # TODO: remove this when CCD is no longer disabled after ec reset.
226        try:
227            self.cr50.wait_for_ccd_disable(timeout=15)
228        except error.TestFail, e:
229            pass
230        self.cr50.ccd_enable()
231
232        if erase_nvmem:
233            logging.info("Erasing nvmem")
234            self.cr50.erase_nvmem()
235
236        self.cr50.rollback()
237
238        # Verify the inactive partition is the active one after the rollback.
239        if inactive_info != self.cr50.get_active_version_info():
240            raise error.TestError("Rollback failed")
241
242        # Verify the system boots normally after erasing nvmem
243        self.check_state((self.checkers.crossystem_checker,
244                          {'mainfw_type': 'normal'}))
245
246
247    def run_update(self, image_name, erase_nvmem=False, use_usb_update=False):
248        """Copy the image to the DUT and upate to it.
249
250        Normal updates will use the cr50-update script to update. If a rollback
251        is True, use usb_update to flash the image and then use the 'rw'
252        commands to force a rollback. On rollback updates erase_nvmem can be
253        used to request that nvmem be erased during the rollback.
254
255        @param image_name: the key in the images dict that can be used to
256                           retrieve the image info.
257        @param erase_nvmem: True if nvmem needs to be erased during the
258                            rollback.
259        @param use_usb_update: True if usb_updater should be used directly
260                               instead of running the update script.
261        """
262        self.cr50.ccd_disable()
263        # Get the current update information
264        image_ver, image_ver_str, image_path = self.images[image_name]
265
266        dest, ver = cr50_utils.InstallImage(self.host, image_path)
267        assert ver == image_ver, "Install failed"
268        image_rw = image_ver[1]
269
270        running_ver = cr50_utils.GetRunningVersion(self.host)
271        running_ver_str = cr50_utils.GetVersionString(running_ver)
272
273        # If the given image is older than the running one, then we will need
274        # to do a rollback to complete the update.
275        rollback = (cr50_utils.GetNewestVersion(running_ver[1], image_rw) !=
276                    image_rw)
277        logging.info("Attempting %s from %s to %s",
278                     "rollback" if rollback else "update", running_ver_str,
279                     image_ver_str)
280
281        # If a rollback is needed, flash the image into the inactive partition,
282        # on or use usb_update to update to the new image if it is requested.
283        if use_usb_update or rollback:
284            self.run_usb_update(dest, not rollback)
285        # Use cr50 console commands to rollback to the old image.
286        if rollback:
287            self.finish_rollback(image_rw, erase_nvmem)
288        # Running the usb update or rollback will enable ccd. Disable it again.
289        self.cr50.ccd_disable()
290
291        # Get the last cr50 update related message from /var/log/messages
292        last_message = cr50_utils.CheckForFailures(self.host, '')
293
294        # Clear the update state and reboot, so cr50-update will run again.
295        cr50_utils.ClearUpdateStateAndReboot(self.host)
296
297        # Verify the version has been updated and that there have been no
298        # unexpected usb_updater exit codes.
299        cr50_utils.VerifyUpdate(self.host, image_ver, last_message)
300
301        logging.info("Successfully updated from %s to %s %s", running_ver_str,
302                     image_name, image_ver_str)
303
304
305    def add_image_to_update_order(self, image_name, image_path):
306        """Process the image. Add it to the update_order list and images dict.
307
308        Copy the image to the DUT and get version information.
309
310        Store the image information in the images dictionary and add it to the
311        update_order list.
312
313        Args:
314            image_name: string that is what the image should be called. Used as
315                        the key in the images dict.
316            image_path: the path for the image.
317
318        Raises:
319            TestError if the image could not be found.
320        """
321        tmp_file = '/tmp/%s.bin' % image_name
322
323        if not os.path.isfile(image_path):
324            raise error.TestError("Failed to locate %s" % image_name)
325
326        _, ver = cr50_utils.InstallImage(self.host, image_path, tmp_file)
327        ver_str = cr50_utils.GetVersionString(ver)
328
329        self.update_order.append([image_name, ver[1]])
330        self.images[image_name] = (ver, ver_str, image_path)
331        logging.info("%s stored at %s with version %s", image_name, image_path,
332                     ver_str)
333
334
335    def verify_update_order(self):
336        """Verify each image in the update order has a higher version than the
337        last.
338
339        The test uses the cr50 update script to update to the next image in the
340        update order. If the versions are not in ascending order then the update
341        won't work. Cr50 cannot update to an older version using the standard
342        update process.
343
344        Raises:
345            TestError if the versions are not in ascending order.
346        """
347        for i, update_info in enumerate(self.update_order[1::]):
348            last_name, last_rw = self.update_order[i]
349            name, rw = update_info
350            if cr50_utils.GetNewestVersion(last_rw, rw) != rw:
351                raise error.TestError("%s is version %s. %s needs to have a "
352                                      "higher version, but it has %s" %
353                                      (last_name, last_rw, name, rw))
354
355
356    def save_original_image(self, dut_path):
357        """Save the image currently running on the DUT.
358
359        Copy the image from the DUT to the local tmp directory and get version
360        information. Store the information in the images dict.
361
362        @param dut_path: the location of the cr50 prod image on the DUT.
363        """
364        name = self.ORIGINAL_NAME
365        local_dest = '/tmp/%s.bin' % name
366
367        self.host.get_file(dut_path, local_dest)
368
369        ver = cr50_utils.GetBinVersion(self.host, dut_path)
370        ver_str = cr50_utils.GetVersionString(ver)
371
372        self.images[name] = (ver, ver_str, local_dest)
373        logging.info("%s stored at %s with version %s", name, local_dest,
374                     ver_str)
375
376        self.original_rw = ver[1]
377
378
379    def after_run_once(self):
380        """Add log printing what iteration we just completed"""
381        logging.info("Update iteration %s ran successfully", self.iteration)
382
383
384    def run_once(self, host, cmdline_args, release_image, dev_image,
385                 old_release_image="", test=""):
386        for i, update_info in enumerate(self.update_order):
387            erase_nvmem = self.test == self.ERASE_NVMEM
388            self.run_update(update_info[0], erase_nvmem=erase_nvmem)
389