# cr50_utils.py revision 162a032f5f848d4a530eae1c314259076258bcd4
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import re

from autotest_lib.client.common_lib import error


RO = 'ro'
RW = 'rw'
CR50_FILE = '/opt/google/cr50/firmware/cr50.bin.prod'
CR50_STATE = '/var/cache/cr50*'
GET_CR50_VERSION = 'cat /var/cache/cr50-version'
# Raw string so the backslash reaches grep untouched.
GET_CR50_MESSAGES = r'grep "cr50-.*\[" /var/log/messages'
UPDATE_FAILURE = 'unexpected cr50-update exit code'
DUMMY_VER = '-1.-1.-1'
# This dictionary is used to search the usb_updater output for the version
# strings. There are two usb_updater commands that will return versions:
# 'fwver' and 'binver'.
#
# 'fwver' is used to get the running RO and RW versions from cr50
# 'binver' gets the version strings for each RO and RW region in the given
# file
#
# The value in the dictionary is the regular expression that can be used to
# find the version strings for each region. Every named group contains either
# 'ro' or 'rw' so GetVersion() can select the groups by substring.
VERSION_RE = {
    "--fwver": r'\nRO (?P<ro>\S+).*\nRW (?P<rw>\S+)',
    "--binver": (r'RO_A:(?P<ro_a>\S+).*RW_A:(?P<rw_a>\S+).*'
                 r'RO_B:(?P<ro_b>\S+).*RW_B:(?P<rw_b>\S+)'),
}


def AssertVersionsAreEqual(name_a, ver_a, name_b, ver_b):
    """Raise an error if ver_a isn't the same as ver_b.

    Args:
        name_a: the name of section a
        ver_a: the version string for section a
        name_b: the name of section b
        ver_b: the version string for section b

    Raises:
        AssertionError if ver_a is not equal to ver_b
    """
    # Raise explicitly rather than using an assert statement: assert
    # statements are removed when python runs with -O, which would silently
    # disable this check.
    if ver_a != ver_b:
        raise AssertionError("Versions do not match: %s %s %s %s" %
                             (name_a, ver_a, name_b, ver_b))


def GetNewestVersion(ver_a, ver_b):
    """Compare the versions and return the newest one.

    Each version is split on '.' and its fields are compared as integers,
    most significant field first.

    Args:
        ver_a: a dot separated version string
        ver_b: a dot separated version string

    Returns:
        The newer of the two version strings, or None if they are the same.
    """
    a = [int(x) for x in ver_a.split('.')]
    b = [int(x) for x in ver_b.split('.')]

    if a > b:
        return ver_a
    if b > a:
        return ver_b
    return None


def GetVersion(versions, name):
    """Return the version string from the dictionary.

    Get the version for each key in the versions dictionary that contains the
    substring name. Make sure all of the versions match and return the version
    string. Raise an error if the versions don't match.

    Args:
        versions: dictionary with the partition names as keys and the
                  partition version strings as values.
        name: the string used to find the relevant items in versions.

    Returns:
        The version from versions, "-1.-1.-1" if an invalid version was
        detected for a matching key, or None if no key contains name.

    Raises:
        AssertionError if two matching keys have different versions
    """
    ver = None
    key = None
    # items() instead of iteritems() so this works on python 2 and python 3.
    for k, v in versions.items():
        if name in k:
            if v == DUMMY_VER:
                logging.info("Detected invalid %s %s", name, v)
                return v
            elif ver:
                AssertVersionsAreEqual(key, ver, k, v)
            else:
                ver = v
                key = k
    return ver


def FindVersion(output, arg):
    """Find the ro and rw versions.

    Args:
        output: The string to search
        arg: string representing the usb_updater option, either '--binver'
             or '--fwver'

    Returns:
        a tuple of the ro and rw version strings

    Raises:
        TestFail if output does not contain the expected version strings
        AssertionError if the versions found for one region type differ
    """
    match = re.search(VERSION_RE[arg], output)
    if match is None:
        # Without this check the failure would surface as an AttributeError
        # on None, with no hint about what the output actually was.
        raise error.TestFail("Unable to find %s version strings in %r" %
                             (arg, output))
    versions = match.groupdict()
    ro = GetVersion(versions, RO)
    rw = GetVersion(versions, RW)
    return ro, rw


def GetSavedVersion(client):
    """Return the saved version from /var/cache/cr50-version"""
    result = client.run(GET_CR50_VERSION).stdout.strip()
    return FindVersion(result, "--fwver")


def GetVersionFromUpdater(client, args):
    """Return the version from usb_updater

    Args:
        client: the object used to run commands on the dut
        args: list of usb_updater arguments. The first item has to be the
              option ('--fwver' or '--binver') because it selects the regular
              expression used to parse the output.

    Returns:
        a tuple of the ro and rw version strings
    """
    result = client.run("usb_updater %s" % ' '.join(args)).stdout.strip()
    return FindVersion(result, args[0])


def GetFwVersion(client):
    """Get the running version using 'usb_updater --fwver'"""
    return GetVersionFromUpdater(client, ["--fwver"])


def GetBinVersion(client, image=CR50_FILE):
    """Get the image version using 'usb_updater --binver image'"""
    return GetVersionFromUpdater(client, ["--binver", image])


def GetVersionString(ver):
    """Format a (ro, rw) version tuple as 'RO <ro> RW <rw>'"""
    return 'RO %s RW %s' % (ver[0], ver[1])


def GetRunningVersion(client):
    """Get the running Cr50 version.

    The version from usb_updater and /var/cache/cr50-version should be the
    same. Get both versions and make sure they match.

    Returns:
        running_ver: a tuple with the ro and rw version strings

    Raises:
        AssertionError
            - If the version in /var/cache/cr50-version is not the same as the
              version from 'usb_updater --fwver'
    """
    running_ver = GetFwVersion(client)
    saved_ver = GetSavedVersion(client)

    AssertVersionsAreEqual("Running", GetVersionString(running_ver),
                           "Saved", GetVersionString(saved_ver))
    return running_ver


def CheckForFailures(client, last_message):
    """Check for any unexpected cr50-update exit codes.

    This only checks the cr50 update messages that have happened since
    last_message. If an unexpected exit code is detected it will raise an
    error.

    Args:
        client: the object used to run commands on the dut
        last_message: the last cr50 message from the last update run

    Returns:
        the last cr50 message in /var/log/messages

    Raises:
        TestFail
            - If there is an unexpected cr50-update exit code after
              last_message in /var/log/messages
    """
    messages = client.run(GET_CR50_MESSAGES).stdout.strip()
    if last_message:
        # Only keep what was logged after the final occurrence of
        # last_message.
        messages = messages.rsplit(last_message, 1)[-1]
        if UPDATE_FAILURE in messages:
            logging.debug(messages)
            raise error.TestFail("Detected unexpected exit code during update")
    return messages.rsplit('\n', 1)[-1]


def VerifyUpdate(client, ver='', last_message=''):
    """Verify that the saved update state is correct and there were no
    unexpected cr50-update exit codes since the last update.

    Args:
        client: the object used to run commands on the dut
        ver: a tuple with the expected ro and rw version strings. The version
             comparison is skipped if this is ''. The RO comparison is skipped
             if the expected RO version is the dummy version.
        last_message: the last cr50 message from the last update run

    Returns:
        new_ver: a tuple containing the running ro and rw versions
        last_message: The last cr50 update message in /var/log/messages

    Raises:
        TestFail if an unexpected cr50-update exit code is found
        AssertionError if the running version doesn't match ver
    """
    # Check that there were no unexpected cr50-update exit codes
    last_message = CheckForFailures(client, last_message)
    logging.debug("last cr50 message %s", last_message)

    new_ver = GetRunningVersion(client)
    if ver != '':
        if DUMMY_VER != ver[0]:
            AssertVersionsAreEqual("Old RO", ver[0], "Updated RO", new_ver[0])
        AssertVersionsAreEqual("Old RW", ver[1], "Updated RW", new_ver[1])
    return new_ver, last_message


def ClearUpdateStateAndReboot(client):
    """Removes the cr50 status files in /var/cache and reboots the AP"""
    client.run("rm %s" % CR50_STATE)
    client.reboot()


def InstallImage(client, src, dest=CR50_FILE):
    """Copy the image at src to dest on the dut

    Args:
        client: the object used to send files to and run commands on the dut
        src: the image location on the server
        dest: the desired location on the dut

    Returns:
        The filename where the image was copied to on the dut, a tuple
        containing the RO and RW version of the file
    """
    # Send the file to the DUT
    client.send_file(src, dest)

    ver = GetBinVersion(client, dest)
    client.run("sync")
    return dest, ver