cr50_utils.py revision e4052c8d15695f41a3b3794cce48be52c22e4ada
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import argparse
6import logging
7import re
8
9from autotest_lib.client.common_lib import error
10
11
# Substrings used by FindVersion/GetVersion to pick the RO and RW entries out
# of a parsed version dictionary.
RO = 'ro'
RW = 'rw'
# Default image path used by GetBinVersion and InstallImage.
CR50_FILE = '/opt/google/cr50/firmware/cr50.bin.prod'
# Glob of the state files removed by ClearUpdateStateAndReboot.
CR50_STATE = '/var/cache/cr50*'
GET_CR50_VERSION = 'cat /var/cache/cr50-version'
# Raw string so the '\[' reaches grep unchanged without relying on python
# preserving an unrecognized string escape.
GET_CR50_MESSAGES = r'grep "cr50-.*\[" /var/log/messages'
# Substring CheckForFailures looks for in the cr50 log messages.
UPDATE_FAILURE = 'unexpected cr50-update exit code'
# Version string GetVersion treats as an invalid region.
DUMMY_VER = '-1.-1.-1'
20# This dictionary is used to search the usb_updater output for the version
21# strings. There are two usb_updater commands that will return versions:
22# 'fwver' and 'binvers'.
23#
24# 'fwver'   is used to get the running RO and RW versions from cr50
25# 'binvers'  gets the version strings for each RO and RW region in the given
26#            file
27#
28# The value in the dictionary is the regular expression that can be used to
29# find the version strings for each region.
30#
31# --fwver
32#   example output:
33#           open_device 18d1:5014
34#           found interface 3 endpoint 4, chunk_len 64
35#           READY
36#           -------
37#           start
38#           target running protocol version 6
39#           keyids: RO 0xaa66150f, RW 0xde88588d
40#           offsets: backup RO at 0x40000, backup RW at 0x44000
41#           Current versions:
42#           RO 0.0.10
43#           RW 0.0.21
44#   match groupdict:
45#           {
46#               'ro': '0.0.10',
47#               'rw': '0.0.21'
48#           }
49#
50# --binvers
51#   example output:
52#           read 524288(0x80000) bytes from /tmp/cr50.bin
53#           RO_A:0.0.10 RW_A:0.0.21[00000000:00000000:00000000]
54#           RO_B:0.0.10 RW_B:0.0.21[00000000:00000000:00000000]
55#   match groupdict:
56#           {
57#               'rw_b': '0.0.21',
58#               'rw_a': '0.0.21',
59#               'ro_b': '0.0.10',
60#               'ro_a': '0.0.10',
61#               'bid_a': '00000000:00000000:00000000',
62#               'bid_b': '00000000:00000000:00000000'
63#           }
# The patterns are raw strings so the regex escapes (\S, \d, \[, ...) are
# passed to the re module as written instead of relying on python keeping
# unrecognized string escapes.
#
# NOTE(review): FindVersion searches with no re.DOTALL flag, so '.*' cannot
# cross a newline. The '--binvers' pattern therefore only matches when the A
# and B regions are printed on the same line -- confirm against real
# usb_updater output.
VERSION_RE = {
    '--fwver': r'\nRO (?P<ro>\S+).*\nRW (?P<rw>\S+)',
    '--binvers': (
        r'RO_A:(?P<ro_a>[\d\.]+).*'
        r'RW_A:(?P<rw_a>[\d\.]+)(\[(?P<bid_a>[\d\:a-fA-F]+)\])?.*'
        r'RO_B:(?P<ro_b>\S+).*'
        r'RW_B:(?P<rw_b>[\d\.]+)(\[(?P<bid_b>[\d\:a-fA-F]+)\])?.*'
    ),
}
# Timeout passed to client.run when running usb_updater.
UPDATE_TIMEOUT = 60
# Non-zero usb_updater exit status that UsbUpdater accepts.
UPDATE_OK = 1
73
# All-ones value that GetBoardId reports for an erased board id field.
ERASED_BID_INT = 0xffffffff
# An erased board id has both the board id and the flags erased.
ERASED_BID = (ERASED_BID_INT,) * 2
77
def _BuildUsbUpdateParser():
    """Create the parser UsbUpdater uses to classify usb_updater arguments.

    Returns:
        an argparse.ArgumentParser with the systemdev, info_cmd and post_reset
        flags. Everything that is not parsed as one of those flags is
        collected in 'extras'.
    """
    parser = argparse.ArgumentParser()
    # -s/--systemdev: the command is sent using /dev/tpm0
    parser.add_argument('-s', '--systemdev', action='store_true',
                        dest='systemdev')
    # --binvers, --fwver and --board_id only get information about cr50 or
    # an image, so they all set the same info_cmd flag.
    parser.add_argument('-b', '--binvers', '-f', '--fwver', '-i',
                        '--board_id', action='store_true', dest='info_cmd')
    # --upstart and --post_reset post a reset instead of rebooting
    # immediately.
    parser.add_argument('-u', '--upstart', '-p', '--post_reset',
                        action='store_true', dest='post_reset')
    parser.add_argument('extras', nargs=argparse.REMAINDER)
    return parser


usb_update = _BuildUsbUpdateParser()
90
91
def AssertVersionsAreEqual(name_a, ver_a, name_b, ver_b):
    """Raise an error if ver_a isn't the same as ver_b

    Args:
        name_a: the name of section a
        ver_a: the version string for section a
        name_b: the name of section b
        ver_b: the version string for section b

    Raises:
        AssertionError if ver_a is not equal to ver_b
    """
    if ver_a == ver_b:
        return
    # Raise explicitly instead of using an assert statement, so the check is
    # still enforced when python is run with -O.
    raise AssertionError('Versions do not match: %s %s %s %s' %
                         (name_a, ver_a, name_b, ver_b))
106
107
def GetNewestVersion(ver_a, ver_b):
    """Return the newer of two dotted version strings.

    The fields are compared numerically from left to right.

    Args:
        ver_a: a version string of '.' separated integers
        ver_b: a version string of '.' separated integers

    Returns:
        ver_a or ver_b, whichever is newer. None if they are the same.
    """
    fields_a = tuple(int(field) for field in ver_a.split('.'))
    fields_b = tuple(int(field) for field in ver_b.split('.'))

    if fields_a == fields_b:
        return None
    return ver_a if fields_a > fields_b else ver_b
119
120
def GetVersion(versions, name):
    """Return the version string from the dictionary.

    Get the version for each key in the versions dictionary that contains the
    substring name. Make sure all of the versions match and return the version
    string. Raise an error if the versions don't match.

    Args:
        versions: dictionary with the partition names as keys and the
                  partition version strings as values.
        name: the string used to find the relevant items in versions.

    Returns:
        the version from versions or "-1.-1.-1" if an invalid RO was detected.
        None if no key in versions contains name.

    Raises:
        AssertionError (from AssertVersionsAreEqual) if two matching
        partitions have different versions
    """
    ver = None
    key = None
    # items() works with both python 2 and python 3. iteritems() only exists
    # in python 2.
    for k, v in versions.items():
        if name not in k:
            continue
        if v == DUMMY_VER:
            # An invalid region is reported right away without comparing it
            # to the other regions.
            logging.info('Detected invalid %s %s', name, v)
            return v
        if ver:
            AssertVersionsAreEqual(key, ver, k, v)
        else:
            ver = v
            key = k
    return ver
149
150
def FindVersion(output, arg):
    """Find the ro and rw versions.

    Args:
        output: The string to search
        arg: string representing the usb_updater option, either '--binvers' or
             '--fwver'

    Returns:
        a tuple of the ro and rw versions

    Raises:
        TestFail if output does not contain the version strings for arg
    """
    match = re.search(VERSION_RE[arg], output)
    # re.search returns None when nothing matches. Report what could not be
    # parsed instead of failing with an AttributeError on None.
    if not match:
        raise error.TestFail('Unable to determine version from: %s' % output)
    versions = match.groupdict()
    ro = GetVersion(versions, RO)
    rw = GetVersion(versions, RW)
    return ro, rw
167
168
def GetSavedVersion(client):
    """Return the (ro, rw) version saved in /var/cache/cr50-version

    Args:
        client: the object to run commands on

    Returns:
        a tuple of the saved ro and rw versions
    """
    saved_output = client.run(GET_CR50_VERSION).stdout
    # The saved file is parsed the same way as the 'usb_updater --fwver'
    # output.
    return FindVersion(saved_output.strip(), '--fwver')
173
174
def UsbUpdater(client, args):
    """Run usb_updater with the given args.

    Args:
        client: the object to run commands on
        args: a list of strings that contain the usb_updater args

    Returns:
        the result of usb_updater. NOTE(review): this is whatever client.run
        returns, which is presumably None when an ignored timeout happens
        during an immediate reboot -- confirm against client.run.

    Raises:
        TestFail if usb_updater exits with a status other than 0 or UPDATE_OK
    """
    options = usb_update.parse_args(args)

    # trunksd is stopped before a command is sent using /dev/tpm0
    # (presumably because trunksd would otherwise be using the device --
    # confirm).
    result = client.run('status trunksd')
    if options.systemdev and 'running' in result.stdout:
        client.run('stop trunksd')

    # If we are updating the cr50 image, usb_updater will return a non-zero
    # exit status so we should ignore it.
    ignore_status = not options.info_cmd
    # immediate reboots are only honored if the command is sent using /dev/tpm0
    expect_reboot = (options.systemdev and not options.post_reset and
                     not options.info_cmd)

    result = client.run('usb_updater %s' % ' '.join(args),
                        ignore_status=ignore_status,
                        ignore_timeout=expect_reboot,
                        timeout=UPDATE_TIMEOUT)

    # There is no exit status to check if the command did not return a result
    # (see the note on the return value above).
    if result is None:
        return result

    # After a posted reboot, the usb_updater exit code should equal 1.
    if result.exit_status and result.exit_status != UPDATE_OK:
        logging.debug(result)
        raise error.TestFail('Unexpected usb_update exit code after %s %d' %
                             (' '.join(args), result.exit_status))
    return result
209
210
def GetVersionFromUpdater(client, args):
    """Run usb_updater with args and return the (ro, rw) version it prints

    Args:
        client: the object to run commands on
        args: a list of usb_updater args. The first one has to be the version
              option ('--fwver' or '--binvers'), because it selects the regular
              expression used to parse the output.

    Returns:
        a tuple of the ro and rw versions
    """
    updater_result = UsbUpdater(client, args)
    updater_output = updater_result.stdout.strip()
    return FindVersion(updater_output, args[0])
215
216
def GetFwVersion(client):
    """Return the running (ro, rw) version from 'usb_updater --fwver -s'"""
    fwver_args = ['--fwver', '-s']
    return GetVersionFromUpdater(client, fwver_args)
220
221
def GetBinVersion(client, image=CR50_FILE):
    """Return the (ro, rw) version of image from 'usb_updater --binvers'

    Args:
        client: the object to run commands on
        image: the path of the image on the dut
    """
    # TODO(mruthven) b/37958867: change to ["--binvers", image] when usb_updater
    # is fixed. Until then the image path is passed twice.
    binvers_args = ['--binvers'] + [image] * 2
    return GetVersionFromUpdater(client, binvers_args)
227
228
def GetVersionString(ver):
    """Format a (ro, rw) version tuple as 'RO <ro> RW <rw>'"""
    ro_ver = ver[0]
    rw_ver = ver[1]
    return 'RO %s RW %s' % (ro_ver, rw_ver)
232
233
def GetRunningVersion(client):
    """Return the running Cr50 version after cross checking it.

    'usb_updater --fwver' and /var/cache/cr50-version are both read. They are
    expected to report the same version.

    Args:
        client: the object to run commands on

    Returns:
        running_ver: a tuple with the ro and rw version strings from
                     usb_updater

    Raises:
        AssertionError (from AssertVersionsAreEqual)
        - If the version in /var/cache/cr50-version is not the same as the
          version from 'usb_updater --fwver'
    """
    running_ver = GetFwVersion(client)
    saved_ver = GetSavedVersion(client)

    # Compare the combined strings so that a mismatch reports both the RO and
    # RW versions of each source.
    running_str = GetVersionString(running_ver)
    saved_str = GetVersionString(saved_ver)
    AssertVersionsAreEqual('Running', running_str, 'Saved', saved_str)
    return running_ver
257
258
def CheckForFailures(client, last_message):
    """Check for any unexpected cr50-update exit codes.

    This only checks the cr50 update messages that have happened since
    last_message. If an unexpected exit code is detected it will raise an
    error. If last_message is empty there is nothing to compare against, so no
    messages are checked.

    Args:
        client: the object to run commands on
        last_message: the last cr50 message from the last update run

    Returns:
        the last cr50 message in /var/log/messages

    Raises:
        TestFail
            - If there is an unexpected cr50-update exit code after
              last_message in /var/log/messages
    """
    messages = client.run(GET_CR50_MESSAGES).stdout.strip()
    if last_message:
        # Only look at the text after the last occurrence of last_message. If
        # last_message is not found, rsplit returns everything and all of the
        # messages are checked.
        new_messages = messages.rsplit(last_message, 1)[-1].split('\n')
        failures = [msg for msg in new_messages if UPDATE_FAILURE in msg]
        if failures:
            logging.info(new_messages)
            raise error.TestFail('Detected unexpected exit code during update: '
                                 '%s' % failures)
    # Always return the last line of the complete output. Indexing the
    # unsplit string would return a single character, and the text after
    # last_message is empty when nothing new was logged.
    return messages.split('\n')[-1]
289
290
def VerifyUpdate(client, ver='', last_message=''):
    """Check the cr50 update state after an update.

    Makes sure there were no unexpected cr50-update exit codes since
    last_message and that the running version matches the saved version. If
    ver is given, the running version also has to match it.

    Args:
        client: the object to run commands on
        ver: the expected version tuple (ro ver, rw ver). The default '' skips
             the comparison.
        last_message: the last cr50 message from the last update run

    Returns:
        new_ver: a tuple containing the running ro and rw versions
        last_message: The last cr50 update message in /var/log/messages
    """
    # Look for unexpected cr50-update exit codes before checking the versions
    last_message = CheckForFailures(client, last_message)
    logging.debug('last cr50 message %s', last_message)

    new_ver = GetRunningVersion(client)
    if ver == '':
        return new_ver, last_message

    # The RO version is not compared when the expected RO is the dummy version
    expected_ro = ver[0]
    if expected_ro != DUMMY_VER:
        AssertVersionsAreEqual('Old RO', expected_ro, 'Updated RO', new_ver[0])
    AssertVersionsAreEqual('Old RW', ver[1], 'Updated RW', new_ver[1])
    return new_ver, last_message
314
315
def ClearUpdateStateAndReboot(client):
    """Delete the cr50 state files matching CR50_STATE, then reboot the client

    Args:
        client: the object to run commands on
    """
    remove_state_cmd = 'rm %s' % CR50_STATE
    client.run(remove_state_cmd)
    client.reboot()
320
321
def InstallImage(client, src, dest=CR50_FILE):
    """Send the image at src to dest on the dut and read its version

    Args:
        client: the object to run commands on
        src: the image location on the server
        dest: the desired location on the dut

    Returns:
        a tuple of the filename the image was copied to on the dut, and a
        tuple with the RO and RW version of that file
    """
    client.send_file(src, dest)

    # The version is read from the copy on the dut before syncing
    image_ver = GetBinVersion(client, dest)
    client.run('sync')
    return dest, image_ver
340
341
def GetSymbolicBoardId(symbolic_board_id):
    """Pack the characters of a symbolic board id into an int

    Each character contributes one byte. The first character ends up in the
    most significant position.

    Args:
        symbolic_board_id: an ASCII string. It can be up to 4 characters

    Returns:
        the symbolic board id string converted to an int
    """
    packed = 0
    for char in symbolic_board_id:
        # Make room for the next character, then add it as the low byte
        packed <<= 8
        packed |= ord(char)
    return packed
355
356
def GetExpectedBoardId(board_id):
    """Return the usb_updater interpretation of board_id

    Args:
        board_id: an int or string value of the board id. Strings of 4
                  characters or fewer are treated as symbolic board ids.
                  Longer strings are parsed as hex values.

    Returns:
        an int representation of the board id
    """
    # isinstance also accepts int subclasses, which an exact type comparison
    # would reject.
    if isinstance(board_id, int):
        return board_id

    if len(board_id) <= 4:
        return GetSymbolicBoardId(board_id)

    return int(board_id, 16)
373
374
def GetExpectedFlags(flags):
    """If flags are not specified, usb_updater will set them to 0xff00

    Args:
        flags: The int value or None

    Returns:
        the original flags or 0xff00 if flags is None
    """
    # Compare with 'is' so only None selects the default. A flags value of 0
    # is returned unchanged.
    if flags is None:
        return 0xff00
    return flags
385
386
def GetBoardId(client):
    """Read the board id and flags using 'usb_updater -s -i'

    The text after the last 'Board ID space: ' in the output is parsed as
    three ':' separated hex fields: the board id, the inverse of the board id,
    and the flags.

    Args:
        client: the object to run commands on

    Returns:
        a tuple with the int values of the board id and flags

    Raises:
        TestFail if the board id is not erased and the board id and its
        inverse have a set bit in common
    """
    output = UsbUpdater(client, ['-s', '-i']).stdout.strip()
    fields = output.split('Board ID space: ')[-1].strip().split(':')
    board_id, board_id_inv, flags = [int(field, 16) for field in fields]
    logging.info('BOARD_ID: %x:%x:%x', board_id, board_id_inv, flags)

    all_erased = all(field == ERASED_BID_INT
                     for field in (board_id, board_id_inv, flags))
    if all_erased:
        logging.info('board id is erased')
    # NOTE(review): this only detects bits that are set in both fields. It
    # does not verify that board_id_inv is exactly ~board_id -- confirm that
    # the weaker check is intended.
    elif board_id & board_id_inv:
        raise error.TestFail('board_id_inv should be ~board_id got %x %x' %
                             (board_id, board_id_inv))
    return board_id, flags
410
411
def CheckBoardId(client, board_id, flags):
    """Verify the running board_id and flags match the given values

    The given board_id and flags are converted to the values usb_updater is
    expected to set, then compared to what is read back from the client.

    Args:
        client: the object to run commands on
        board_id: a hex, symbolic or int value for board_id
        flags: the int value of flags or None

    Raises:
        TestFail if the new board id info does not match
    """
    # Read back what is currently set
    running_board_id, running_flags = GetBoardId(client)

    want_board_id = GetExpectedBoardId(board_id)
    want_flags = GetExpectedFlags(flags)

    if running_board_id == want_board_id and running_flags == want_flags:
        return
    raise error.TestFail('Failed to set board id expected %x:%x, but got '
                         '%x:%x' % (want_board_id, want_flags,
                                    running_board_id, running_flags))
436
437
def SetBoardId(client, board_id, flags=None):
    """Sets the board id and flags

    Args:
        client: the object to run commands on
        board_id: a string of the symbolic board id or board id hex value.
                  When the result is verified, a string of 4 characters or
                  fewer is considered a symbolic value (see
                  GetExpectedBoardId).
        flags: the desired flag value. If it is not None, it is appended to
               the board id argument as ':<hex flags>'.
               NOTE(review): the earlier docstring said flags are ignored for
               symbolic board ids, but they are passed to usb_updater and
               verified either way -- confirm the usb_updater behavior.

    Raises:
        TestFail if we were unable to set the flags to the correct value
    """
    board_id_arg = board_id
    if flags is not None:
        board_id_arg += ':' + hex(flags)

    # Set the board id using the given board id and flags. The output is not
    # needed, CheckBoardId reads the values back from the client.
    UsbUpdater(client, ['-s', '-i', board_id_arg])

    CheckBoardId(client, board_id, flags)
460    CheckBoardId(client, board_id, flags)
461