check_target_files_signatures.py revision a07bf049b9ac92b9c4f82092ac0b10f78c762998
1#!/usr/bin/env python
2#
3# Copyright (C) 2009 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Check the signatures of all APKs in a target_files .zip file.  With
19-c, compare the signatures of each package to the ones in a separate
20target_files (usually a previously distributed build for the same
21device) and flag any changes.
22
23Usage:  check_target_file_signatures [flags] target_files
24
25  -c  (--compare_with)  <other_target_files>
26      Look for compatibility problems between the two sets of target
27      files (eg., packages whose keys have changed).
28
29  -l  (--local_cert_dirs)  <dir,dir,...>
30      Comma-separated list of top-level directories to scan for
31      .x509.pem files.  Defaults to "vendor,build".  Where cert files
32      can be found that match APK signatures, the filename will be
33      printed as the cert name, otherwise a hash of the cert plus its
34      subject string will be printed instead.
35
36  -t  (--text)
37      Dump the certificate information for both packages in comparison
38      mode (this output is normally suppressed).
39
40"""
41
42import sys
43
44if sys.hexversion < 0x02070000:
45  print >> sys.stderr, "Python 2.7 or newer is required."
46  sys.exit(1)
47
48import os
49import re
50import shutil
51import subprocess
52import zipfile
53
54import common
55
56# Work around a bug in python's zipfile module that prevents opening
57# of zipfiles if any entry has an extra field of between 1 and 3 bytes
58# (which is common with zipaligned APKs).  This overrides the
59# ZipInfo._decodeExtra() method (which contains the bug) with an empty
60# version (since we don't need to decode the extra field anyway).
61class MyZipInfo(zipfile.ZipInfo):
62  def _decodeExtra(self):
63    pass
64zipfile.ZipInfo = MyZipInfo
65
66OPTIONS = common.OPTIONS
67
68OPTIONS.text = False
69OPTIONS.compare_with = None
70OPTIONS.local_cert_dirs = ("vendor", "build")
71
72PROBLEMS = []
73PROBLEM_PREFIX = []
74
75def AddProblem(msg):
76  PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg)
77def Push(msg):
78  PROBLEM_PREFIX.append(msg)
79def Pop():
80  PROBLEM_PREFIX.pop()
81
82
83def Banner(msg):
84  print "-" * 70
85  print "  ", msg
86  print "-" * 70
87
88
89def GetCertSubject(cert):
90  p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
91                 stdin=subprocess.PIPE,
92                 stdout=subprocess.PIPE)
93  out, err = p.communicate(cert)
94  if err and not err.strip():
95    return "(error reading cert subject)"
96  for line in out.split("\n"):
97    line = line.strip()
98    if line.startswith("Subject:"):
99      return line[8:].strip()
100  return "(unknown cert subject)"
101
102
103class CertDB(object):
104  def __init__(self):
105    self.certs = {}
106
107  def Add(self, cert, name=None):
108    if cert in self.certs:
109      if name:
110        self.certs[cert] = self.certs[cert] + "," + name
111    else:
112      if name is None:
113        name = "unknown cert %s (%s)" % (common.sha1(cert).hexdigest()[:12],
114                                         GetCertSubject(cert))
115      self.certs[cert] = name
116
117  def Get(self, cert):
118    """Return the name for a given cert."""
119    return self.certs.get(cert, None)
120
121  def FindLocalCerts(self):
122    to_load = []
123    for top in OPTIONS.local_cert_dirs:
124      for dirpath, _, filenames in os.walk(top):
125        certs = [os.path.join(dirpath, i)
126                 for i in filenames if i.endswith(".x509.pem")]
127        if certs:
128          to_load.extend(certs)
129
130    for i in to_load:
131      f = open(i)
132      cert = common.ParseCertificate(f.read())
133      f.close()
134      name, _ = os.path.splitext(i)
135      name, _ = os.path.splitext(name)
136      self.Add(cert, name)
137
138ALL_CERTS = CertDB()
139
140
141def CertFromPKCS7(data, filename):
142  """Read the cert out of a PKCS#7-format file (which is what is
143  stored in a signed .apk)."""
144  Push(filename + ":")
145  try:
146    p = common.Run(["openssl", "pkcs7",
147                    "-inform", "DER",
148                    "-outform", "PEM",
149                    "-print_certs"],
150                   stdin=subprocess.PIPE,
151                   stdout=subprocess.PIPE)
152    out, err = p.communicate(data)
153    if err and not err.strip():
154      AddProblem("error reading cert:\n" + err)
155      return None
156
157    cert = common.ParseCertificate(out)
158    if not cert:
159      AddProblem("error parsing cert output")
160      return None
161    return cert
162  finally:
163    Pop()
164
165
166class APK(object):
167  def __init__(self, full_filename, filename):
168    self.filename = filename
169    self.certs = None
170    self.shared_uid = None
171    self.package = None
172
173    Push(filename+":")
174    try:
175      self.RecordCerts(full_filename)
176      self.ReadManifest(full_filename)
177    finally:
178      Pop()
179
180  def RecordCerts(self, full_filename):
181    out = set()
182    try:
183      f = open(full_filename)
184      apk = zipfile.ZipFile(f, "r")
185      pkcs7 = None
186      for info in apk.infolist():
187        if info.filename.startswith("META-INF/") and \
188           (info.filename.endswith(".DSA") or info.filename.endswith(".RSA")):
189          pkcs7 = apk.read(info.filename)
190          cert = CertFromPKCS7(pkcs7, info.filename)
191          out.add(cert)
192          ALL_CERTS.Add(cert)
193      if not pkcs7:
194        AddProblem("no signature")
195    finally:
196      f.close()
197      self.certs = frozenset(out)
198
199  def ReadManifest(self, full_filename):
200    p = common.Run(["aapt", "dump", "xmltree", full_filename,
201                    "AndroidManifest.xml"],
202                   stdout=subprocess.PIPE)
203    manifest, err = p.communicate()
204    if err:
205      AddProblem("failed to read manifest")
206      return
207
208    self.shared_uid = None
209    self.package = None
210
211    for line in manifest.split("\n"):
212      line = line.strip()
213      m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line)
214      if m:
215        name = m.group(1)
216        if name == "android:sharedUserId":
217          if self.shared_uid is not None:
218            AddProblem("multiple sharedUserId declarations")
219          self.shared_uid = m.group(2)
220        elif name == "package":
221          if self.package is not None:
222            AddProblem("multiple package declarations")
223          self.package = m.group(2)
224
225    if self.package is None:
226      AddProblem("no package declaration")
227
228
229class TargetFiles(object):
230  def __init__(self):
231    self.max_pkg_len = 30
232    self.max_fn_len = 20
233    self.apks = None
234    self.apks_by_basename = None
235    self.certmap = None
236
237  def LoadZipFile(self, filename):
238    # First read the APK certs file to figure out whether there are compressed
239    # APKs in the archive. If we do have compressed APKs in the archive, then we
240    # must decompress them individually before we perform any analysis.
241
242    # This is the list of wildcards of files we extract from |filename|.
243    apk_extensions = ['*.apk']
244
245    self.certmap, compressed_extension = common.ReadApkCerts(zipfile.ZipFile(filename, "r"))
246    if compressed_extension:
247      apk_extensions.append("*.apk" + compressed_extension)
248
249    d, z = common.UnzipTemp(filename, apk_extensions)
250    try:
251      self.apks = {}
252      self.apks_by_basename = {}
253      for dirpath, _, filenames in os.walk(d):
254        for fn in filenames:
255          # Decompress compressed APKs before we begin processing them.
256          if compressed_extension and fn.endswith(compressed_extension):
257            # First strip the compressed extension from the file.
258            uncompressed_fn = fn[:-len(compressed_extension)]
259
260            # Decompress the compressed file to the output file.
261            common.Gunzip(os.path.join(dirpath, fn),
262                          os.path.join(dirpath, uncompressed_fn))
263
264            # Finally, delete the compressed file and use the uncompressed file
265            # for further processing. Note that the deletion is not strictly required,
266            # but is done here to ensure that we're not using too much space in
267            # the temporary directory.
268            os.remove(os.path.join(dirpath, fn))
269            fn = uncompressed_fn
270
271
272          if fn.endswith(".apk"):
273            fullname = os.path.join(dirpath, fn)
274            displayname = fullname[len(d)+1:]
275            apk = APK(fullname, displayname)
276            self.apks[apk.filename] = apk
277            self.apks_by_basename[os.path.basename(apk.filename)] = apk
278
279            self.max_pkg_len = max(self.max_pkg_len, len(apk.package))
280            self.max_fn_len = max(self.max_fn_len, len(apk.filename))
281    finally:
282      shutil.rmtree(d)
283
284    z.close()
285
286  def CheckSharedUids(self):
287    """Look for any instances where packages signed with different
288    certs request the same sharedUserId."""
289    apks_by_uid = {}
290    for apk in self.apks.itervalues():
291      if apk.shared_uid:
292        apks_by_uid.setdefault(apk.shared_uid, []).append(apk)
293
294    for uid in sorted(apks_by_uid.keys()):
295      apks = apks_by_uid[uid]
296      for apk in apks[1:]:
297        if apk.certs != apks[0].certs:
298          break
299      else:
300        # all packages have the same set of certs; this uid is fine.
301        continue
302
303      AddProblem("different cert sets for packages with uid %s" % (uid,))
304
305      print "uid %s is shared by packages with different cert sets:" % (uid,)
306      for apk in apks:
307        print "%-*s  [%s]" % (self.max_pkg_len, apk.package, apk.filename)
308        for cert in apk.certs:
309          print "   ", ALL_CERTS.Get(cert)
310      print
311
312  def CheckExternalSignatures(self):
313    for apk_filename, certname in self.certmap.iteritems():
314      if certname == "EXTERNAL":
315        # Apps marked EXTERNAL should be signed with the test key
316        # during development, then manually re-signed after
317        # predexopting.  Consider it an error if this app is now
318        # signed with any key that is present in our tree.
319        apk = self.apks_by_basename[apk_filename]
320        name = ALL_CERTS.Get(apk.cert)
321        if not name.startswith("unknown "):
322          Push(apk.filename)
323          AddProblem("hasn't been signed with EXTERNAL cert")
324          Pop()
325
326  def PrintCerts(self):
327    """Display a table of packages grouped by cert."""
328    by_cert = {}
329    for apk in self.apks.itervalues():
330      for cert in apk.certs:
331        by_cert.setdefault(cert, []).append((apk.package, apk))
332
333    order = [(-len(v), k) for (k, v) in by_cert.iteritems()]
334    order.sort()
335
336    for _, cert in order:
337      print "%s:" % (ALL_CERTS.Get(cert),)
338      apks = by_cert[cert]
339      apks.sort()
340      for _, apk in apks:
341        if apk.shared_uid:
342          print "  %-*s  %-*s  [%s]" % (self.max_fn_len, apk.filename,
343                                        self.max_pkg_len, apk.package,
344                                        apk.shared_uid)
345        else:
346          print "  %-*s  %s" % (self.max_fn_len, apk.filename, apk.package)
347      print
348
349  def CompareWith(self, other):
350    """Look for instances where a given package that exists in both
351    self and other have different certs."""
352
353    all_apks = set(self.apks.keys())
354    all_apks.update(other.apks.keys())
355
356    max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)
357
358    by_certpair = {}
359
360    for i in all_apks:
361      if i in self.apks:
362        if i in other.apks:
363          # in both; should have same set of certs
364          if self.apks[i].certs != other.apks[i].certs:
365            by_certpair.setdefault((other.apks[i].certs,
366                                    self.apks[i].certs), []).append(i)
367        else:
368          print "%s [%s]: new APK (not in comparison target_files)" % (
369              i, self.apks[i].filename)
370      else:
371        if i in other.apks:
372          print "%s [%s]: removed APK (only in comparison target_files)" % (
373              i, other.apks[i].filename)
374
375    if by_certpair:
376      AddProblem("some APKs changed certs")
377      Banner("APK signing differences")
378      for (old, new), packages in sorted(by_certpair.items()):
379        for i, o in enumerate(old):
380          if i == 0:
381            print "was", ALL_CERTS.Get(o)
382          else:
383            print "   ", ALL_CERTS.Get(o)
384        for i, n in enumerate(new):
385          if i == 0:
386            print "now", ALL_CERTS.Get(n)
387          else:
388            print "   ", ALL_CERTS.Get(n)
389        for i in sorted(packages):
390          old_fn = other.apks[i].filename
391          new_fn = self.apks[i].filename
392          if old_fn == new_fn:
393            print "  %-*s  [%s]" % (max_pkg_len, i, old_fn)
394          else:
395            print "  %-*s  [was: %s; now: %s]" % (max_pkg_len, i,
396                                                  old_fn, new_fn)
397        print
398
399
400def main(argv):
401  def option_handler(o, a):
402    if o in ("-c", "--compare_with"):
403      OPTIONS.compare_with = a
404    elif o in ("-l", "--local_cert_dirs"):
405      OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
406    elif o in ("-t", "--text"):
407      OPTIONS.text = True
408    else:
409      return False
410    return True
411
412  args = common.ParseOptions(argv, __doc__,
413                             extra_opts="c:l:t",
414                             extra_long_opts=["compare_with=",
415                                              "local_cert_dirs="],
416                             extra_option_handler=option_handler)
417
418  if len(args) != 1:
419    common.Usage(__doc__)
420    sys.exit(1)
421
422  ALL_CERTS.FindLocalCerts()
423
424  Push("input target_files:")
425  try:
426    target_files = TargetFiles()
427    target_files.LoadZipFile(args[0])
428  finally:
429    Pop()
430
431  compare_files = None
432  if OPTIONS.compare_with:
433    Push("comparison target_files:")
434    try:
435      compare_files = TargetFiles()
436      compare_files.LoadZipFile(OPTIONS.compare_with)
437    finally:
438      Pop()
439
440  if OPTIONS.text or not compare_files:
441    Banner("target files")
442    target_files.PrintCerts()
443  target_files.CheckSharedUids()
444  target_files.CheckExternalSignatures()
445  if compare_files:
446    if OPTIONS.text:
447      Banner("comparison files")
448      compare_files.PrintCerts()
449    target_files.CompareWith(compare_files)
450
451  if PROBLEMS:
452    print "%d problem(s) found:\n" % (len(PROBLEMS),)
453    for p in PROBLEMS:
454      print p
455    return 1
456
457  return 0
458
459
460if __name__ == '__main__':
461  try:
462    r = main(sys.argv[1:])
463    sys.exit(r)
464  except common.ExternalError as e:
465    print
466    print "   ERROR: %s" % (e,)
467    print
468    sys.exit(1)
469