cr50_utils.py revision 5f0442d3daab45adf7d0dafd14075f8b3ac1ef41
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import argparse
6import logging
7import os
8import re
9
10from autotest_lib.client.common_lib import error
11
12
RO = 'ro'
RW = 'rw'
CR50_FILE = '/opt/google/cr50/firmware/cr50.bin.prod'
CR50_STATE = '/var/cache/cr50*'
GET_CR50_VERSION = 'cat /var/cache/cr50-version'
# Raw string so the backslash is passed to grep unchanged and python does not
# treat '\[' as an (invalid) string escape.
GET_CR50_MESSAGES = r'grep "cr50-.*\[" /var/log/messages'
UPDATE_FAILURE = 'unexpected cr50-update exit code'
DUMMY_VER = '-1.-1.-1'
# This dictionary is used to search the usb_updater output for the version
# strings. There are two usb_updater commands that will return versions:
# 'fwver' and 'binvers'.
#
# 'fwver'   is used to get the running RO and RW versions from cr50
# 'binvers'  gets the version strings for each RO and RW region in the given
#            file
#
# The value in the dictionary is the regular expression that can be used to
# find the version strings for each region. The patterns are raw strings so
# that '\S' and '\n' are interpreted by the re module and not by python's
# string escape processing.
VERSION_RE = {
    "--fwver" : r'\nRO (?P<ro>\S+).*\nRW (?P<rw>\S+)',
    "--binvers" : r'RO_A:(?P<ro_a>\S+).*RW_A:(?P<rw_a>\S+).*'
                  r'RO_B:(?P<ro_b>\S+).*RW_B:(?P<rw_b>\S+)',
}
UPDATE_TIMEOUT = 60
UPDATE_OK = 1

ERASED_BID_INT = 0xffffffff
# With an erased bid, the flags and board id will both be erased
ERASED_BID = (ERASED_BID_INT, ERASED_BID_INT)

# Parser that mirrors the usb_updater options this module cares about. It is
# only used to classify a command line; the unparsed args are what get run.
usb_update = argparse.ArgumentParser()
# use /dev/tpm0 to send the command
usb_update.add_argument('-s', '--systemdev', dest='systemdev',
                        action='store_true')
# fwver, binver, and board id are used to get information about cr50 or an
# image.
usb_update.add_argument('-b', '--binvers', '-f', '--fwver', '-i', '--board_id',
                        dest='info_cmd', action='store_true')
# upstart and post_reset will post resets instead of rebooting immediately
usb_update.add_argument('-u', '--upstart', '-p', '--post_reset',
                        dest='post_reset', action='store_true')
# Everything that is not one of the flags above (image paths, board id
# values, ...) is collected here.
usb_update.add_argument('extras', nargs=argparse.REMAINDER)
55
56
def AssertVersionsAreEqual(name_a, ver_a, name_b, ver_b):
    """Raise an error if ver_a isn't the same as ver_b

    Args:
        name_a: the name of section a
        ver_a: the version string for section a
        name_b: the name of section b
        ver_b: the version string for section b

    Raises:
        AssertionError if ver_a is not equal to ver_b
    """
    # Raise explicitly rather than using the assert statement: asserts are
    # removed when python runs with -O, which would silently skip this check.
    if ver_a != ver_b:
        raise AssertionError("Versions do not match: %s %s %s %s" %
                             (name_a, ver_a, name_b, ver_b))
71
72
def GetNewestVersion(ver_a, ver_b):
    """Return whichever version string is newer, or None if they are equal.

    Each version is a '.' separated string of integers. The fields are
    compared numerically from left to right.

    Args:
        ver_a: the first version string
        ver_b: the second version string

    Returns:
        ver_a or ver_b, whichever is newer. None if they compare equal.
    """
    fields_a = tuple(int(field) for field in ver_a.split('.'))
    fields_b = tuple(int(field) for field in ver_b.split('.'))

    if fields_a == fields_b:
        return None
    return ver_a if fields_a > fields_b else ver_b
84
85
def GetVersion(versions, name):
    """Return the version string from the dictionary.

    Get the version for each key in the versions dictionary that contains the
    substring name. Make sure all of the versions match and return the version
    string. Raise an error if the versions don't match.

    Args:
        versions: dictionary with the partition names as keys and the
                  partition version strings as values.
        name: the string used to find the relevant items in versions.

    Returns:
        the version from versions, "-1.-1.-1" if an invalid version was
        detected for name, or None if no key in versions contains name.

    Raises:
        AssertionError if two matching partitions have different versions
    """
    ver = None
    key = None
    # items() is available on both python 2 and python 3 dictionaries;
    # iteritems() only exists on python 2.
    for k, v in versions.items():
        if name in k:
            if v == DUMMY_VER:
                # An invalid version short-circuits the comparison with the
                # other partitions.
                logging.info("Detected invalid %s %s", name, v)
                return v
            elif ver:
                AssertVersionsAreEqual(key, ver, k, v)
            else:
                # First matching partition, remember it for later comparisons
                ver = v
                key = k
    return ver
114
115
def FindVersion(output, arg):
    """Find the ro and rw versions.

    Args:
        output: The string to search
        arg: string representing the usb_updater option, either '--binvers' or
             '--fwver'

    Returns:
        a tuple of the ro and rw versions

    Raises:
        KeyError if arg is not a key of VERSION_RE
        TestFail if output does not contain the expected version strings
    """
    match = re.search(VERSION_RE[arg], output)
    # re.search returns None when nothing matches. Report what could not be
    # parsed instead of failing with an AttributeError on None.
    if match is None:
        raise error.TestFail("Unable to find %s versions in: %s" %
                             (arg, output))
    versions = match.groupdict()
    ro = GetVersion(versions, RO)
    rw = GetVersion(versions, RW)
    return ro, rw
132
133
def GetSavedVersion(client):
    """Return the (ro, rw) version tuple saved in /var/cache/cr50-version

    Args:
        client: the object to run commands on
    """
    saved_output = client.run(GET_CR50_VERSION).stdout
    # The saved file is parsed with the same pattern as 'usb_updater --fwver'
    return FindVersion(saved_output.strip(), "--fwver")
138
139
def UsbUpdater(client, args):
    """Run usb_update with the given args.

    Args:
        client: the object to run commands on
        args: a list of strings that contain the usb_updater args

    Returns:
        the result of usb_update. This may be None if the command timed out
        while an immediate reboot was expected (see the note in the body).

    Raises:
        TestFail if usb_updater exits with a status other than 0 or UPDATE_OK
    """
    options = usb_update.parse_args(args)

    # NOTE(review): trunksd is stopped before a command is sent with
    # --systemdev, presumably because it would otherwise hold /dev/tpm0 --
    # confirm.
    result = client.run("status trunksd")
    if options.systemdev and 'running' in result.stdout:
        client.run("stop trunksd")

    # If we are updating the cr50 image, usb_update will return a non-zero exit
    # status so we should ignore it.
    ignore_status = not options.info_cmd
    # immediate reboots are only honored if the command is sent using /dev/tpm0
    expect_reboot = (options.systemdev and not options.post_reset and
                     not options.info_cmd)

    joined_args = ' '.join(args)
    result = client.run("usb_updater %s" % joined_args,
                        ignore_status=ignore_status,
                        ignore_timeout=expect_reboot,
                        timeout=UPDATE_TIMEOUT)

    # NOTE(review): with ignore_timeout set, client.run presumably returns
    # None when the command times out (e.g. the device rebooted underneath
    # it) -- confirm against the client implementation. There is no exit
    # status to check in that case.
    if result is None:
        return result

    # After a posted reboot, the usb_update exit code should equal 1.
    if result.exit_status and result.exit_status != UPDATE_OK:
        logging.debug(result)
        raise error.TestFail("Unexpected usb_update exit code after %s %d" %
                             (joined_args, result.exit_status))
    return result
174
175
def GetVersionFromUpdater(client, args):
    """Run usb_updater with args and return the (ro, rw) versions it prints

    Args:
        client: the object to run commands on
        args: the usb_updater args. The first item has to be the version
              option ('--fwver' or '--binvers'), because it selects the
              pattern used to parse the output.
    """
    updater_output = UsbUpdater(client, args).stdout
    version_option = args[0]
    return FindVersion(updater_output.strip(), version_option)
180
181
def GetFwVersion(client):
    """Return the running (ro, rw) versions from 'usb_updater --fwver -s'"""
    fwver_args = ['--fwver', '-s']
    return GetVersionFromUpdater(client, fwver_args)
185
186
def GetBinVersion(client, image=CR50_FILE):
    """Return the (ro, rw) versions of image using 'usb_updater --binvers'

    Args:
        client: the object to run commands on
        image: the path of the image on the dut
    """
    # TODO(mruthven) b/37958867: change to ["--binvers", image] when usb_updater
    # is fixed
    binvers_args = ['--binvers', image, image, '-s']
    return GetVersionFromUpdater(client, binvers_args)
192
193
def GetVersionString(ver):
    """Format a version tuple as 'RO <ro version> RW <rw version>'

    Args:
        ver: a sequence with the ro version at index 0 and the rw version at
             index 1
    """
    ro_ver = ver[0]
    rw_ver = ver[1]
    return 'RO %s RW %s' % (ro_ver, rw_ver)
196
197
def GetRunningVersion(client):
    """Return the running Cr50 version after cross checking it.

    The version reported by 'usb_updater --fwver' is compared with the one
    saved in /var/cache/cr50-version. They are expected to be identical.

    Args:
        client: the object to run commands on

    Returns:
        a tuple with the running ro and rw version strings

    Raises:
        AssertionError
        - If the version in /var/cache/cr50-version is not the same as the
          version from 'usb_updater --fwver'
    """
    fw_ver = GetFwVersion(client)
    cached_ver = GetSavedVersion(client)

    fw_ver_str = GetVersionString(fw_ver)
    cached_ver_str = GetVersionString(cached_ver)
    AssertVersionsAreEqual("Running", fw_ver_str, "Saved", cached_ver_str)
    return fw_ver
221
222
def CheckForFailures(client, last_message):
    """Check for any unexpected cr50-update exit codes.

    This only checks the cr50 update messages that have happened since
    last_message. If an unexpected exit code is detected it will raise an
    error. If last_message is empty nothing is checked and only the most
    recent message is returned.

    Args:
        client: the object to run commands on
        last_message: the last cr50 message from the last update run

    Returns:
        the last cr50 message (a whole line) in /var/log/messages, or an empty
        string if there are no cr50 messages

    Raises:
        TestFail
            - If there is an unexpected cr50-update exit code after
              last_message in /var/log/messages
    """
    output = client.run(GET_CR50_MESSAGES).stdout.strip()
    if last_message:
        # Only consider what was logged after the final occurrence of
        # last_message. If it is not found the whole output is checked.
        new_messages = output.rsplit(last_message, 1)[-1].split('\n')
        failures = [message for message in new_messages
                    if UPDATE_FAILURE in message]
        if failures:
            logging.info(new_messages)
            raise error.TestFail("Detected unexpected exit code during update: "
                                 "%s" % failures)
    # Split the full output so a complete line is always returned. Indexing
    # the unsplit string would return only its last character, and raise an
    # IndexError when there are no messages at all.
    return output.split('\n')[-1]
253
254
def VerifyUpdate(client, ver='', last_message=''):
    """Check the update state after an update has run.

    Makes sure no unexpected cr50-update exit codes were logged since
    last_message and, if ver is given, that the running version matches it.

    Args:
        client: the object to run commands on
        ver: the expected version tuple (ro ver, rw ver). The default ''
             skips the version comparison.
        last_message: the last cr50 message from the last update run

    Returns:
        a tuple of (running version tuple, last cr50 update message found in
        /var/log/messages)
    """
    # Check that there were no unexpected reboots from cr50-result
    newest_message = CheckForFailures(client, last_message)
    logging.debug("last cr50 message %s", newest_message)

    running_ver = GetRunningVersion(client)
    if ver == '':
        return running_ver, newest_message

    # The RO comparison is skipped when the expected RO is the dummy version
    expected_ro = ver[0]
    if expected_ro != DUMMY_VER:
        AssertVersionsAreEqual("Old RO", expected_ro,
                               "Updated RO", running_ver[0])
    AssertVersionsAreEqual("Old RW", ver[1], "Updated RW", running_ver[1])
    return running_ver, newest_message
278
279
def ClearUpdateStateAndReboot(client):
    """Delete the cr50 state files matching CR50_STATE, then reboot the AP

    Args:
        client: the object to run commands on
    """
    remove_state_cmd = "rm %s" % CR50_STATE
    client.run(remove_state_cmd)
    client.reboot()
284
285
def InstallImage(client, src, dest=CR50_FILE):
    """Send the image at src to dest on the dut and read back its version

    Args:
        client: the object to run commands on
        src: the image location of the server
        dest: the desired location on the dut

    Returns:
        a tuple of (dest, image version), where image version is itself a
        tuple with the RO and RW version of the copied file
    """
    client.send_file(src, dest)

    # Read the version from the copy on the dut, then run sync
    image_ver = GetBinVersion(client, dest)
    client.run("sync")
    return dest, image_ver
304
305
def GetSymbolicBoardId(symbolic_board_id):
    """Pack the characters of a symbolic board id into an int

    Each character contributes its ordinal value as one byte, with the first
    character ending up in the most significant position.

    Args:
        symbolic_board_id: a ASCII string. It can be up to 4 characters

    Returns:
        the symbolic board id string converted to an int
    """
    packed = 0
    for char in symbolic_board_id:
        # Make room for the next character in the low byte
        packed <<= 8
        packed |= ord(char)
    return packed
319
320
def GetExpectedBoardId(board_id):
    """Return the usb_updater interpretation of board_id

    Args:
        board_id: a int or string value of the board id. A string of up to 4
                  characters is treated as a symbolic board id, a longer
                  string is parsed as a hex value.

    Returns:
        a int representation of the board id
    """
    # isinstance also accepts subclasses of int, which an exact type
    # comparison would wrongly send down the string path.
    if isinstance(board_id, int):
        return board_id

    if len(board_id) <= 4:
        return GetSymbolicBoardId(board_id)

    return int(board_id, 16)
337
338
def GetExpectedFlags(flags):
    """If flags are not specified, usb_updater will set them to 0xff00

    Args:
        flags: The int value or None

    Returns:
        the original flags or 0xff00 if flags is None
    """
    # Compare with None by identity so that a flags value of 0 is kept.
    return flags if flags is not None else 0xff00
349
350
def GetBoardId(client):
    """Return the board id and flags

    Args:
        client: the object to run commands on

    Returns:
        a tuple with the int values of the board id and the flags

    Raises:
        TestFail if the second board id response field is not ~board_id
    """
    result = UsbUpdater(client, ["-i"]).stdout.strip()
    # The text after 'Board ID space: ' is parsed as three ':' separated hex
    # fields: board_id:board_id_inv:flags
    board_id_info = result.split("Board ID space: ")[-1].strip().split(":")
    board_id, board_id_inv, flags = [int(val, 16) for val in board_id_info]
    logging.info('BOARD_ID: %x:%x:%x', board_id, board_id_inv, flags)

    # The fields are treated as 32 bit values, like ERASED_BID_INT
    field_mask = 0xffffffff
    if board_id == board_id_inv == flags == ERASED_BID_INT:
        logging.info('board id is erased')
    elif board_id_inv != (~board_id & field_mask):
        # Require the exact complement. Only checking for overlapping bits
        # (board_id & board_id_inv) would accept values like an inverse of 0.
        raise error.TestFail('board_id_inv should be ~board_id got %x %x' %
                             (board_id, board_id_inv))
    return board_id, flags
374
375
def CheckBoardId(client, board_id, flags):
    """Make sure the running board id and flags match the given values

    The given board_id and flags are first converted to the values
    usb_updater would use for them. Those are then compared to what is read
    back from the device.

    Args:
        client: the object to run commands on
        board_id: a hex, symbolic or int value for board_id
        flags: the int value of flags or None

    Raises:
        TestFail if the new board id info does not match
    """
    # Read back the board id and flags
    running_id, running_flags = GetBoardId(client)

    want_id = GetExpectedBoardId(board_id)
    want_flags = GetExpectedFlags(flags)

    if (running_id, running_flags) == (want_id, want_flags):
        return
    raise error.TestFail('Failed to set board id expected %x:%x, but got '
                         '%x:%x' % (want_id, want_flags,
                         running_id, running_flags))
400
401
def SetBoardId(client, board_id, flags=None):
    """Sets the board id and flags

    Args:
        client: the object to run commands on
        board_id: a string of the symbolic board id or board id hex value. If
                  the string is 4 characters or shorter it will be considered
                  a symbolic value
        flags: the desired int flag value or None. If given, it is appended
               to the board id argument as ':<hex flags>'.

    Raises:
        TestFail if the board id or flags read back after the update do not
        match the requested values
    """
    board_id_arg = board_id
    # Compare with None by identity so that a flags value of 0 is still sent
    if flags is not None:
        board_id_arg += ':' + hex(flags)

    # Set the board id using the given board id and flags. The output is not
    # needed, the values are verified by reading them back below.
    UsbUpdater(client, ["-s", "-i", board_id_arg])

    CheckBoardId(client, board_id, flags)
425