cr50_utils.py revision 162a032f5f848d4a530eae1c314259076258bcd4
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import re
8
9from autotest_lib.client.common_lib import error
10
11
# Substrings used to select the RO / RW entries of a parsed version dictionary
# (the regex group names are 'ro', 'rw', 'ro_a', 'rw_a', 'ro_b' and 'rw_b').
RO = 'ro'
RW = 'rw'
# Default cr50 image path used by GetBinVersion() and InstallImage().
CR50_FILE = '/opt/google/cr50/firmware/cr50.bin.prod'
# Glob for the cr50 files removed by ClearUpdateStateAndReboot().
CR50_STATE = '/var/cache/cr50*'
# Command that prints the saved cr50 version.
GET_CR50_VERSION = 'cat /var/cache/cr50-version'
# Command that prints the cr50 lines from /var/log/messages. This is a raw
# string so that '\[' is passed to grep as written instead of being an invalid
# Python escape sequence.
GET_CR50_MESSAGES = r'grep "cr50-.*\[" /var/log/messages'
# Text searched for in the cr50 log lines by CheckForFailures().
UPDATE_FAILURE = 'unexpected cr50-update exit code'
# Version string that GetVersion() reports as an invalid section.
DUMMY_VER = '-1.-1.-1'
# This dictionary is used to search the usb_updater output for the version
# strings. There are two usb_updater commands that will return versions:
# 'fwver' and 'binver'.
#
# 'fwver'   is used to get the running RO and RW versions from cr50
# 'binver'  gets the version strings for each RO and RW region in the given
#           file
#
# The value in the dictionary is the regular expression that can be used to
# find the version strings for each region. The patterns are raw strings so
# that '\S' and '\n' are interpreted by the re module rather than as Python
# string escapes.
VERSION_RE = {
    "--fwver": r'\nRO (?P<ro>\S+).*\nRW (?P<rw>\S+)',
    "--binver": r'RO_A:(?P<ro_a>\S+).*RW_A:(?P<rw_a>\S+).*'
                r'RO_B:(?P<ro_b>\S+).*RW_B:(?P<rw_b>\S+)',
}
35
36
def AssertVersionsAreEqual(name_a, ver_a, name_b, ver_b):
    """Raise an error if ver_a isn't the same as ver_b.

    The error is raised explicitly instead of with an assert statement so
    the check is still performed when Python runs with optimizations (-O),
    which removes assert statements.

    Args:
        name_a: the name of section a
        ver_a: the version string for section a
        name_b: the name of section b
        ver_b: the version string for section b

    Raises:
        AssertionError if ver_a is not equal to ver_b
    """
    if ver_a != ver_b:
        raise AssertionError("Versions do not match: %s %s %s %s" %
                             (name_a, ver_a, name_b, ver_b))
51
52
def GetNewestVersion(ver_a, ver_b):
    """Return whichever of the two version strings is newer.

    Each version is split on '.' and its fields are compared as integers,
    field by field.

    Args:
        ver_a: a dot separated version string
        ver_b: a dot separated version string

    Returns:
        ver_a or ver_b, whichever is newer, or None if they compare equal.
    """
    fields_a = tuple(map(int, ver_a.split('.')))
    fields_b = tuple(map(int, ver_b.split('.')))

    if fields_a == fields_b:
        return None
    return ver_a if fields_a > fields_b else ver_b
64
65
def GetVersion(versions, name):
    """Return the version string from the dictionary.

    Get the version for each key in the versions dictionary that contains the
    substring name. Make sure all of the versions match and return the version
    string. Raise an error if the versions don't match.

    Args:
        versions: dictionary with the partition names as keys and the
                  partition version strings as values.
        name: the string used to find the relevant items in versions.
    Returns:
        the version from versions, "-1.-1.-1" if an invalid section was
        detected, or None if no key in versions contains name.
    Raises:
        AssertionError if the matching partitions have different versions.
    """
    ver = None
    key = None
    # items() is used instead of iteritems() so the loop works on both
    # Python 2 and Python 3 dictionaries.
    for k, v in versions.items():
        if name not in k:
            continue
        if v == DUMMY_VER:
            # An invalid section takes precedence over any other version.
            logging.info("Detected invalid %s %s", name, v)
            return v
        if ver:
            AssertVersionsAreEqual(key, ver, k, v)
        else:
            ver = v
            key = k
    return ver
93
94
def FindVersion(output, arg):
    """Find the ro and rw versions.

    @param output: The string to search
    @param arg: string representing the usb_updater option, either
                '--binver' or '--fwver'
    @returns a tuple of the ro and rw versions, each as returned by
             GetVersion
    @raises TestFail if output does not match the regular expression for arg
    """
    match = re.search(VERSION_RE[arg], output)
    if match is None:
        # Without this check the failure would surface as an AttributeError
        # on None, which hides the output that could not be parsed.
        raise error.TestFail("Unable to find %s versions in: %r" %
                             (arg, output))
    versions = match.groupdict()
    ro = GetVersion(versions, RO)
    rw = GetVersion(versions, RW)
    return ro, rw
108
109
def GetSavedVersion(client):
    """Return the ro and rw versions saved in /var/cache/cr50-version"""
    saved_output = client.run(GET_CR50_VERSION).stdout
    return FindVersion(saved_output.strip(), "--fwver")
114
115
def GetVersionFromUpdater(client, args):
    """Run usb_updater with args and return the versions from its output.

    Args:
        client: object whose run() executes the command and returns a result
                with a stdout attribute
        args: list of usb_updater arguments. args[0] is the option that
              selects the regular expression used to parse the output.
    Returns:
        the ro and rw versions found by FindVersion
    """
    command = "usb_updater " + ' '.join(args)
    updater_output = client.run(command).stdout.strip()
    return FindVersion(updater_output, args[0])
120
121
def GetFwVersion(client):
    """Return the running ro and rw versions from 'usb_updater --fwver'"""
    updater_args = ["--fwver"]
    return GetVersionFromUpdater(client, updater_args)
125
126
def GetBinVersion(client, image=CR50_FILE):
    """Return the ro and rw versions of image from 'usb_updater --binver'

    Args:
        image: path of the image file passed to usb_updater. Defaults to
               CR50_FILE.
    """
    updater_args = ["--binver", image]
    return GetVersionFromUpdater(client, updater_args)
130
131
def GetVersionString(ver):
    """Format a version pair as 'RO <ro version> RW <rw version>'.

    Args:
        ver: sequence whose first item is the ro version and whose second
             item is the rw version
    """
    ro_ver = ver[0]
    rw_ver = ver[1]
    return 'RO %s RW %s' % (ro_ver, rw_ver)
134
135
def GetRunningVersion(client):
    """Get the running Cr50 version.

    The version reported by 'usb_updater --fwver' is compared with the one
    saved in /var/cache/cr50-version. They are expected to be the same.

    Returns:
        a tuple with the ro and rw version strings from usb_updater
    Raises:
        AssertionError
        - Raised by AssertVersionsAreEqual if the version in
          /var/cache/cr50-version is not the same as the version from
          'usb_updater --fwver'
    """
    fw_ver = GetFwVersion(client)
    cached_ver = GetSavedVersion(client)

    fw_str = GetVersionString(fw_ver)
    cached_str = GetVersionString(cached_ver)
    AssertVersionsAreEqual("Running", fw_str, "Saved", cached_str)
    return fw_ver
155
156
def CheckForFailures(client, last_message):
    """Check for any unexpected cr50-update exit codes.

    This only checks the cr50 update messages that have happened since
    last_message. If an unexpected exit code is detected it will raise an
    error.

    Args:
        client: object whose run() executes the command and returns a result
                with a stdout attribute
        last_message: the last cr50 message from the last update run. If it
                      is empty no messages are checked.

    Returns:
        the last cr50 message in /var/log/messages

    Raises:
        TestFail
            - If there is an unexpected cr50-update exit code after
              last_message in /var/log/messages
    """
    messages = client.run(GET_CR50_MESSAGES).stdout.strip()
    # Take the last line from the complete output. Taking it from the text
    # after last_message would return '' when nothing new has been logged,
    # and an empty last_message disables the check on the following call.
    newest_message = messages.rsplit('\n', 1)[-1]
    if last_message:
        # Only look at what was logged after the last occurrence of
        # last_message.
        new_messages = messages.rsplit(last_message, 1)[-1]
        if UPDATE_FAILURE in new_messages:
            logging.debug(new_messages)
            raise error.TestFail("Detected unexpected exit code during update")
    return newest_message
181
182
def VerifyUpdate(client, ver='', last_message=''):
    """Verify the running version and check the log for update failures.

    CheckForFailures is run on the messages logged since last_message, then
    the running version is read and, when ver is given, compared against it.

    Args:
        ver: a sequence with the expected ro and rw versions. The default ''
             skips the comparison. The ro version is not compared when the
             expected ro version is DUMMY_VER.
        last_message: the last cr50 message seen by the previous call

    Returns:
        new_ver: a tuple containing the running ro and rw versions
        last_message: The last cr50 update message in /var/log/messages
    """
    # Look for unexpected cr50-update exit codes before checking versions
    last_message = CheckForFailures(client, last_message)
    logging.debug("last cr50 message %s", last_message)

    running_ver = GetRunningVersion(client)
    if ver == '':
        return running_ver, last_message

    if ver[0] != DUMMY_VER:
        AssertVersionsAreEqual("Old RO", ver[0], "Updated RO", running_ver[0])
    AssertVersionsAreEqual("Old RW", ver[1], "Updated RW", running_ver[1])
    return running_ver, last_message
201
202
def ClearUpdateStateAndReboot(client):
    """Remove the files matching CR50_STATE, then reboot through the client"""
    remove_state_cmd = "rm " + CR50_STATE
    client.run(remove_state_cmd)
    client.reboot()
207
208
def InstallImage(client, src, dest=CR50_FILE):
    """Copy the image at src to dest on the dut and read its version

    Args:
        src: the image location on the server
        dest: the desired location on the dut. Defaults to CR50_FILE.
    Returns:
        The filename where the image was copied to on the dut, and a tuple
        containing the RO and RW version of the file
    """
    # Transfer the image to the dut
    client.send_file(src, dest)

    # The version is read from the copied file before sync is run
    image_ver = GetBinVersion(client, dest)
    client.run("sync")
    return dest, image_ver
224