1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import os
18import subprocess
19import tempfile
20import time
21import unittest
22import zipfile
23from hashlib import sha1
24
25import common
26import test_utils
27import validate_target_files
28from rangelib import RangeSet
29
30
31KiB = 1024
32MiB = 1024 * KiB
33GiB = 1024 * MiB
34
35
def get_2gb_string():
  """Yields the chunks of a sparse byte stream slightly larger than 2 GiB.

  The stream is produced in 4 MiB steps. Each step consists of a 4 KiB chunk of
  random bytes followed by one chunk of NUL bytes that pads the step to 4 MiB,
  e.g. b'xyz\\x00abc\\x00...'. Chunks are generated lazily so that the whole
  payload never has to be held in memory.

  Yields:
    bytes objects. Every chunk, including the NUL filler, is bytes so that it
    can be fed to a hash object or written to a file opened in binary mode.
  """
  size = 2 * GiB + 1
  block_size = 4 * KiB
  step_size = 4 * MiB
  # The filler never changes, so build it once. It has to be a bytes literal:
  # a text string can neither be hashed nor written to a binary file on
  # Python 3.
  zero_fill = b'\0' * (step_size - block_size)
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    yield zero_fill
44
45
class CommonZipTest(unittest.TestCase):
  """Tests common.ZipWrite(), common.ZipWriteStr() and common.ZipDelete()."""

  @staticmethod
  def _as_chunks(contents):
    """Returns contents as an iterable of byte chunks.

    A single bytes-like object is wrapped into a one-element list, because
    iterating over it directly yields ints on Python 3, which can neither be
    hashed nor written to a file. Any other iterable is returned unchanged.
    """
    if isinstance(contents, (bytes, bytearray)):
      return [contents]
    return contents

  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    """Checks one entry of the archive stored at zip_file_name.

    Args:
      zip_file: Unused. The archive is always reopened from zip_file_name; the
          parameter is kept so that existing call sites stay valid.
      zip_file_name: Path of the archive to reopen and inspect.
      arcname: Name of the entry to check.
      expected_hash: Expected SHA-1 hex digest of the entry's contents.
      test_file_name: Optional path of the file that was added to the archive.
          When given, its current mode and mtime must equal expected_stat.
      expected_stat: The os.stat() result taken before the file was zipped.
          Must be provided whenever test_file_name is.
      expected_mode: Expected permission bits (lowest 9 bits) of the entry.
      expected_compress_type: Expected compression method of the entry.
    """
    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify. The context manager closes the handle
    # even when one of the assertions below fails.
    with zipfile.ZipFile(zip_file_name, "r") as reopened_zip:
      # Verify the timestamp.
      info = reopened_zip.getinfo(arcname)
      self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

      # Verify the file mode.
      mode = (info.external_attr >> 16) & 0o777
      self.assertEqual(mode, expected_mode)

      # Verify the compress type.
      self.assertEqual(info.compress_type, expected_compress_type)

      # Verify the zip contents. The sentinel must be a bytes literal: read()
      # returns b'' at EOF, which never equals '' on Python 3.
      sha1_hash = sha1()
      entry = reopened_zip.open(arcname)
      try:
        for chunk in iter(lambda: entry.read(4 * MiB), b''):
          sha1_hash.update(chunk)
      finally:
        entry.close()
      self.assertEqual(expected_hash, sha1_hash.hexdigest())
      self.assertIsNone(reopened_zip.testzip())

  def _assert_zip_entries(self, zip_file_name, present, absent):
    """Asserts which entry names the archive at zip_file_name contains.

    Args:
      zip_file_name: Path of the archive to inspect.
      present: Entry names that must be listed in the archive.
      absent: Entry names that must not be listed in the archive.
    """
    with zipfile.ZipFile(zip_file_name, 'r') as check_zip:
      entries = check_zip.namelist()
    for name in present:
      self.assertIn(name, entries)
    for name in absent:
      self.assertNotIn(name, entries)

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    """Zips a temp file holding contents via ZipWrite() and verifies the entry.

    Args:
      contents: The payload, either a single bytes object or an iterable of
          byte chunks.
      extra_zipwrite_args: Optional dict of keyword arguments forwarded to
          common.ZipWrite(). Its "arcname", "perms" and "compress_type" items
          also determine the values expected by the verification.
    """
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname[0] == "/":
      arcname = arcname[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in self._as_chunks(contents):
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
    finally:
      # close() is a no-op if the file has been closed already; it only matters
      # when writing the payload failed half way.
      test_file.close()
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    """Adds contents via ZipWriteStr() and verifies the resulting entry.

    Args:
      zinfo_or_arcname: Either the entry name or a zipfile.ZipInfo describing
          the entry.
      contents: The bytes to store.
      extra_args: Optional dict of keyword arguments forwarded to
          common.ZipWriteStr(). "perms" and "compress_type" also determine the
          values expected by the verification.
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
        arcname = zinfo_or_arcname
        expected_mode = extra_args.get("perms", 0o644)
      else:
        arcname = zinfo_or_arcname.filename
        # Without explicit perms, the mode stored in the ZipInfo is expected.
        expected_mode = extra_args.get("perms",
                                       zinfo_or_arcname.external_attr >> 16)

      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
                   expected_mode=expected_mode,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    """Writes a large file plus a small string into one archive.

    The large payload goes through ZipWrite(), the small one through
    ZipWriteStr(); both entries are verified afterwards.

    Args:
      large: The large payload, either a single bytes object or an iterable of
          byte chunks.
      small: The bytes stored under the entry name "bar".
      extra_args: Optional dict of keyword arguments forwarded to both
          common.ZipWrite() and common.ZipWriteStr().
    """
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large[0] == "/":
      arcname_large = arcname_large[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in self._as_chunks(large):
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = 0o644
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_args)
      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large,
                   sha1_hash.hexdigest(), test_file_name, expected_stat,
                   expected_mode, expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small,
                   sha1(small).hexdigest(),
                   expected_compress_type=expected_compress_type)
    finally:
      # close() is a no-op if the file has been closed already.
      test_file.close()
      os.remove(zip_file_name)
      os.remove(test_file_name)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
    """Asserts zipfile.ZIP64_LIMIT has its default before and after func."""
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

  def test_ZipWrite(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWrite_resets_ZIP64_LIMIT(self):
    # Empty payload, given as bytes so that it is valid on Python 2 and 3.
    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, b"")

  def test_ZipWriteStr(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string)

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string)

    # Timestamp in the zinfo should be overwritten.
    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    self._test_ZipWriteStr(zinfo, random_string)

  def test_ZipWriteStr_with_opts(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string, {
        "perms": 0o700,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr("bar", random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "perms": 0o600,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWriteStr_large_file(self):
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    # The empty payload is given as bytes; sha1() rejects text on Python 3.
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", b"")
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b"")

  def test_bug21309935(self):
    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    try:
      random_string = os.urandom(1024)
      zip_file = zipfile.ZipFile(zip_file_name, "w")
      # Default perms should be 0o644 when passing the filename.
      common.ZipWriteStr(zip_file, "foo", random_string)
      # Honor the specified perms.
      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
      # The perms in zinfo should be untouched.
      zinfo = zipfile.ZipInfo(filename="baz")
      zinfo.external_attr = 0o740 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string)
      # Explicitly specified perms has the priority.
      zinfo = zipfile.ZipInfo(filename="qux")
      zinfo.external_attr = 0o700 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
      common.ZipClose(zip_file)

      expected_hash = sha1(random_string).hexdigest()
      self._verify(zip_file, zip_file_name, "foo", expected_hash,
                   expected_mode=0o644)
      self._verify(zip_file, zip_file_name, "bar", expected_hash,
                   expected_mode=0o755)
      self._verify(zip_file, zip_file_name, "baz", expected_hash,
                   expected_mode=0o740)
      self._verify(zip_file, zip_file_name, "qux", expected_hash,
                   expected_mode=0o400)
    finally:
      os.remove(zip_file_name)

  def test_ZipDelete(self):
    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
    zip_file.close()

    # Everything after creating the temp file runs inside the try block, so
    # that the archive is removed even if building it fails.
    try:
      output_zip = zipfile.ZipFile(zip_file.name, 'w',
                                   compression=zipfile.ZIP_DEFLATED)
      with tempfile.NamedTemporaryFile() as entry_file:
        entry_file.write(os.urandom(1024))
        # ZipWrite() is handed the file name rather than the open handle, so
        # the buffered bytes must reach the disk before it gets called.
        entry_file.flush()
        common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
        common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
        common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
        common.ZipClose(output_zip)

      # Deleting a single entry given as a string.
      common.ZipDelete(zip_file.name, 'Test2')
      self._assert_zip_entries(
          zip_file.name, present=['Test1', 'Test3'], absent=['Test2'])

      # Deleting an entry that no longer exists must fail and must leave the
      # archive untouched.
      self.assertRaises(AssertionError, common.ZipDelete, zip_file.name,
                        'Test2')
      self._assert_zip_entries(
          zip_file.name, present=['Test1', 'Test3'], absent=['Test2'])

      # Deleting entries given as a list.
      common.ZipDelete(zip_file.name, ['Test3'])
      self._assert_zip_entries(
          zip_file.name, present=['Test1'], absent=['Test2', 'Test3'])

      # A list that mixes existing and missing entries.
      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
      self._assert_zip_entries(
          zip_file.name, present=[], absent=['Test1', 'Test2', 'Test3'])
    finally:
      os.remove(zip_file.name)
359
360
class CommonApkUtilsTest(unittest.TestCase):
  """Tests the APK utils related functions."""

  # apkcerts.txt lines for APKs that carry no 'compressed' attribute.
  APKCERTS_TXT1 = (
      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
      ' private_key="certs/devkey.pk8"\n'
      'name="Settings.apk"'
      ' certificate="build/target/product/security/platform.x509.pem"'
      ' private_key="build/target/product/security/platform.pk8"\n'
      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
  )

  # The cert map expected for APKCERTS_TXT1.
  APKCERTS_CERTMAP1 = {
      'RecoveryLocalizer.apk': 'certs/devkey',
      'Settings.apk': 'build/target/product/security/platform',
      'TV.apk': 'PRESIGNED',
  }

  # apkcerts.txt lines for APKs marked with compressed="gz".
  APKCERTS_TXT2 = (
      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
  )

  # The cert map expected for APKCERTS_TXT2.
  APKCERTS_CERTMAP2 = {
      'Compressed1.apk': 'certs/compressed1',
      'Compressed2a.apk': 'certs/compressed2',
      'Compressed2b.apk': 'certs/compressed2',
      'Compressed3.apk': 'certs/compressed3',
  }

  # A single apkcerts.txt line for an APK marked with compressed="xz".
  APKCERTS_TXT3 = (
      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
  )

  # The cert map expected for APKCERTS_TXT3.
  APKCERTS_CERTMAP3 = {
      'Compressed4.apk': 'certs/compressed4',
  }

  def setUp(self):
    """Looks up the directory that holds the test keys and certificates."""
    self.testdata_dir = test_utils.get_testdata_dir()

  def tearDown(self):
    """Lets common clean up whatever it created during the test."""
    common.Cleanup()

  @staticmethod
  def _write_apkcerts_txt(apkcerts_txt, additional=None):
    """Creates a temporary zip that contains META/apkcerts.txt.

    Args:
      apkcerts_txt: The text to store as META/apkcerts.txt.
      additional: Optional iterable of entry names; each one is added to the
          zip as an empty entry.

    Returns:
      The path of the zip file that has been written.
    """
    extra_entries = [] if additional is None else additional
    zip_path = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_path, 'w') as output_zip:
      output_zip.writestr('META/apkcerts.txt', apkcerts_txt)
      for entry_name in extra_entries:
        output_zip.writestr(entry_name, '')
    return zip_path

  @staticmethod
  def _read_apkcerts(target_files):
    """Opens the zip at target_files and returns common.ReadApkCerts() on it."""
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      return common.ReadApkCerts(input_zip)

  def test_ReadApkCerts_NoncompressedApks(self):
    """No compressed APKs listed: the reported extension is None."""
    archive = self._write_apkcerts_txt(self.APKCERTS_TXT1)
    certmap, ext = self._read_apkcerts(archive)

    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
    self.assertIsNone(ext)

  def test_ReadApkCerts_CompressedApks(self):
    """Compressed APKs: the extension follows the 'compressed' attribute."""
    scenarios = (
        # We have "installed" Compressed1.apk.gz only. Note that
        # Compressed3.apk is not stored in '.gz' format, so it shouldn't be
        # considered as installed.
        (self.APKCERTS_TXT2, ['Compressed1.apk.gz', 'Compressed3.apk'],
         self.APKCERTS_CERTMAP2, '.gz'),
        # Alternative case with '.xz'.
        (self.APKCERTS_TXT3, ['Compressed4.apk.xz'],
         self.APKCERTS_CERTMAP3, '.xz'),
    )
    for apkcerts_txt, installed, expected_certmap, expected_ext in scenarios:
      archive = self._write_apkcerts_txt(apkcerts_txt, installed)
      certmap, ext = self._read_apkcerts(archive)

      self.assertDictEqual(expected_certmap, certmap)
      self.assertEqual(expected_ext, ext)

  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
    """Compressed and plain APKs may be listed in the same apkcerts.txt."""
    archive = self._write_apkcerts_txt(
        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])
    certmap, ext = self._read_apkcerts(archive)

    # The expected map is the union of the two individual maps.
    expected_certmap = {}
    for partial_map in (self.APKCERTS_CERTMAP1, self.APKCERTS_CERTMAP2):
      expected_certmap.update(partial_map)
    self.assertDictEqual(expected_certmap, certmap)
    self.assertEqual('.gz', ext)

  def test_ReadApkCerts_MultipleCompressionMethods(self):
    """Mixing 'gz' and 'xz' entries is rejected with a ValueError."""
    archive = self._write_apkcerts_txt(
        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])

    with zipfile.ZipFile(archive, 'r') as input_zip:
      with self.assertRaises(ValueError):
        common.ReadApkCerts(input_zip)

  def test_ReadApkCerts_MismatchingKeys(self):
    """A certificate/private_key pair with different stems is rejected."""
    # 'cert1' vs. 'cert2': the two paths don't name the same key.
    archive = self._write_apkcerts_txt(
        'name="App1.apk" certificate="certs/cert1.x509.pem"'
        ' private_key="certs/cert2.pk8"\n')

    with zipfile.ZipFile(archive, 'r') as input_zip:
      with self.assertRaises(ValueError):
        common.ReadApkCerts(input_zip)

  def test_ExtractPublicKey(self):
    """The key extracted from testkey.x509.pem equals testkey.pubkey.pem."""
    cert_path = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    pubkey_path = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    with open(pubkey_path, 'rb') as pubkey_fp:
      expected_pubkey = pubkey_fp.read()
    self.assertEqual(expected_pubkey, common.ExtractPublicKey(cert_path))

  def test_ExtractPublicKey_invalidInput(self):
    """Passing a private key file instead of a certificate must fail."""
    private_key_path = os.path.join(self.testdata_dir, 'testkey.pk8')
    with self.assertRaises(AssertionError):
      common.ExtractPublicKey(private_key_path)

  def test_ParseCertificate(self):
    """ParseCertificate() output equals the DER output of openssl."""
    cert_path = os.path.join(self.testdata_dir, 'testkey.x509.pem')

    # Let openssl do the PEM to DER conversion to get the reference value.
    openssl = common.Run(
        ['openssl', 'x509', '-in', cert_path, '-outform', 'DER'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    expected_der = openssl.communicate()[0]
    self.assertEqual(0, openssl.returncode)

    with open(cert_path) as cert_fp:
      pem_text = cert_fp.read()
    self.assertEqual(expected_der, common.ParseCertificate(pem_text))
506
507
class CommonUtilsTest(unittest.TestCase):
  """Tests common.GetSparseImage() against small synthetic target_files."""

  # The SYSTEM/ files below are sized in multiples of this many bytes.
  _BLOCK_SIZE = 4096

  def tearDown(self):
    """Lets common clean up whatever it created during the test."""
    common.Cleanup()

  @classmethod
  def _make_target_files(cls, image_chunks, file_blocks, block_map=None):
    """Writes a temporary target_files zip and returns its path.

    Args:
      image_chunks: The chunk list handed to
          test_utils.construct_sparse_image(); the resulting file is stored as
          IMAGES/system.img.
      file_blocks: A sequence of (entry name, size in 4096-byte units) pairs.
          Each entry is filled with random bytes of that size.
      block_map: The text to store as IMAGES/system.map, or None to leave the
          block map out of the archive.
    """
    archive_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(archive_path, 'w') as archive:
      archive.write(
          test_utils.construct_sparse_image(image_chunks),
          arcname='IMAGES/system.img')
      if block_map is not None:
        archive.writestr('IMAGES/system.map', block_map)
      for entry_name, block_count in file_blocks:
        archive.writestr(
            entry_name, os.urandom(cls._BLOCK_SIZE * block_count))
    return archive_path

  @staticmethod
  def _get_system_image(target_files, allow_shared_blocks):
    """Unzips target_files and returns GetSparseImage() for 'system'."""
    unzipped_dir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      return common.GetSparseImage(
          'system', unzipped_dir, input_zip, allow_shared_blocks)

  def test_GetSparseImage_emptyBlockMapFile(self):
    """An empty block map yields only the synthetic file_map entries."""
    archive_path = self._make_target_files(
        [(0xCAC1, 6), (0xCAC3, 3), (0xCAC1, 4)],
        [('SYSTEM/file1', 8), ('SYSTEM/file2', 3)],
        block_map='')

    image = self._get_system_image(archive_path, False)

    expected_file_map = {
        '__COPY': RangeSet("0"),
        '__NONZERO-0': RangeSet("1-5 9-12"),
    }
    self.assertDictEqual(expected_file_map, image.file_map)

  def test_GetSparseImage_invalidImageName(self):
    """Names such as 'system2' or 'unknown' raise an AssertionError."""
    for image_name in ('system2', 'unknown'):
      with self.assertRaises(AssertionError):
        common.GetSparseImage(image_name, None, None, False)

  def test_GetSparseImage_missingBlockMapFile(self):
    """An archive without IMAGES/system.map raises an AssertionError."""
    archive_path = self._make_target_files(
        [(0xCAC1, 6), (0xCAC3, 3), (0xCAC1, 4)],
        [('SYSTEM/file1', 8), ('SYSTEM/file2', 3)])

    unzipped_dir = common.UnzipTemp(archive_path)
    with zipfile.ZipFile(archive_path, 'r') as input_zip:
      with self.assertRaises(AssertionError):
        common.GetSparseImage('system', unzipped_dir, input_zip, False)

  def test_GetSparseImage_sharedBlocks_notAllowed(self):
    """Tests the case of having overlapping blocks but disallowed."""
    # Block 10 is shared between two files.
    shared_block_map = '\n'.join([
        '/system/file1 1-5 9-10',
        '/system/file2 10-12'])
    archive_path = self._make_target_files(
        [(0xCAC2, 16)],
        [('SYSTEM/file1', 7), ('SYSTEM/file2', 3)],
        block_map=shared_block_map)

    unzipped_dir = common.UnzipTemp(archive_path)
    with zipfile.ZipFile(archive_path, 'r') as input_zip:
      with self.assertRaises(AssertionError):
        common.GetSparseImage('system', unzipped_dir, input_zip, False)

  def test_GetSparseImage_sharedBlocks_allowed(self):
    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
    # NOTE(review): this image used to be described as having a care_map of
    # "0-5 9-12", whereas the file_map expected below spans blocks 0-15 --
    # confirm against test_utils.construct_sparse_image().
    # Block 10 is shared between two files.
    shared_block_map = '\n'.join([
        '/system/file1 1-5 9-10',
        '/system/file2 10-12'])
    archive_path = self._make_target_files(
        [(0xCAC2, 16)],
        [('SYSTEM/file1', 7), ('SYSTEM/file2', 3)],
        block_map=shared_block_map)

    image = self._get_system_image(archive_path, True)
    file_map = image.file_map

    expected_file_map = {
        '__COPY': RangeSet("0"),
        '__NONZERO-0': RangeSet("6-8 13-15"),
        '/system/file1': RangeSet("1-5 9-10"),
        '/system/file2': RangeSet("11-12"),
    }
    self.assertDictEqual(expected_file_map, file_map)

    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
    # 'incomplete'.
    file2_extra = file_map['/system/file2'].extra
    self.assertTrue(file2_extra['uses_shared_blocks'])
    self.assertNotIn('incomplete', file2_extra)

    # All other entries should look normal without any tags.
    for untagged_name in ('__COPY', '__NONZERO-0', '/system/file1'):
      self.assertFalse(file_map[untagged_name].extra)

  def test_GetSparseImage_incompleteRanges(self):
    """Tests the case of ext4 images with holes."""
    block_map = '\n'.join([
        '/system/file1 1-5 9-10',
        '/system/file2 11-12'])
    # '/system/file2' has less blocks listed (2) than actual (3).
    archive_path = self._make_target_files(
        [(0xCAC2, 16)],
        [('SYSTEM/file1', 7), ('SYSTEM/file2', 3)],
        block_map=block_map)

    image = self._get_system_image(archive_path, False)

    self.assertFalse(image.file_map['/system/file1'].extra)
    self.assertTrue(image.file_map['/system/file2'].extra['incomplete'])
647
648
class InstallRecoveryScriptFormatTest(unittest.TestCase):
  """Checks the format of install-recovery.sh.

  Its format should match between common.py and validate_target_files.py.
  """

  def setUp(self):
    """Prepares the temp dir, the info dict and the two gzipped images."""
    self._tempdir = common.MakeTempDir()
    # Create a dummy dict that contains the fstab info for boot&recovery.
    dummy_fstab = [
        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
    # NOTE(review): "\n".join is passed where LoadRecoveryFSTab presumably
    # expects a read helper that gets called with the third argument, which
    # would turn the lines above into the fstab text; 2 is presumably the
    # fstab version -- confirm against common.LoadRecoveryFSTab().
    self._info = {
        "fstab": common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab),
    }
    # Construct the gzipped recovery.img and boot.img
    # The size field at the end of this gzip stream says 8 bytes; presumably
    # produced by: echo -n "recovery" | gzip -f | hd  (TODO confirm)
    self.recovery_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
        0x08, 0x00, 0x00, 0x00
    ])
    # echo -n "boot" | gzip -f | hd
    self.boot_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
    ])

  def tearDown(self):
    """Lets common clean up whatever it created during the test."""
    common.Cleanup()

  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
    """Writes data to <tempdir>/<prefix>/<name>, creating missing directories.

    This is the output sink handed to common.MakeRecoveryPatch(); the tests
    also call it directly to stage image files.

    Args:
      name: Path of the file, relative to the prefix directory.
      data: The content to write, either text or a bytes-like object.
      prefix: Directory below the test's temp dir that receives the file.
    """
    loc = os.path.join(self._tempdir, prefix, name)
    parent_dir = os.path.dirname(loc)
    if not os.path.exists(parent_dir):
      os.makedirs(parent_dir)
    # The image payloads are bytearrays, while "bonus" is plain text. A
    # text-mode handle rejects bytes-like data on Python 3, so pick the mode
    # that fits the payload.
    mode = "w" if isinstance(data, str) else "wb"
    with open(loc, mode) as f:
      f.write(data)

  def test_full_recovery(self):
    """Validates the script generated for a full recovery image."""
    recovery_image = common.File("recovery.img", self.recovery_data)
    boot_image = common.File("boot.img", self.boot_data)
    self._info["full_recovery_image"] = "true"

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)

  def test_recovery_from_boot(self):
    """Validates the script generated without a full recovery image."""
    recovery_image = common.File("recovery.img", self.recovery_data)
    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
    boot_image = common.File("boot.img", self.boot_data)
    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)
    # Validate 'recovery-from-boot' with bonus argument.
    self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)
711