1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
17import tempfile
18import time
19import unittest
20import zipfile
21
22import common
23
24
25def random_string_with_holes(size, block_size, step_size):
26  data = ["\0"] * size
27  for begin in range(0, size, step_size):
28    end = begin + block_size
29    data[begin:end] = os.urandom(block_size)
30  return "".join(data)
31
32def get_2gb_string():
33  kilobytes = 1024
34  megabytes = 1024 * kilobytes
35  gigabytes = 1024 * megabytes
36
37  size = int(2 * gigabytes + 1)
38  block_size = 4 * kilobytes
39  step_size = 4 * megabytes
40  two_gb_string = random_string_with_holes(
41        size, block_size, step_size)
42  return two_gb_string
43
44
45class CommonZipTest(unittest.TestCase):
46  def _verify(self, zip_file, zip_file_name, arcname, contents,
47              test_file_name=None, expected_stat=None, expected_mode=0o644,
48              expected_compress_type=zipfile.ZIP_STORED):
49    # Verify the stat if present.
50    if test_file_name is not None:
51      new_stat = os.stat(test_file_name)
52      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
53      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
54
55    # Reopen the zip file to verify.
56    zip_file = zipfile.ZipFile(zip_file_name, "r")
57
58    # Verify the timestamp.
59    info = zip_file.getinfo(arcname)
60    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
61
62    # Verify the file mode.
63    mode = (info.external_attr >> 16) & 0o777
64    self.assertEqual(mode, expected_mode)
65
66    # Verify the compress type.
67    self.assertEqual(info.compress_type, expected_compress_type)
68
69    # Verify the zip contents.
70    self.assertEqual(zip_file.read(arcname), contents)
71    self.assertIsNone(zip_file.testzip())
72
73  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
74    extra_zipwrite_args = dict(extra_zipwrite_args or {})
75
76    test_file = tempfile.NamedTemporaryFile(delete=False)
77    test_file_name = test_file.name
78
79    zip_file = tempfile.NamedTemporaryFile(delete=False)
80    zip_file_name = zip_file.name
81
82    # File names within an archive strip the leading slash.
83    arcname = extra_zipwrite_args.get("arcname", test_file_name)
84    if arcname[0] == "/":
85      arcname = arcname[1:]
86
87    zip_file.close()
88    zip_file = zipfile.ZipFile(zip_file_name, "w")
89
90    try:
91      test_file.write(contents)
92      test_file.close()
93
94      expected_stat = os.stat(test_file_name)
95      expected_mode = extra_zipwrite_args.get("perms", 0o644)
96      expected_compress_type = extra_zipwrite_args.get("compress_type",
97                                                       zipfile.ZIP_STORED)
98      time.sleep(5)  # Make sure the atime/mtime will change measurably.
99
100      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
101      common.ZipClose(zip_file)
102
103      self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
104                   expected_stat, expected_mode, expected_compress_type)
105    finally:
106      os.remove(test_file_name)
107      os.remove(zip_file_name)
108
109  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
110    extra_args = dict(extra_args or {})
111
112    zip_file = tempfile.NamedTemporaryFile(delete=False)
113    zip_file_name = zip_file.name
114    zip_file.close()
115
116    zip_file = zipfile.ZipFile(zip_file_name, "w")
117
118    try:
119      expected_compress_type = extra_args.get("compress_type",
120                                              zipfile.ZIP_STORED)
121      time.sleep(5)  # Make sure the atime/mtime will change measurably.
122
123      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
124        arcname = zinfo_or_arcname
125        expected_mode = extra_args.get("perms", 0o644)
126      else:
127        arcname = zinfo_or_arcname.filename
128        expected_mode = extra_args.get("perms",
129                                       zinfo_or_arcname.external_attr >> 16)
130
131      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
132      common.ZipClose(zip_file)
133
134      self._verify(zip_file, zip_file_name, arcname, contents,
135                   expected_mode=expected_mode,
136                   expected_compress_type=expected_compress_type)
137    finally:
138      os.remove(zip_file_name)
139
140  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
141    extra_args = dict(extra_args or {})
142
143    zip_file = tempfile.NamedTemporaryFile(delete=False)
144    zip_file_name = zip_file.name
145
146    test_file = tempfile.NamedTemporaryFile(delete=False)
147    test_file_name = test_file.name
148
149    arcname_large = test_file_name
150    arcname_small = "bar"
151
152    # File names within an archive strip the leading slash.
153    if arcname_large[0] == "/":
154      arcname_large = arcname_large[1:]
155
156    zip_file.close()
157    zip_file = zipfile.ZipFile(zip_file_name, "w")
158
159    try:
160      test_file.write(large)
161      test_file.close()
162
163      expected_stat = os.stat(test_file_name)
164      expected_mode = 0o644
165      expected_compress_type = extra_args.get("compress_type",
166                                              zipfile.ZIP_STORED)
167      time.sleep(5)  # Make sure the atime/mtime will change measurably.
168
169      common.ZipWrite(zip_file, test_file_name, **extra_args)
170      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
171      common.ZipClose(zip_file)
172
173      # Verify the contents written by ZipWrite().
174      self._verify(zip_file, zip_file_name, arcname_large, large,
175                   test_file_name, expected_stat, expected_mode,
176                   expected_compress_type)
177
178      # Verify the contents written by ZipWriteStr().
179      self._verify(zip_file, zip_file_name, arcname_small, small,
180                   expected_compress_type=expected_compress_type)
181    finally:
182      os.remove(zip_file_name)
183      os.remove(test_file_name)
184
185  def _test_reset_ZIP64_LIMIT(self, func, *args):
186    default_limit = (1 << 31) - 1
187    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
188    func(*args)
189    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
190
191  def test_ZipWrite(self):
192    file_contents = os.urandom(1024)
193    self._test_ZipWrite(file_contents)
194
195  def test_ZipWrite_with_opts(self):
196    file_contents = os.urandom(1024)
197    self._test_ZipWrite(file_contents, {
198        "arcname": "foobar",
199        "perms": 0o777,
200        "compress_type": zipfile.ZIP_DEFLATED,
201    })
202    self._test_ZipWrite(file_contents, {
203        "arcname": "foobar",
204        "perms": 0o700,
205        "compress_type": zipfile.ZIP_STORED,
206    })
207
208  def test_ZipWrite_large_file(self):
209    file_contents = get_2gb_string()
210    self._test_ZipWrite(file_contents, {
211        "compress_type": zipfile.ZIP_DEFLATED,
212    })
213
214  def test_ZipWrite_resets_ZIP64_LIMIT(self):
215    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
216
217  def test_ZipWriteStr(self):
218    random_string = os.urandom(1024)
219    # Passing arcname
220    self._test_ZipWriteStr("foo", random_string)
221
222    # Passing zinfo
223    zinfo = zipfile.ZipInfo(filename="foo")
224    self._test_ZipWriteStr(zinfo, random_string)
225
226    # Timestamp in the zinfo should be overwritten.
227    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
228    self._test_ZipWriteStr(zinfo, random_string)
229
230  def test_ZipWriteStr_with_opts(self):
231    random_string = os.urandom(1024)
232    # Passing arcname
233    self._test_ZipWriteStr("foo", random_string, {
234        "perms": 0o700,
235        "compress_type": zipfile.ZIP_DEFLATED,
236    })
237    self._test_ZipWriteStr("bar", random_string, {
238        "compress_type": zipfile.ZIP_STORED,
239    })
240
241    # Passing zinfo
242    zinfo = zipfile.ZipInfo(filename="foo")
243    self._test_ZipWriteStr(zinfo, random_string, {
244        "compress_type": zipfile.ZIP_DEFLATED,
245    })
246    self._test_ZipWriteStr(zinfo, random_string, {
247        "perms": 0o600,
248        "compress_type": zipfile.ZIP_STORED,
249    })
250
251  def test_ZipWriteStr_large_file(self):
252    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
253    # the workaround. We will only test the case of writing a string into a
254    # large archive.
255    long_string = get_2gb_string()
256    short_string = os.urandom(1024)
257    self._test_ZipWriteStr_large_file(long_string, short_string, {
258        "compress_type": zipfile.ZIP_DEFLATED,
259    })
260
261  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
262    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
263    zinfo = zipfile.ZipInfo(filename="foo")
264    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
265
266  def test_bug21309935(self):
267    zip_file = tempfile.NamedTemporaryFile(delete=False)
268    zip_file_name = zip_file.name
269    zip_file.close()
270
271    try:
272      random_string = os.urandom(1024)
273      zip_file = zipfile.ZipFile(zip_file_name, "w")
274      # Default perms should be 0o644 when passing the filename.
275      common.ZipWriteStr(zip_file, "foo", random_string)
276      # Honor the specified perms.
277      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
278      # The perms in zinfo should be untouched.
279      zinfo = zipfile.ZipInfo(filename="baz")
280      zinfo.external_attr = 0o740 << 16
281      common.ZipWriteStr(zip_file, zinfo, random_string)
282      # Explicitly specified perms has the priority.
283      zinfo = zipfile.ZipInfo(filename="qux")
284      zinfo.external_attr = 0o700 << 16
285      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
286      common.ZipClose(zip_file)
287
288      self._verify(zip_file, zip_file_name, "foo", random_string,
289                   expected_mode=0o644)
290      self._verify(zip_file, zip_file_name, "bar", random_string,
291                   expected_mode=0o755)
292      self._verify(zip_file, zip_file_name, "baz", random_string,
293                   expected_mode=0o740)
294      self._verify(zip_file, zip_file_name, "qux", random_string,
295                   expected_mode=0o400)
296    finally:
297      os.remove(zip_file_name)
298