test_common.py revision f3282b4a7fda46dfb546f2822e0f2081b4ced7ff
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import tempfile
import time
import unittest
import zipfile

import common


def random_string_with_holes(size, block_size, step_size):
  """Return `size` zero bytes with a random block written at every step.

  Starting at offset 0 and advancing by `step_size`, up to `block_size` bytes
  of os.urandom() output overwrite the zero filler. A block that would run
  past the end is truncated, so the result is exactly `size` bytes long.

  Args:
    size: Total length of the returned byte string. A non-positive value
        yields an empty string.
    block_size: Length of each random block.
    step_size: Distance between the starts of consecutive random blocks.

  Returns:
    A byte string (str on Python 2, bytes on Python 3).
  """
  # A bytearray costs one byte per element; a list of one-character strings
  # costs a pointer per element, which is prohibitive at the 2 GiB scale.
  data = bytearray(max(size, 0))
  for begin in range(0, size, step_size):
    # Clamp the final block: assigning a longer sequence to a short tail
    # slice would silently grow the buffer beyond `size`.
    end = min(begin + block_size, size)
    data[begin:end] = os.urandom(end - begin)
  return bytes(data)


def get_2gb_string():
  """Return a mostly-zero byte string one byte longer than 2 GiB.

  The string carries a 4 KiB random block every 4 MiB.
  """
  kilobytes = 1024
  megabytes = 1024 * kilobytes
  gigabytes = 1024 * megabytes

  size = int(2 * gigabytes + 1)
  block_size = 4 * kilobytes
  step_size = 4 * megabytes
  return random_string_with_holes(size, block_size, step_size)


class CommonZipTest(unittest.TestCase):
  """Tests for the common.ZipWrite / ZipWriteStr / ZipClose helpers."""

  def _make_temp_file(self, contents=None):
    """Create a closed temporary file and return its path.

    The file is registered for removal when the current test finishes,
    whether the test passes or fails.

    Args:
      contents: Optional bytes to write into the file before closing it.

    Returns:
      The path of the temporary file.
    """
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    self.addCleanup(os.remove, temp_file.name)
    try:
      if contents is not None:
        temp_file.write(contents)
    finally:
      temp_file.close()
    return temp_file.name

  def _open_zip_for_write(self, zip_file_name):
    """Open `zip_file_name` as a writable archive that is closed on cleanup.

    ZipFile.close() does nothing once the archive has been closed, so the
    registered cleanup only matters when a test fails before it reaches
    common.ZipClose().
    """
    zip_file = zipfile.ZipFile(zip_file_name, "w")
    self.addCleanup(zip_file.close)
    return zip_file

  def _verify(self, zip_file, zip_file_name, arcname, contents,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    """Check one archive entry and, optionally, the on-disk source file.

    Args:
      zip_file: Unused. The archive is reopened from `zip_file_name`; the
          parameter is kept so existing call sites stay valid.
      zip_file_name: Path of the archive to reopen and inspect.
      arcname: Name of the entry inside the archive.
      contents: Expected payload of the entry.
      test_file_name: If given, path of the source file whose mode and mtime
          must still match `expected_stat`.
      expected_stat: os.stat() result taken before the file was zipped.
          Required whenever `test_file_name` is given.
      expected_mode: Expected permission bits of the entry.
      expected_compress_type: Expected compression method of the entry.
    """
    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify. The context manager guarantees the read
    # handle is released even when an assertion below fails.
    with zipfile.ZipFile(zip_file_name, "r") as verify_zip:
      # Verify the timestamp.
      info = verify_zip.getinfo(arcname)
      self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

      # Verify the file mode (kept in the high 16 bits of external_attr).
      mode = (info.external_attr >> 16) & 0o777
      self.assertEqual(mode, expected_mode)

      # Verify the compress type.
      self.assertEqual(info.compress_type, expected_compress_type)

      # Verify the zip contents.
      self.assertEqual(verify_zip.read(arcname), contents)
      self.assertIsNone(verify_zip.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    """Zip a temp file holding `contents` via common.ZipWrite and verify it.

    Args:
      contents: Bytes to store in the source file.
      extra_zipwrite_args: Optional dict of keyword arguments forwarded to
          common.ZipWrite ("arcname", "perms" and "compress_type" also set
          the expectations that are verified).
    """
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    test_file_name = self._make_temp_file(contents)
    zip_file_name = self._make_temp_file()

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname.startswith("/"):
      arcname = arcname[1:]

    zip_file = self._open_zip_for_write(zip_file_name)

    expected_stat = os.stat(test_file_name)
    expected_mode = extra_zipwrite_args.get("perms", 0o644)
    expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                     zipfile.ZIP_STORED)
    time.sleep(5)  # Make sure the atime/mtime will change measurably.

    common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
    common.ZipClose(zip_file)

    self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
                 expected_stat, expected_mode, expected_compress_type)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    """Store `contents` via common.ZipWriteStr and verify the entry.

    A bare arcname is wrapped in a ZipInfo before the call, so
    common.ZipWriteStr always receives a ZipInfo from this helper.

    Args:
      zinfo_or_arcname: A zipfile.ZipInfo, or the entry name as a string.
      contents: Bytes to store in the entry.
      extra_args: Optional dict of keyword arguments forwarded to
          common.ZipWriteStr ("compress_type" also sets the expectation).
    """
    extra_args = dict(extra_args or {})

    zip_file_name = self._make_temp_file()
    zip_file = self._open_zip_for_write(zip_file_name)

    expected_compress_type = extra_args.get("compress_type",
                                            zipfile.ZIP_STORED)
    time.sleep(5)  # Make sure the atime/mtime will change measurably.

    if isinstance(zinfo_or_arcname, zipfile.ZipInfo):
      zinfo = zinfo_or_arcname
    else:
      zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
    arcname = zinfo.filename

    common.ZipWriteStr(zip_file, zinfo, contents, **extra_args)
    common.ZipClose(zip_file)

    self._verify(zip_file, zip_file_name, arcname, contents,
                 expected_compress_type=expected_compress_type)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    """Add a small string entry to an archive that already holds a big file.

    Args:
      large: Bytes written to a temp file and added with common.ZipWrite.
      small: Bytes added afterwards as entry "bar" with common.ZipWriteStr.
      extra_args: Optional dict of keyword arguments forwarded to both calls
          ("compress_type" also sets the expectation).
    """
    extra_args = dict(extra_args or {})

    zip_file_name = self._make_temp_file()
    test_file_name = self._make_temp_file(large)

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large.startswith("/"):
      arcname_large = arcname_large[1:]

    zip_file = self._open_zip_for_write(zip_file_name)

    expected_stat = os.stat(test_file_name)
    expected_mode = 0o644
    expected_compress_type = extra_args.get("compress_type",
                                            zipfile.ZIP_STORED)
    time.sleep(5)  # Make sure the atime/mtime will change measurably.

    common.ZipWrite(zip_file, test_file_name, **extra_args)
    common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
    common.ZipClose(zip_file)

    # Verify the contents written by ZipWrite().
    self._verify(zip_file, zip_file_name, arcname_large, large,
                 test_file_name, expected_stat, expected_mode,
                 expected_compress_type)

    # Verify the contents written by ZipWriteStr().
    self._verify(zip_file, zip_file_name, arcname_small, small,
                 expected_compress_type=expected_compress_type)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
    """Assert zipfile.ZIP64_LIMIT holds its default around func(*args)."""
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

  def test_ZipWrite(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWrite_resets_ZIP64_LIMIT(self):
    # b"" rather than "": the temp file is binary and ZipFile.read() returns
    # bytes, so a text string fails on Python 3 (identical on Python 2).
    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, b"")

  def test_ZipWriteStr(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string)

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string)

    # Timestamp in the zinfo should be overwritten.
    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    self._test_ZipWriteStr(zinfo, random_string)

  def test_ZipWriteStr_with_opts(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr("foo", random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWriteStr_large_file(self):
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    # b"" rather than "": ZipFile.read() returns bytes, which never compare
    # equal to a text string on Python 3 (identical on Python 2).
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", b"")
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b"")