1#!/usr/bin/env python
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Verify a given OTA package with the specifed certificate.
19"""
20
21from __future__ import print_function
22
23import argparse
24import re
25import subprocess
26import sys
27import tempfile
28import zipfile
29
30from hashlib import sha1
31from hashlib import sha256
32
33import common
34
35
36def CertUsesSha256(cert):
37  """Check if the cert uses SHA-256 hashing algorithm."""
38
39  cmd = ['openssl', 'x509', '-text', '-noout', '-in', cert]
40  p1 = common.Run(cmd, stdout=subprocess.PIPE)
41  cert_dump, _ = p1.communicate()
42
43  algorithm = re.search(r'Signature Algorithm: ([a-zA-Z0-9]+)', cert_dump)
44  assert algorithm, "Failed to identify the signature algorithm."
45
46  assert not algorithm.group(1).startswith('ecdsa'), (
47      'This script doesn\'t support verifying ECDSA signed package yet.')
48
49  return algorithm.group(1).startswith('sha256')
50
51
52def VerifyPackage(cert, package):
53  """Verify the given package with the certificate.
54
55  (Comments from bootable/recovery/verifier.cpp:)
56
57  An archive with a whole-file signature will end in six bytes:
58
59    (2-byte signature start) $ff $ff (2-byte comment size)
60
61  (As far as the ZIP format is concerned, these are part of the
62  archive comment.) We start by reading this footer, this tells
63  us how far back from the end we have to start reading to find
64  the whole comment.
65  """
66
67  print('Package: %s' % (package,))
68  print('Certificate: %s' % (cert,))
69
70  # Read in the package.
71  with open(package) as package_file:
72    package_bytes = package_file.read()
73
74  length = len(package_bytes)
75  assert length >= 6, "Not big enough to contain footer."
76
77  footer = [ord(x) for x in package_bytes[-6:]]
78  assert footer[2] == 0xff and footer[3] == 0xff, "Footer is wrong."
79
80  signature_start_from_end = (footer[1] << 8) + footer[0]
81  assert signature_start_from_end > 6, "Signature start is in the footer."
82
83  signature_start = length - signature_start_from_end
84
85  # Determine how much of the file is covered by the signature. This is
86  # everything except the signature data and length, which includes all of the
87  # EOCD except for the comment length field (2 bytes) and the comment data.
88  comment_len = (footer[5] << 8) + footer[4]
89  signed_len = length - comment_len - 2
90
91  print('Package length: %d' % (length,))
92  print('Comment length: %d' % (comment_len,))
93  print('Signed data length: %d' % (signed_len,))
94  print('Signature start: %d' % (signature_start,))
95
96  use_sha256 = CertUsesSha256(cert)
97  print('Use SHA-256: %s' % (use_sha256,))
98
99  h = sha256() if use_sha256 else sha1()
100  h.update(package_bytes[:signed_len])
101  package_digest = h.hexdigest().lower()
102
103  print('Digest: %s' % (package_digest,))
104
105  # Get the signature from the input package.
106  signature = package_bytes[signature_start:-6]
107  sig_file = common.MakeTempFile(prefix='sig-')
108  with open(sig_file, 'wb') as f:
109    f.write(signature)
110
111  # Parse the signature and get the hash.
112  cmd = ['openssl', 'asn1parse', '-inform', 'DER', '-in', sig_file]
113  p1 = common.Run(cmd, stdout=subprocess.PIPE)
114  sig, _ = p1.communicate()
115  assert p1.returncode == 0, "Failed to parse the signature."
116
117  digest_line = sig.strip().split('\n')[-1]
118  digest_string = digest_line.split(':')[3]
119  digest_file = common.MakeTempFile(prefix='digest-')
120  with open(digest_file, 'wb') as f:
121    f.write(digest_string.decode('hex'))
122
123  # Verify the digest by outputing the decrypted result in ASN.1 structure.
124  decrypted_file = common.MakeTempFile(prefix='decrypted-')
125  cmd = ['openssl', 'rsautl', '-verify', '-certin', '-inkey', cert,
126         '-in', digest_file, '-out', decrypted_file]
127  p1 = common.Run(cmd, stdout=subprocess.PIPE)
128  p1.communicate()
129  assert p1.returncode == 0, "Failed to run openssl rsautl -verify."
130
131  # Parse the output ASN.1 structure.
132  cmd = ['openssl', 'asn1parse', '-inform', 'DER', '-in', decrypted_file]
133  p1 = common.Run(cmd, stdout=subprocess.PIPE)
134  decrypted_output, _ = p1.communicate()
135  assert p1.returncode == 0, "Failed to parse the output."
136
137  digest_line = decrypted_output.strip().split('\n')[-1]
138  digest_string = digest_line.split(':')[3].lower()
139
140  # Verify that the two digest strings match.
141  assert package_digest == digest_string, "Verification failed."
142
143  # Verified successfully upon reaching here.
144  print('\nWhole package signature VERIFIED\n')
145
146
147def VerifyAbOtaPayload(cert, package):
148  """Verifies the payload and metadata signatures in an A/B OTA payload."""
149  package_zip = zipfile.ZipFile(package, 'r')
150  if 'payload.bin' not in package_zip.namelist():
151    common.ZipClose(package_zip)
152    return
153
154  print('Verifying A/B OTA payload signatures...')
155
156  # Dump pubkey from the certificate.
157  pubkey = common.MakeTempFile(prefix="key-", suffix=".pem")
158  with open(pubkey, 'wb') as pubkey_fp:
159    pubkey_fp.write(common.ExtractPublicKey(cert))
160
161  package_dir = common.MakeTempDir(prefix='package-')
162
163  # Signature verification with delta_generator.
164  payload_file = package_zip.extract('payload.bin', package_dir)
165  cmd = ['delta_generator',
166         '--in_file=' + payload_file,
167         '--public_key=' + pubkey]
168  proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
169  stdoutdata, _ = proc.communicate()
170  assert proc.returncode == 0, \
171      'Failed to verify payload with delta_generator: %s\n%s' % (package,
172                                                                 stdoutdata)
173  common.ZipClose(package_zip)
174
175  # Verified successfully upon reaching here.
176  print('\nPayload signatures VERIFIED\n\n')
177
178
179def main():
180  parser = argparse.ArgumentParser()
181  parser.add_argument('certificate', help='The certificate to be used.')
182  parser.add_argument('package', help='The OTA package to be verified.')
183  args = parser.parse_args()
184
185  VerifyPackage(args.certificate, args.package)
186  VerifyAbOtaPayload(args.certificate, args.package)
187
188
189if __name__ == '__main__':
190  try:
191    main()
192  except AssertionError as err:
193    print('\n    ERROR: %s\n' % (err,))
194    sys.exit(1)
195  finally:
196    common.Cleanup()
197