test_common.py revision f3282b4a7fda46dfb546f2822e0f2081b4ced7ff
1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
17import tempfile
18import time
19import unittest
20import zipfile
21
22import common
23
24
25def random_string_with_holes(size, block_size, step_size):
26  data = ["\0"] * size
27  for begin in range(0, size, step_size):
28    end = begin + block_size
29    data[begin:end] = os.urandom(block_size)
30  return "".join(data)
31
32def get_2gb_string():
33  kilobytes = 1024
34  megabytes = 1024 * kilobytes
35  gigabytes = 1024 * megabytes
36
37  size = int(2 * gigabytes + 1)
38  block_size = 4 * kilobytes
39  step_size = 4 * megabytes
40  two_gb_string = random_string_with_holes(
41        size, block_size, step_size)
42  return two_gb_string
43
44
45class CommonZipTest(unittest.TestCase):
46  def _verify(self, zip_file, zip_file_name, arcname, contents,
47              test_file_name=None, expected_stat=None, expected_mode=0o644,
48              expected_compress_type=zipfile.ZIP_STORED):
49    # Verify the stat if present.
50    if test_file_name is not None:
51      new_stat = os.stat(test_file_name)
52      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
53      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
54
55    # Reopen the zip file to verify.
56    zip_file = zipfile.ZipFile(zip_file_name, "r")
57
58    # Verify the timestamp.
59    info = zip_file.getinfo(arcname)
60    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
61
62    # Verify the file mode.
63    mode = (info.external_attr >> 16) & 0o777
64    self.assertEqual(mode, expected_mode)
65
66    # Verify the compress type.
67    self.assertEqual(info.compress_type, expected_compress_type)
68
69    # Verify the zip contents.
70    self.assertEqual(zip_file.read(arcname), contents)
71    self.assertIsNone(zip_file.testzip())
72
73  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
74    extra_zipwrite_args = dict(extra_zipwrite_args or {})
75
76    test_file = tempfile.NamedTemporaryFile(delete=False)
77    test_file_name = test_file.name
78
79    zip_file = tempfile.NamedTemporaryFile(delete=False)
80    zip_file_name = zip_file.name
81
82    # File names within an archive strip the leading slash.
83    arcname = extra_zipwrite_args.get("arcname", test_file_name)
84    if arcname[0] == "/":
85      arcname = arcname[1:]
86
87    zip_file.close()
88    zip_file = zipfile.ZipFile(zip_file_name, "w")
89
90    try:
91      test_file.write(contents)
92      test_file.close()
93
94      expected_stat = os.stat(test_file_name)
95      expected_mode = extra_zipwrite_args.get("perms", 0o644)
96      expected_compress_type = extra_zipwrite_args.get("compress_type",
97                                                       zipfile.ZIP_STORED)
98      time.sleep(5)  # Make sure the atime/mtime will change measurably.
99
100      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
101      common.ZipClose(zip_file)
102
103      self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
104                   expected_stat, expected_mode, expected_compress_type)
105    finally:
106      os.remove(test_file_name)
107      os.remove(zip_file_name)
108
109  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
110    extra_args = dict(extra_args or {})
111
112    zip_file = tempfile.NamedTemporaryFile(delete=False)
113    zip_file_name = zip_file.name
114    zip_file.close()
115
116    zip_file = zipfile.ZipFile(zip_file_name, "w")
117
118    try:
119      expected_compress_type = extra_args.get("compress_type",
120                                              zipfile.ZIP_STORED)
121      time.sleep(5)  # Make sure the atime/mtime will change measurably.
122
123      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
124        zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
125      else:
126        zinfo = zinfo_or_arcname
127      arcname = zinfo.filename
128
129      common.ZipWriteStr(zip_file, zinfo, contents, **extra_args)
130      common.ZipClose(zip_file)
131
132      self._verify(zip_file, zip_file_name, arcname, contents,
133                   expected_compress_type=expected_compress_type)
134    finally:
135      os.remove(zip_file_name)
136
137  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
138    extra_args = dict(extra_args or {})
139
140    zip_file = tempfile.NamedTemporaryFile(delete=False)
141    zip_file_name = zip_file.name
142
143    test_file = tempfile.NamedTemporaryFile(delete=False)
144    test_file_name = test_file.name
145
146    arcname_large = test_file_name
147    arcname_small = "bar"
148
149    # File names within an archive strip the leading slash.
150    if arcname_large[0] == "/":
151      arcname_large = arcname_large[1:]
152
153    zip_file.close()
154    zip_file = zipfile.ZipFile(zip_file_name, "w")
155
156    try:
157      test_file.write(large)
158      test_file.close()
159
160      expected_stat = os.stat(test_file_name)
161      expected_mode = 0o644
162      expected_compress_type = extra_args.get("compress_type",
163                                              zipfile.ZIP_STORED)
164      time.sleep(5)  # Make sure the atime/mtime will change measurably.
165
166      common.ZipWrite(zip_file, test_file_name, **extra_args)
167      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
168      common.ZipClose(zip_file)
169
170      # Verify the contents written by ZipWrite().
171      self._verify(zip_file, zip_file_name, arcname_large, large,
172                   test_file_name, expected_stat, expected_mode,
173                   expected_compress_type)
174
175      # Verify the contents written by ZipWriteStr().
176      self._verify(zip_file, zip_file_name, arcname_small, small,
177                   expected_compress_type=expected_compress_type)
178    finally:
179      os.remove(zip_file_name)
180      os.remove(test_file_name)
181
182  def _test_reset_ZIP64_LIMIT(self, func, *args):
183    default_limit = (1 << 31) - 1
184    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
185    func(*args)
186    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
187
188  def test_ZipWrite(self):
189    file_contents = os.urandom(1024)
190    self._test_ZipWrite(file_contents)
191
192  def test_ZipWrite_with_opts(self):
193    file_contents = os.urandom(1024)
194    self._test_ZipWrite(file_contents, {
195        "arcname": "foobar",
196        "perms": 0o777,
197        "compress_type": zipfile.ZIP_DEFLATED,
198    })
199    self._test_ZipWrite(file_contents, {
200        "arcname": "foobar",
201        "perms": 0o700,
202        "compress_type": zipfile.ZIP_STORED,
203    })
204
205  def test_ZipWrite_large_file(self):
206    file_contents = get_2gb_string()
207    self._test_ZipWrite(file_contents, {
208        "compress_type": zipfile.ZIP_DEFLATED,
209    })
210
211  def test_ZipWrite_resets_ZIP64_LIMIT(self):
212    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
213
214  def test_ZipWriteStr(self):
215    random_string = os.urandom(1024)
216    # Passing arcname
217    self._test_ZipWriteStr("foo", random_string)
218
219    # Passing zinfo
220    zinfo = zipfile.ZipInfo(filename="foo")
221    self._test_ZipWriteStr(zinfo, random_string)
222
223    # Timestamp in the zinfo should be overwritten.
224    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
225    self._test_ZipWriteStr(zinfo, random_string)
226
227  def test_ZipWriteStr_with_opts(self):
228    random_string = os.urandom(1024)
229    # Passing arcname
230    self._test_ZipWriteStr("foo", random_string, {
231        "compress_type": zipfile.ZIP_DEFLATED,
232    })
233    self._test_ZipWriteStr("foo", random_string, {
234        "compress_type": zipfile.ZIP_STORED,
235    })
236
237    # Passing zinfo
238    zinfo = zipfile.ZipInfo(filename="foo")
239    self._test_ZipWriteStr(zinfo, random_string, {
240        "compress_type": zipfile.ZIP_DEFLATED,
241    })
242    self._test_ZipWriteStr(zinfo, random_string, {
243        "compress_type": zipfile.ZIP_STORED,
244    })
245
246  def test_ZipWriteStr_large_file(self):
247    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
248    # the workaround. We will only test the case of writing a string into a
249    # large archive.
250    long_string = get_2gb_string()
251    short_string = os.urandom(1024)
252    self._test_ZipWriteStr_large_file(long_string, short_string, {
253        "compress_type": zipfile.ZIP_DEFLATED,
254    })
255
256  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
257    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
258    zinfo = zipfile.ZipInfo(filename="foo")
259    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
260