cr50_utils.py revision e4052c8d15695f41a3b3794cce48be52c22e4ada
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import logging
import re

from autotest_lib.client.common_lib import error


# Substrings used to pick the RO / RW entries out of a version groupdict.
RO = 'ro'
RW = 'rw'
CR50_FILE = '/opt/google/cr50/firmware/cr50.bin.prod'
CR50_STATE = '/var/cache/cr50*'
GET_CR50_VERSION = 'cat /var/cache/cr50-version'
# Raw string so that '\[' reaches grep untouched instead of relying on python
# passing an unknown escape sequence through.
GET_CR50_MESSAGES = r'grep "cr50-.*\[" /var/log/messages'
UPDATE_FAILURE = 'unexpected cr50-update exit code'
DUMMY_VER = '-1.-1.-1'
# This dictionary is used to search the usb_updater output for the version
# strings. There are two usb_updater commands that will return versions:
# 'fwver' and 'binvers'.
#
# 'fwver'   is used to get the running RO and RW versions from cr50
# 'binvers' gets the version strings for each RO and RW region in the given
#           file
#
# The value in the dictionary is the regular expression that can be used to
# find the version strings for each region.
#
# --fwver
#   example output:
#           open_device 18d1:5014
#           found interface 3 endpoint 4, chunk_len 64
#           READY
#           -------
#           start
#           target running protocol version 6
#           keyids: RO 0xaa66150f, RW 0xde88588d
#           offsets: backup RO at 0x40000, backup RW at 0x44000
#           Current versions:
#           RO 0.0.10
#           RW 0.0.21
#   match groupdict:
#           {
#               'ro': '0.0.10',
#               'rw': '0.0.21'
#           }
#
# --binvers
#   example output:
#           read 524288(0x80000) bytes from /tmp/cr50.bin
#           RO_A:0.0.10 RW_A:0.0.21[00000000:00000000:00000000]
#           RO_B:0.0.10 RW_B:0.0.21[00000000:00000000:00000000]
#   match groupdict:
#           {
#               'rw_b': '0.0.21',
#               'rw_a': '0.0.21',
#               'ro_b': '0.0.10',
#               'ro_a': '0.0.10',
#               'bid_a': '00000000:00000000:00000000',
#               'bid_b': '00000000:00000000:00000000'
#           }
#
# The patterns are raw strings so the regex escapes (\S, \d, \[ ...) are not
# treated as string escapes. In the raw '--fwver' pattern '\n' is interpreted
# by the re module as a newline, so it matches exactly what the non-raw
# literal matched.
VERSION_RE = {
    '--fwver': r'\nRO (?P<ro>\S+).*\nRW (?P<rw>\S+)',
    '--binvers': r'RO_A:(?P<ro_a>[\d\.]+).*'
                 r'RW_A:(?P<rw_a>[\d\.]+)(\[(?P<bid_a>[\d\:a-fA-F]+)\])?.*'
                 r'RO_B:(?P<ro_b>\S+).*'
                 r'RW_B:(?P<rw_b>[\d\.]+)(\[(?P<bid_b>[\d\:a-fA-F]+)\])?.*',
}
# Timeout passed to client.run when running usb_updater.
UPDATE_TIMEOUT = 60
# The only non-zero usb_updater exit status that is not treated as a failure.
UPDATE_OK = 1

ERASED_BID_INT = 0xffffffff
# With an erased bid, the flags and board id will both be erased
ERASED_BID = (ERASED_BID_INT, ERASED_BID_INT)

# Parser used to classify the arguments that are about to be handed to
# usb_updater.
usb_update = argparse.ArgumentParser()
# use /dev/tpm0 to send the command
usb_update.add_argument('-s', '--systemdev', dest='systemdev',
                        action='store_true')
# fwver, binver, and board id are used to get information about cr50 or an
# image.
usb_update.add_argument('-b', '--binvers', '-f', '--fwver', '-i', '--board_id',
                        dest='info_cmd', action='store_true')
# upstart and post_reset will post resets instead of rebooting immediately
usb_update.add_argument('-u', '--upstart', '-p', '--post_reset',
                        dest='post_reset', action='store_true')
# Everything that is not one of the flags above (image paths, board id
# values, ...) is collected here.
usb_update.add_argument('extras', nargs=argparse.REMAINDER)


def AssertVersionsAreEqual(name_a, ver_a, name_b, ver_b):
    """Raise an error if ver_a isn't the same as ver_b

    Args:
        name_a: the name of section a
        ver_a: the version string for section a
        name_b: the name of section b
        ver_b: the version string for section b

    Raises:
        AssertionError if ver_a is not equal to ver_b
    """
    # Raise explicitly rather than using an assert statement so the check is
    # not stripped when python is run with -O.
    if ver_a != ver_b:
        raise AssertionError('Versions do not match: %s %s %s %s' %
                             (name_a, ver_a, name_b, ver_b))


def GetNewestVersion(ver_a, ver_b):
    """Compare the versions. Return the newest one.

    Args:
        ver_a: a dot separated version string, e.g. '0.0.21'
        ver_b: a dot separated version string, e.g. '0.0.21'

    Returns:
        the newer of the two version strings or None if they are the same
    """
    # Compare the fields numerically so '0.0.10' is newer than '0.0.9'.
    a = [int(x) for x in ver_a.split('.')]
    b = [int(x) for x in ver_b.split('.')]

    if a > b:
        return ver_a
    if b > a:
        return ver_b
    return None


def GetVersion(versions, name):
    """Return the version string from the dictionary.

    Get the version for each key in the versions dictionary that contains the
    substring name. Make sure all of the versions match and return the version
    string. Raise an error if the versions don't match.

    Args:
        versions: dictionary with the partition names as keys and the
                  partition version strings as values.
        name: the string used to find the relevant items in versions.

    Returns:
        the version from versions or "-1.-1.-1" if an invalid RO was detected.
        None if no key contains name.

    Raises:
        AssertionError if the matching partitions have different versions
    """
    ver = None
    key = None
    # items() instead of iteritems() so this runs on python 2 and python 3.
    for k, v in versions.items():
        if name not in k:
            continue
        if v == DUMMY_VER:
            logging.info('Detected invalid %s %s', name, v)
            return v
        if ver:
            AssertVersionsAreEqual(key, ver, k, v)
        else:
            ver = v
            key = k
    return ver


def FindVersion(output, arg):
    """Find the ro and rw versions.

    Args:
        output: The string to search
        arg: string representing the usb_updater option, either '--binvers' or
             '--fwver'

    Returns:
        a tuple of the ro and rw versions

    Raises:
        TestFail if the version strings could not be found in output
    """
    match = re.search(VERSION_RE[arg], output)
    # Fail with the unparsed text instead of an AttributeError on None.
    if match is None:
        raise error.TestFail('Unable to find %s versions in: %s' %
                             (arg, output))
    versions = match.groupdict()
    ro = GetVersion(versions, RO)
    rw = GetVersion(versions, RW)
    return ro, rw


def GetSavedVersion(client):
    """Return the saved version from /var/cache/cr50-version

    Args:
        client: the object to run commands on

    Returns:
        a tuple of the ro and rw versions
    """
    result = client.run(GET_CR50_VERSION).stdout.strip()
    # The saved state is parsed with the same expression as the fwver output.
    return FindVersion(result, '--fwver')


def UsbUpdater(client, args):
    """Run usb_update with the given args.

    Args:
        client: the object to run commands on
        args: a list of strings that contain the usb_updater args

    Returns:
        the result of usb_update

    Raises:
        TestFail if usb_updater exits with a status other than 0 or 1
    """
    options = usb_update.parse_args(args)

    # trunksd is stopped before sending the command through the system
    # device.
    result = client.run('status trunksd')
    if options.systemdev and 'running' in result.stdout:
        client.run('stop trunksd')

    # If we are updating the cr50 image, usb_update will return a non-zero exit
    # status so we should ignore it.
    ignore_status = not options.info_cmd
    # immediate reboots are only honored if the command is sent using /dev/tpm0
    expect_reboot = (options.systemdev and not options.post_reset and
                     not options.info_cmd)

    result = client.run('usb_updater %s' % ' '.join(args),
                        ignore_status=ignore_status,
                        ignore_timeout=expect_reboot,
                        timeout=UPDATE_TIMEOUT)

    # NOTE(review): with ignore_timeout set, client.run presumably returns None
    # when the command times out (e.g. the device rebooted before usb_updater
    # returned) - confirm against the client implementation. There is no exit
    # status to check in that case.
    if result is None:
        return result

    # After a posted reboot, the usb_update exit code should equal 1.
    if result.exit_status and result.exit_status != UPDATE_OK:
        logging.debug(result)
        raise error.TestFail('Unexpected usb_update exit code after %s %d' %
                             (' '.join(args), result.exit_status))
    return result


def GetVersionFromUpdater(client, args):
    """Return the version from usb_updater

    Args:
        client: the object to run commands on
        args: the usb_updater args. The first item has to be '--fwver' or
              '--binvers', because it selects the expression used to parse the
              output.

    Returns:
        a tuple of the ro and rw versions
    """
    result = UsbUpdater(client, args).stdout.strip()
    return FindVersion(result, args[0])


def GetFwVersion(client):
    """Get the running version using 'usb_updater --fwver'"""
    return GetVersionFromUpdater(client, ['--fwver', '-s'])


def GetBinVersion(client, image=CR50_FILE):
    """Get the image version using 'usb_updater --binvers image'"""
    # TODO(mruthven) b/37958867: change to ["--binvers", image] when usb_updater
    # is fixed
    return GetVersionFromUpdater(client, ['--binvers', image, image])


def GetVersionString(ver):
    """Combine the RO and RW tuple into an understandable string"""
    return 'RO %s RW %s' % (ver[0], ver[1])


def GetRunningVersion(client):
    """Get the running Cr50 version.

    The version from usb_updater and /var/cache/cr50-version should be the
    same. Get both versions and make sure they match.

    Args:
        client: the object to run commands on

    Returns:
        running_ver: a tuple with the ro and rw version strings

    Raises:
        AssertionError
            - If the version in /var/cache/cr50-version is not the same as the
              version from 'usb_updater --fwver'
    """
    running_ver = GetFwVersion(client)
    saved_ver = GetSavedVersion(client)

    AssertVersionsAreEqual('Running', GetVersionString(running_ver),
                           'Saved', GetVersionString(saved_ver))
    return running_ver


def CheckForFailures(client, last_message):
    """Check for any unexpected cr50-update exit codes.

    This only checks the cr50 update messages that have happened since
    last_message. If an unexpected exit code is detected it will raise an
    error. If last_message is empty there is no reference point, so nothing is
    checked and only the newest message is returned.

    Args:
        client: the object to run commands on
        last_message: the last cr50 message from the last update run

    Returns:
        the last cr50 message in /var/log/messages

    Raises:
        TestFail
            - If there is an unexpected cr50-update exit code after
              last_message in /var/log/messages
    """
    output = client.run(GET_CR50_MESSAGES).stdout.strip()
    if not last_message:
        # Split into lines so the last message is returned, not the last
        # character of the output.
        return output.split('\n')[-1]

    # Only look at what was logged after the last occurrence of last_message.
    messages = output.rsplit(last_message, 1)[-1].split('\n')
    failures = [message for message in messages if UPDATE_FAILURE in message]
    if failures:
        logging.info(messages)
        raise error.TestFail('Detected unexpected exit code during update: '
                             '%s' % failures)
    # If nothing was logged since last_message, keep it as the reference point
    # instead of returning an empty string.
    return messages[-1] or last_message


def VerifyUpdate(client, ver='', last_message=''):
    """Verify that the saved update state is correct and there were no
    unexpected cr50-update exit codes since the last update.

    Args:
        client: the object to run commands on
        ver: the expected version tuple (ro ver, rw ver). If it is '', the
             running version is not compared to anything.
        last_message: the last cr50 message from the last update run

    Returns:
        new_ver: a tuple containing the running ro and rw versions
        last_message: The last cr50 update message in /var/log/messages
    """
    # Check that there were no unexpected cr50-update exit codes
    last_message = CheckForFailures(client, last_message)
    logging.debug('last cr50 message %s', last_message)

    new_ver = GetRunningVersion(client)
    if ver != '':
        # The RO version is not compared when the expected RO is the dummy
        # version.
        if DUMMY_VER != ver[0]:
            AssertVersionsAreEqual('Old RO', ver[0], 'Updated RO', new_ver[0])
        AssertVersionsAreEqual('Old RW', ver[1], 'Updated RW', new_ver[1])
    return new_ver, last_message


def ClearUpdateStateAndReboot(client):
    """Removes the cr50 status files in /var/cache and reboots the AP"""
    client.run('rm %s' % CR50_STATE)
    client.reboot()


def InstallImage(client, src, dest=CR50_FILE):
    """Copy the image at src to dest on the dut

    Args:
        client: the object to run commands on
        src: the image location on the server
        dest: the desired location on the dut

    Returns:
        The filename where the image was copied to on the dut, a tuple
        containing the RO and RW version of the file
    """
    # Send the file to the DUT
    client.send_file(src, dest)

    ver = GetBinVersion(client, dest)
    client.run('sync')
    return dest, ver


def GetSymbolicBoardId(symbolic_board_id):
    """Convert the symbolic board id str to an int

    Args:
        symbolic_board_id: an ASCII string. It can be up to 4 characters

    Returns:
        the symbolic board id string converted to an int
    """
    board_id = 0
    # Pack the characters one byte each, first character in the most
    # significant position.
    for c in symbolic_board_id:
        board_id = ord(c) | (board_id << 8)
    return board_id


def GetExpectedBoardId(board_id):
    """Return the usb_updater interpretation of board_id

    Args:
        board_id: an int or string value of the board id

    Returns:
        an int representation of the board id
    """
    if isinstance(board_id, int):
        return board_id

    # Strings of up to 4 characters are symbolic board ids, anything longer is
    # a hex value.
    if len(board_id) <= 4:
        return GetSymbolicBoardId(board_id)

    return int(board_id, 16)


def GetExpectedFlags(flags):
    """If flags are not specified, usb_updater will set them to 0xff00

    Args:
        flags: The int value or None

    Returns:
        the original flags or 0xff00 if flags is None
    """
    return flags if flags is not None else 0xff00


def GetBoardId(client):
    """Return the board id and flags

    Args:
        client: the object to run commands on

    Returns:
        a tuple with the int values of the board id and flags

    Raises:
        TestFail if the board id and the second board id response field have
        bits in common
    """
    result = UsbUpdater(client, ['-s', '-i']).stdout.strip()
    # The text after 'Board ID space: ' is three ':' separated hex fields.
    board_id_info = result.split('Board ID space: ')[-1].strip().split(':')
    board_id, board_id_inv, flags = [int(val, 16) for val in board_id_info]
    logging.info('BOARD_ID: %x:%x:%x', board_id, board_id_inv, flags)

    if board_id == board_id_inv == flags == ERASED_BID_INT:
        logging.info('board id is erased')
    # NOTE(review): this only detects bits that are set in both values. It does
    # not verify board_id_inv is the exact inverse of board_id - confirm
    # whether a stricter check is wanted.
    elif board_id & board_id_inv:
        raise error.TestFail('board_id_inv should be ~board_id got %x %x' %
                             (board_id, board_id_inv))
    return board_id, flags


def CheckBoardId(client, board_id, flags):
    """Compare the given board_id and flags to the running board_id and flags

    Interpret board_id and flags how usb_updater would interpret them, then
    compare those interpreted values to the running board_id and flags.

    Args:
        client: the object to run commands on
        board_id: a hex, symbolic or int value for board_id
        flags: the int value of flags or None

    Raises:
        TestFail if the new board id info does not match
    """
    # Read back the board id and flags
    new_board_id, new_flags = GetBoardId(client)

    expected_board_id = GetExpectedBoardId(board_id)
    expected_flags = GetExpectedFlags(flags)

    if new_board_id != expected_board_id or new_flags != expected_flags:
        raise error.TestFail('Failed to set board id expected %x:%x, but got '
                             '%x:%x' % (expected_board_id, expected_flags,
                             new_board_id, new_flags))


def SetBoardId(client, board_id, flags=None):
    """Sets the board id and flags

    Args:
        client: the object to run commands on
        board_id: a string of the symbolic board id or board id hex value. If
                  the string is 4 characters or shorter it will be considered
                  a symbolic value
        flags: the desired int flag value. If it is None, only the board id is
               passed to usb_updater.

    Raises:
        TestFail if the board id or flags read back do not match the requested
        values
    """
    board_id_arg = board_id
    if flags is not None:
        board_id_arg += ':' + hex(flags)

    # Set the board id using the given board id and flags
    UsbUpdater(client, ['-s', '-i', board_id_arg])

    CheckBoardId(client, board_id, flags)