test_zipfile.py revision 49259359eebaead3973816731fcb98bff0673900
# We can test part of the module without zlib.
try:
    import zlib
except ImportError:
    zlib = None

import os
import io
import sys
import time
import shutil
import struct
import zipfile
import unittest

from StringIO import StringIO
from tempfile import TemporaryFile
from random import randint, random
from unittest import skipUnless

from test.test_support import TESTFN, TESTFN_UNICODE, TESTFN_ENCODING, \
                              run_unittest, findfile, unlink, check_warnings
try:
    TESTFN_UNICODE.encode(TESTFN_ENCODING)
except (UnicodeError, TypeError):
    # Either the file system encoding is None, or the file name
    # cannot be encoded in the file system encoding.
    TESTFN_UNICODE = None

TESTFN2 = TESTFN + "2"
TESTFNDIR = TESTFN + "d"
FIXEDTEST_SIZE = 1000

# (archive name, content) pairs used by the extract/close tests.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]


class TestsWithSourceFile(unittest.TestCase):
    """Tests that archive a generated source file (TESTFN) and read it back.

    setUp() writes FIXEDTEST_SIZE text lines to TESTFN; tearDown() removes
    TESTFN and TESTFN2.  The ``zip_*_test`` helpers take the archive target
    ``f`` (a path or a file-like object) and a compression constant, and are
    driven by the ``test_*`` methods.
    """

    def setUp(self):
        self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
                         for i in xrange(FIXEDTEST_SIZE)]
        self.data = '\n'.join(self.line_gen) + '\n'

        # Make a source file with some lines
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def make_test_archive(self, f, compression):
        """Write a three-member archive (TESTFN twice, plus "strfile") to f."""
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)
            zipfp.writestr("strfile", self.data)

    def zip_test(self, f, compression):
        """Round-trip the test archive and check read(), printdir(),
        namelist(), infolist(), getinfo() and testzip()."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            self.assertEqual(zipfp.read(TESTFN), self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)
            self.assertEqual(zipfp.read("strfile"), self.data)

            # Print the ZIP directory
            fp = StringIO()
            stdout = sys.stdout
            try:
                sys.stdout = fp
                zipfp.printdir()
            finally:
                sys.stdout = stdout

            directory = fp.getvalue()
            lines = directory.splitlines()
            self.assertEqual(len(lines), 4) # Number of files + header

            self.assertIn('File Name', lines[0])
            self.assertIn('Modified', lines[0])
            self.assertIn('Size', lines[0])

            fn, date, time_, size = lines[1].split()
            self.assertEqual(fn, 'another.name')
            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
            self.assertEqual(size, str(len(self.data)))

            # Check the namelist
            names = zipfp.namelist()
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)

            # Check infolist
            infos = zipfp.infolist()
            names = [i.filename for i in infos]
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            for i in infos:
                self.assertEqual(i.file_size, len(self.data))

            # check getinfo
            for nm in (TESTFN, "another.name", "strfile"):
                info = zipfp.getinfo(nm)
                self.assertEqual(info.filename, nm)
                self.assertEqual(info.file_size, len(self.data))

            # Check that testzip doesn't raise an exception
            zipfp.testzip()

    def test_stored(self):
        # Each helper is run against a path, a real temporary file and an
        # in-memory file object.
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test(f, zipfile.ZIP_STORED)

    def zip_open_test(self, f, compression):
        """Read two members through ZipFile.open() in fixed 256-byte chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            zipdata1 = []
            with zipfp.open(TESTFN) as zipopen1:
                while True:
                    read_data = zipopen1.read(256)
                    if not read_data:
                        break
                    zipdata1.append(read_data)

            zipdata2 = []
            with zipfp.open("another.name") as zipopen2:
                while True:
                    read_data = zipopen2.read(256)
                    if not read_data:
                        break
                    zipdata2.append(read_data)

            self.assertEqual(''.join(zipdata1), self.data)
            self.assertEqual(''.join(zipdata2), self.data)

    def test_open_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_open_test(f, zipfile.ZIP_STORED)

    def test_open_via_zip_info(self):
        # Two members share the name "name"; both must be reachable when
        # open()/read() are given ZipInfo objects instead of names.
        # Create the ZIP archive
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.writestr("name", "foo")
            with check_warnings(('', UserWarning)):
                zipfp.writestr("name", "bar")
            self.assertEqual(zipfp.namelist(), ["name"] * 2)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            infos = zipfp.infolist()
            data = ""
            for info in infos:
                with zipfp.open(info) as f:
                    data += f.read()
            self.assertTrue(data == "foobar" or data == "barfoo")
            data = ""
            for info in infos:
                data += zipfp.read(info)
            self.assertTrue(data == "foobar" or data == "barfoo")

    def zip_random_open_test(self, f, compression):
        """Read a member through ZipFile.open() in randomly sized chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            zipdata1 = []
            with zipfp.open(TESTFN) as zipopen1:
                while True:
                    read_data = zipopen1.read(randint(1, 1024))
                    if not read_data:
                        break
                    zipdata1.append(read_data)

            self.assertEqual(''.join(zipdata1), self.data)

    def test_random_open_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_random_open_test(f, zipfile.ZIP_STORED)

    def test_univeral_readaheads(self):
        # Iterate a '\r\n'-terminated member opened in 'rU' mode and check
        # that restoring '\r\n' reproduces the original data.
        f = StringIO()

        data = 'a\r\n' * 16 * 1024
        with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp:
            zipfp.writestr(TESTFN, data)

        data2 = ''
        with zipfile.ZipFile(f, 'r') as zipfp:
            with zipfp.open(TESTFN, 'rU') as zipopen:
                for line in zipopen:
                    data2 += line

        self.assertEqual(data, data2.replace('\n', '\r\n'))

    def zip_readline_read_test(self, f, compression):
        """Alternate readline() and read(100) calls on one open member."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zipopen:
                data = ''
                while True:
                    read = zipopen.readline()
                    if not read:
                        break
                    data += read

                    read = zipopen.read(100)
                    if not read:
                        break
                    data += read

        self.assertEqual(data, self.data)

    def zip_readline_test(self, f, compression):
        """Check readline() against every generated source line."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zipopen:
                for line in self.line_gen:
                    linedata = zipopen.readline()
                    self.assertEqual(linedata, line + '\n')

    def zip_readlines_test(self, f, compression):
        """Check readlines() against the generated source lines."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zo:
                ziplines = zo.readlines()
                for line, zipline in zip(self.line_gen, ziplines):
                    self.assertEqual(zipline, line + '\n')

    def zip_iterlines_test(self, f, compression):
        """Check iteration over an open member against the source lines."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
                self.assertEqual(zipline, line + '\n')

    def test_readline_read_stored(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readline_read_test(f, zipfile.ZIP_STORED)

    def test_readline_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readline_test(f, zipfile.ZIP_STORED)

    def test_readlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readlines_test(f, zipfile.ZIP_STORED)

    def test_iterlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_iterlines_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_open_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_open_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_random_open_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_read_deflated(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readline_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readlines_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_iterlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_low_compression(self):
        """Check for cases where compressed data is larger than original."""
        # Create the ZIP archive
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr("strfile", '12')

        # Get an open object for strfile
        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
            with zipfp.open("strfile") as openobj:
                self.assertEqual(openobj.read(1), '1')
                self.assertEqual(openobj.read(1), '2')

    def test_absolute_arcnames(self):
        # A leading slash in the archive name is expected to be dropped.
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, "/absolute")

        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
            self.assertEqual(zipfp.namelist(), ["absolute"])

    def test_append_to_zip_file(self):
        """Test appending to an existing zipfile."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, TESTFN)

        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
            zipfp.writestr("strfile", self.data)
            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])

    def test_append_to_non_zip_file(self):
        """Test appending to an existing file that is not a zipfile."""
        # NOTE: this test fails if len(d) < 22 because of the first
        # line "fpin.seek(-22, 2)" in _EndRecData
        data = 'I am not a ZipFile!'*10
        with open(TESTFN2, 'wb') as f:
            f.write(data)

        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, TESTFN)

        with open(TESTFN2, 'rb') as f:
            # Skip the non-zip prefix before handing the file to ZipFile.
            f.seek(len(data))
            with zipfile.ZipFile(f, "r") as zipfp:
                self.assertEqual(zipfp.namelist(), [TESTFN])

    def test_ignores_newline_at_end(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, TESTFN)
        with open(TESTFN2, 'a') as f:
            f.write("\r\n\00\00\00")
        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            self.assertIsInstance(zipfp, zipfile.ZipFile)

    def test_ignores_stuff_appended_past_comments(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.comment = b"this is a comment"
            zipfp.write(TESTFN, TESTFN)
        with open(TESTFN2, 'a') as f:
            f.write("abcdef\r\n")
        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            self.assertIsInstance(zipfp, zipfile.ZipFile)
            self.assertEqual(zipfp.comment, b"this is a comment")

    def test_write_default_name(self):
        """Check that calling ZipFile.write without arcname specified
        produces the expected result."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.write(TESTFN)
            # Read the reference data through a context manager so the
            # handle is closed deterministically.
            with open(TESTFN) as f:
                expected = f.read()
            self.assertEqual(zipfp.read(TESTFN), expected)

    @skipUnless(zlib, "requires zlib")
    def test_per_file_compression(self):
        """Check that files within a Zip archive can have different
        compression options."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
            sinfo = zipfp.getinfo('storeme')
            dinfo = zipfp.getinfo('deflateme')
            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)

    def test_write_to_readonly(self):
        """Check that trying to call write() on a readonly ZipFile object
        raises a RuntimeError."""
        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
            zipfp.writestr("somefile.txt", "bogus")

        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
            self.assertRaises(RuntimeError, zipfp.write, TESTFN)

    def test_extract(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                writtenfile = zipfp.extract(fpath)

                # make sure it was written to the right place
                correctfile = os.path.join(os.getcwd(), fpath)
                correctfile = os.path.normpath(correctfile)

                self.assertEqual(writtenfile, correctfile)

                # make sure correct data is in correct file; check_file
                # closes the handle before the file is removed
                self.check_file(writtenfile, fdata)
                os.remove(writtenfile)

        # remove the test file subdirectories
        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))

    def test_extract_all(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            zipfp.extractall()
            for fpath, fdata in SMALL_TEST_DATA:
                outfile = os.path.join(os.getcwd(), fpath)

                # check_file closes the handle before the file is removed
                self.check_file(outfile, fdata)
                os.remove(outfile)

        # remove the test file subdirectories
        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))

    def check_file(self, filename, content):
        """Assert that filename is a regular file whose bytes equal content."""
        self.assertTrue(os.path.isfile(filename))
        with open(filename, 'rb') as f:
            self.assertEqual(f.read(), content)

    @skipUnless(TESTFN_UNICODE, "No Unicode filesystem semantics on this platform.")
    def test_extract_unicode_filenames(self):
        fnames = [u'foo.txt', os.path.basename(TESTFN_UNICODE)]
        content = 'Test for unicode filename'
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            for fname in fnames:
                zipfp.writestr(fname, content)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            for fname in fnames:
                writtenfile = zipfp.extract(fname)

                # make sure it was written to the right place
                correctfile = os.path.join(os.getcwd(), fname)
                correctfile = os.path.normpath(correctfile)
                self.assertEqual(writtenfile, correctfile)

                self.check_file(writtenfile, content)
                os.remove(writtenfile)

    def test_extract_hackers_arcnames(self):
        # Each entry is (archive member name, expected path relative to the
        # extraction directory, '/'-separated).
        hacknames = [
            ('../foo/bar', 'foo/bar'),
            ('foo/../bar', 'foo/bar'),
            ('foo/../../bar', 'foo/bar'),
            ('foo/bar/..', 'foo/bar'),
            ('./../foo/bar', 'foo/bar'),
            ('/foo/bar', 'foo/bar'),
            ('/foo/../bar', 'foo/bar'),
            ('/foo/../../bar', 'foo/bar'),
        ]
        if os.path.sep == '\\':
            hacknames.extend([
                (r'..\foo\bar', 'foo/bar'),
                (r'..\/foo\/bar', 'foo/bar'),
                (r'foo/\..\/bar', 'foo/bar'),
                (r'foo\/../\bar', 'foo/bar'),
                (r'C:foo/bar', 'foo/bar'),
                (r'C:/foo/bar', 'foo/bar'),
                (r'C://foo/bar', 'foo/bar'),
                (r'C:\foo\bar', 'foo/bar'),
                (r'//conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'//?/C:/foo/bar', '_/C_/foo/bar'),
                (r'\\?\C:\foo\bar', '_/C_/foo/bar'),
                (r'C:/../C:/foo/bar', 'C_/foo/bar'),
                (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
                ('../../foo../../ba..r', 'foo/ba..r'),
            ])
        else:  # Unix
            hacknames.extend([
                ('//foo/bar', 'foo/bar'),
                ('../../foo../../ba..r', 'foo../ba..r'),
                (r'foo/..\bar', r'foo/..\bar'),
            ])

        for arcname, fixedname in hacknames:
            content = b'foobar' + arcname.encode()
            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
                zinfo = zipfile.ZipInfo()
                # preserve backslashes
                zinfo.filename = arcname
                zinfo.external_attr = 0o600 << 16
                zipfp.writestr(zinfo, content)

            arcname = arcname.replace(os.sep, "/")
            targetpath = os.path.join('target', 'subdir', 'subsub')
            correctfile = os.path.join(targetpath, *fixedname.split('/'))

            # extract() and extractall() into an explicit target directory
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname, targetpath)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            shutil.rmtree('target')

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall(targetpath)
            self.check_file(correctfile, content)
            shutil.rmtree('target')

            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))

            # extract() and extractall() into the current directory
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            shutil.rmtree(fixedname.split('/')[0])

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall()
            self.check_file(correctfile, content)
            shutil.rmtree(fixedname.split('/')[0])

            os.remove(TESTFN2)

    def test_writestr_compression(self):
        # The per-call compress_type must be recorded on the member.  The
        # archive is opened with ``with`` so it is closed when the test ends.
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED)
            if zlib:
                zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED)

            info = zipfp.getinfo('a.txt')
            self.assertEqual(info.compress_type, zipfile.ZIP_STORED)

            if zlib:
                info = zipfp.getinfo('b.txt')
                self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED)


    def zip_test_writestr_permissions(self, f, compression):
        # Make sure that writestr creates files with mode 0600,
        # when it is passed a name rather than a ZipInfo instance.

        self.make_test_archive(f, compression)
        with zipfile.ZipFile(f, "r") as zipfp:
            zinfo = zipfp.getinfo('strfile')
            # 0o600 spelling matches test_extract_hackers_arcnames.
            self.assertEqual(zinfo.external_attr, 0o600 << 16)

    def test_writestr_permissions(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)

    def test_close(self):
        """Check that the zipfile is closed after the 'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)
                self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')

    def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
                raise zipfile.BadZipfile()
        except zipfile.BadZipfile:
            self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')

    def test_add_file_before_1980(self):
        # Set atime and mtime to 1970-01-01
        os.utime(TESTFN, (0, 0))
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            self.assertRaises(ValueError, zipfp.write, TESTFN)

    def tearDown(self):
        unlink(TESTFN)
        unlink(TESTFN2)


class TestZip64InSmallFiles(unittest.TestCase):
    # These tests test the ZIP64 functionality without using large files,
    # see test_zipfile64 for proper tests.

    def setUp(self):
        # Lower the module-level limit so the small test file counts as
        # "large"; tearDown() restores the saved value.
        self._limit = zipfile.ZIP64_LIMIT
        zipfile.ZIP64_LIMIT = 5

        line_gen = ("Test of zipfile line %d." % i
                    for i in range(0, FIXEDTEST_SIZE))
        self.data = '\n'.join(line_gen)

        # Make a source file with some lines
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def large_file_exception_test(self, f, compression):
        """write() must raise LargeZipFile when allowZip64 is not given."""
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            self.assertRaises(zipfile.LargeZipFile,
                              zipfp.write, TESTFN, "another.name")

    def large_file_exception_test2(self, f, compression):
        """writestr() must raise LargeZipFile when allowZip64 is not given."""
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            self.assertRaises(zipfile.LargeZipFile,
                              zipfp.writestr, "another.name", self.data)

    def test_large_file_exception(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.large_file_exception_test(f, zipfile.ZIP_STORED)
            self.large_file_exception_test2(f, zipfile.ZIP_STORED)

    def zip_test(self, f, compression):
        """Round-trip a three-member archive written with allowZip64=True."""
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)
            zipfp.writestr("strfile", self.data)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            self.assertEqual(zipfp.read(TESTFN), self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)
            self.assertEqual(zipfp.read("strfile"), self.data)

            # Print the ZIP directory
            fp = StringIO()
            stdout = sys.stdout
            try:
                sys.stdout = fp
                zipfp.printdir()
            finally:
                sys.stdout = stdout

            directory = fp.getvalue()
            lines = directory.splitlines()
            self.assertEqual(len(lines), 4) # Number of files + header

            self.assertIn('File Name', lines[0])
            self.assertIn('Modified', lines[0])
            self.assertIn('Size', lines[0])

            fn, date, time_, size = lines[1].split()
            self.assertEqual(fn, 'another.name')
            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
            self.assertEqual(size, str(len(self.data)))

            # Check the namelist
            names = zipfp.namelist()
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)

            # Check infolist
            infos = zipfp.infolist()
            names = [i.filename for i in infos]
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            for i in infos:
                self.assertEqual(i.file_size, len(self.data))

            # check getinfo
            for nm in (TESTFN, "another.name", "strfile"):
                info = zipfp.getinfo(nm)
                self.assertEqual(info.filename, nm)
                self.assertEqual(info.file_size, len(self.data))

            # Check that testzip doesn't raise an exception
            zipfp.testzip()

    def test_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test(f, zipfile.ZIP_DEFLATED)

    def test_absolute_arcnames(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
                             allowZip64=True) as zipfp:
            zipfp.write(TESTFN, "/absolute")

        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
            self.assertEqual(zipfp.namelist(), ["absolute"])

    def tearDown(self):
        zipfile.ZIP64_LIMIT = self._limit
        unlink(TESTFN)
        unlink(TESTFN2)


class PyZipFileTests(unittest.TestCase):
    """Tests for PyZipFile.writepy() on files, packages and directories."""

    def test_write_pyfile(self):
        # writepy() is expected to store the compiled (.pyc/.pyo) file, not
        # the .py source.
        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
            fn = __file__
            if fn.endswith(('.pyc', '.pyo')):
                fn = fn[:-1]

            zipfp.writepy(fn)

            bn = os.path.basename(fn)
            self.assertNotIn(bn, zipfp.namelist())
            self.assertTrue(bn + 'o' in zipfp.namelist() or
                            bn + 'c' in zipfp.namelist())

        # Same check with an explicit base name ("testpackage").
        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
            fn = __file__
            if fn.endswith(('.pyc', '.pyo')):
                fn = fn[:-1]

            zipfp.writepy(fn, "testpackage")

            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
            self.assertNotIn(bn, zipfp.namelist())
            self.assertTrue(bn + 'o' in zipfp.namelist() or
                            bn + 'c' in zipfp.namelist())

    def test_write_python_package(self):
        import email
        packagedir = os.path.dirname(email.__file__)

        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
            zipfp.writepy(packagedir)

            # Check for a couple of modules at different levels of the
            # hierarchy
            names = zipfp.namelist()
            self.assertTrue('email/__init__.pyo' in names or
                            'email/__init__.pyc' in names)
            self.assertTrue('email/mime/text.pyo' in names or
                            'email/mime/text.pyc' in names)

    def test_write_python_directory(self):
        os.mkdir(TESTFN2)
        try:
            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
                fp.write("print(42)\n")

            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
                fp.write("print(42 * 42)\n")

            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
                fp.write("bla bla bla\n")

            # Context manager so the archive is closed before the directory
            # is removed.
            with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
                zipfp.writepy(TESTFN2)

                names = zipfp.namelist()
                self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names)
                self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names)
                self.assertNotIn('mod2.txt', names)

        finally:
            shutil.rmtree(TESTFN2)

    def test_write_non_pyfile(self):
        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
            # Close the handle explicitly so os.remove() below cannot hit a
            # still-open file.
            with open(TESTFN, 'w') as fp:
                fp.write('most definitely not a python file')
            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
            os.remove(TESTFN)


class OtherTests(unittest.TestCase):
    """Miscellaneous ZipFile tests: bad input files, closed archives and
    invalid constructor/open() arguments."""

    # Raw archive bytes keyed by compression method.
    # NOTE(review): per the attribute name these carry a wrong CRC; the tests
    # that consume them are not in this part of the file -- confirm there.
    zips_with_bad_crc = {
        zipfile.ZIP_STORED: (
            b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
            b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
            b'ilehello,AworldP'
            b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
            b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
            b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
            b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
            b'\0\0/\0\0\0\0\0'),
        zipfile.ZIP_DEFLATED: (
            b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
            b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
            b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
            b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
            b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
            b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
            b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'),
    }

    def test_unicode_filenames(self):
        with zipfile.ZipFile(TESTFN, "w") as zf:
            zf.writestr(u"foo.txt", "Test for unicode filename")
            zf.writestr(u"\xf6.txt", "Test for unicode filename")
            self.assertIsInstance(zf.infolist()[0].filename, unicode)

        with zipfile.ZipFile(TESTFN, "r") as zf:
            self.assertEqual(zf.filelist[0].filename, "foo.txt")
            self.assertEqual(zf.filelist[1].filename, u"\xf6.txt")

    def test_create_non_existent_file_for_append(self):
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        filename = 'testfile.txt'
        content = 'hello, world. this is some content.'

        try:
            with zipfile.ZipFile(TESTFN, 'a') as zf:
                zf.writestr(filename, content)
        except IOError:
            self.fail('Could not append data to a non-existent zip file.')

        self.assertTrue(os.path.exists(TESTFN))

        with zipfile.ZipFile(TESTFN, 'r') as zf:
            self.assertEqual(zf.read(filename), content)

    def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipfile:
            pass

    def test_is_zip_erroneous_file(self):
        """Check that is_zipfile() correctly identifies non-zip files."""
        # - passing a filename
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertFalse(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertTrue(not chk)
        # - passing a file-like object
        fp = StringIO()
        fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(not chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(not chk)

    def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it missing the last N bytes and make sure
        #   a BadZipFile exception is raised when we try to open it
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, fp)

    def test_is_zip_valid_file(self):
        """Check that is_zipfile() correctly identifies zip files."""
        # - passing a filename
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertTrue(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertTrue(chk)
            fp.seek(0, 0)
            zip_contents = fp.read()
        # - passing a file-like object
        fp = StringIO()
        fp.write(zip_contents)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)

    def test_non_existent_file_raises_IOError(self):
        # make sure we don't raise an AttributeError when a partially-constructed
        # ZipFile instance is finalized; this tests for regression on SF tracker
        # bug #403871.

        # The bug we're testing for caused an AttributeError to be raised
        # when a ZipFile instance was created for a file that did not
        # exist; the .fp member was not initialized but was needed by the
        # __del__() method.  Since the AttributeError is in the __del__(),
        # it is ignored, but the user should be sufficiently annoyed by
        # the message on the output that regression will be noticed
        # quickly.
        self.assertRaises(IOError, zipfile.ZipFile, TESTFN)

    def test_empty_file_raises_BadZipFile(self):
        # An empty file and a too-short file must both be rejected.
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

    def test_closed_zip_raises_RuntimeError(self):
        """Verify that testzip() doesn't swallow inappropriate exceptions."""
        data = StringIO()
        with zipfile.ZipFile(data, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        # This is correct; calling .read on a closed ZipFile should raise
        # a RuntimeError, and so should calling .testzip.  An earlier
        # version of .testzip would swallow this exception (and any other)
        # and report that the first file in the archive was corrupt.
        self.assertRaises(RuntimeError, zipf.read, "foo.txt")
        self.assertRaises(RuntimeError, zipf.open, "foo.txt")
        self.assertRaises(RuntimeError, zipf.testzip)
        self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus")
        # Write the scratch file through a context manager so the handle is
        # closed before zipf.write() is attempted.
        with open(TESTFN, 'w') as fp:
            fp.write('zipfile test data')
        self.assertRaises(RuntimeError, zipf.write, TESTFN)

    def test_bad_constructor_mode(self):
        """Check that bad modes passed to ZipFile constructor are caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q")

    def test_bad_open_mode(self):
        """Check that bad modes passed to ZipFile.open are caught."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            # read the data to make sure the file is there
            zipf.read("foo.txt")
            self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")

    def test_read0(self):
        """Check that calling read(0) on a ZipExtFile object returns an empty
        string and doesn't advance file pointer."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
            # read the data to make sure the file is there
            with zipf.open("foo.txt") as f:
                for i in xrange(FIXEDTEST_SIZE):
                    self.assertEqual(f.read(0), '')

                self.assertEqual(f.read(), "O, for a Muse of Fire!")

    def test_open_non_existent_item(self):
        """Check that attempting to call open() for an item that doesn't
        exist in the archive raises a KeyError."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")

    def test_bad_compression_mode(self):
        """Check that bad compression methods passed to ZipFile.open are
        caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1)

    def test_unsupported_compression(self):
        # data is declared as shrunk, but actually deflated
        # NOTE(review): the rest of this test lies beyond the reviewed
        # excerpt; `pass` only keeps the definition syntactically complete.
        pass
data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' 988 b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' 989 b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' 990 b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 991 b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' 992 b'/\x00\x00\x00!\x00\x00\x00\x00\x00') 993 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 994 self.assertRaises(NotImplementedError, zipf.open, 'x') 995 996 def test_null_byte_in_filename(self): 997 """Check that a filename containing a null byte is properly 998 terminated.""" 999 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1000 zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!") 1001 self.assertEqual(zipf.namelist(), ['foo.txt']) 1002 1003 def test_struct_sizes(self): 1004 """Check that ZIP internal structure sizes are calculated correctly.""" 1005 self.assertEqual(zipfile.sizeEndCentDir, 22) 1006 self.assertEqual(zipfile.sizeCentralDir, 46) 1007 self.assertEqual(zipfile.sizeEndCentDir64, 56) 1008 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 1009 1010 def test_comments(self): 1011 """Check that comments on the archive are handled properly.""" 1012 1013 # check default comment is empty 1014 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1015 self.assertEqual(zipf.comment, '') 1016 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1017 1018 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1019 self.assertEqual(zipf.comment, '') 1020 1021 # check a simple short comment 1022 comment = 'Bravely taking to his feet, he beat a very brave retreat.' 
1023 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1024 zipf.comment = comment 1025 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1026 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1027 self.assertEqual(zipf.comment, comment) 1028 1029 # check a comment of max length 1030 comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)]) 1031 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1032 zipf.comment = comment2 1033 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1034 1035 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1036 self.assertEqual(zipf.comment, comment2) 1037 1038 # check a comment that is too long is truncated 1039 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1040 with check_warnings(('', UserWarning)): 1041 zipf.comment = comment2 + 'oops' 1042 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1043 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1044 self.assertEqual(zipf.comment, comment2) 1045 1046 def test_change_comment_in_empty_archive(self): 1047 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1048 self.assertFalse(zipf.filelist) 1049 zipf.comment = b"this is a comment" 1050 with zipfile.ZipFile(TESTFN, "r") as zipf: 1051 self.assertEqual(zipf.comment, b"this is a comment") 1052 1053 def test_change_comment_in_nonempty_archive(self): 1054 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1055 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1056 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1057 self.assertTrue(zipf.filelist) 1058 zipf.comment = b"this is a comment" 1059 with zipfile.ZipFile(TESTFN, "r") as zipf: 1060 self.assertEqual(zipf.comment, b"this is a comment") 1061 1062 def check_testzip_with_bad_crc(self, compression): 1063 """Tests that files with bad CRCs return their name from testzip.""" 1064 zipdata = self.zips_with_bad_crc[compression] 1065 1066 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1067 # testzip returns the name of the first corrupt 
file, or None 1068 self.assertEqual('afile', zipf.testzip()) 1069 1070 def test_testzip_with_bad_crc_stored(self): 1071 self.check_testzip_with_bad_crc(zipfile.ZIP_STORED) 1072 1073 @skipUnless(zlib, "requires zlib") 1074 def test_testzip_with_bad_crc_deflated(self): 1075 self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED) 1076 1077 def check_read_with_bad_crc(self, compression): 1078 """Tests that files with bad CRCs raise a BadZipfile exception when read.""" 1079 zipdata = self.zips_with_bad_crc[compression] 1080 1081 # Using ZipFile.read() 1082 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1083 self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile') 1084 1085 # Using ZipExtFile.read() 1086 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1087 with zipf.open('afile', 'r') as corrupt_file: 1088 self.assertRaises(zipfile.BadZipfile, corrupt_file.read) 1089 1090 # Same with small reads (in order to exercise the buffering logic) 1091 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1092 with zipf.open('afile', 'r') as corrupt_file: 1093 corrupt_file.MIN_READ_SIZE = 2 1094 with self.assertRaises(zipfile.BadZipfile): 1095 while corrupt_file.read(2): 1096 pass 1097 1098 def test_read_with_bad_crc_stored(self): 1099 self.check_read_with_bad_crc(zipfile.ZIP_STORED) 1100 1101 @skipUnless(zlib, "requires zlib") 1102 def test_read_with_bad_crc_deflated(self): 1103 self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED) 1104 1105 def check_read_return_size(self, compression): 1106 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 1107 # than requested. 
1108 for test_size in (1, 4095, 4096, 4097, 16384): 1109 file_size = test_size + 1 1110 junk = b''.join(struct.pack('B', randint(0, 255)) 1111 for x in range(file_size)) 1112 with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf: 1113 zipf.writestr('foo', junk) 1114 with zipf.open('foo', 'r') as fp: 1115 buf = fp.read(test_size) 1116 self.assertEqual(len(buf), test_size) 1117 1118 def test_read_return_size_stored(self): 1119 self.check_read_return_size(zipfile.ZIP_STORED) 1120 1121 @skipUnless(zlib, "requires zlib") 1122 def test_read_return_size_deflated(self): 1123 self.check_read_return_size(zipfile.ZIP_DEFLATED) 1124 1125 def test_empty_zipfile(self): 1126 # Check that creating a file in 'w' or 'a' mode and closing without 1127 # adding any files to the archives creates a valid empty ZIP file 1128 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1129 pass 1130 try: 1131 zipf = zipfile.ZipFile(TESTFN, mode="r") 1132 except zipfile.BadZipfile: 1133 self.fail("Unable to create empty ZIP file in 'w' mode") 1134 1135 with zipfile.ZipFile(TESTFN, mode="a") as zipf: 1136 pass 1137 try: 1138 zipf = zipfile.ZipFile(TESTFN, mode="r") 1139 except: 1140 self.fail("Unable to create empty ZIP file in 'a' mode") 1141 1142 def test_open_empty_file(self): 1143 # Issue 1710703: Check that opening a file with less than 22 bytes 1144 # raises a BadZipfile exception (rather than the previously unhelpful 1145 # IOError) 1146 with open(TESTFN, 'w') as f: 1147 pass 1148 self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN, 'r') 1149 1150 def test_create_zipinfo_before_1980(self): 1151 self.assertRaises(ValueError, 1152 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 1153 1154 def tearDown(self): 1155 unlink(TESTFN) 1156 unlink(TESTFN2) 1157 1158 1159class DecryptionTests(unittest.TestCase): 1160 """Check that ZIP decryption works. 
Since the library does not 1161 support encryption at the moment, we use a pre-generated encrypted 1162 ZIP file.""" 1163 1164 data = ( 1165 'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 1166 '\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 1167 '\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 1168 'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 1169 '\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 1170 '\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 1171 '\x00\x00L\x00\x00\x00\x00\x00' ) 1172 data2 = ( 1173 'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 1174 '\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 1175 '\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 1176 'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 1177 '\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 1178 '\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 1179 'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 1180 '\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 1181 1182 plain = 'zipfile.py encryption test' 1183 plain2 = '\x00'*512 1184 1185 def setUp(self): 1186 with open(TESTFN, "wb") as fp: 1187 fp.write(self.data) 1188 self.zip = zipfile.ZipFile(TESTFN, "r") 1189 with open(TESTFN2, "wb") as fp: 1190 fp.write(self.data2) 1191 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 1192 1193 def tearDown(self): 1194 self.zip.close() 1195 os.unlink(TESTFN) 1196 self.zip2.close() 1197 os.unlink(TESTFN2) 1198 1199 def test_no_password(self): 1200 # Reading the encrypted file without password 1201 # must generate a RunTime exception 1202 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 1203 self.assertRaises(RuntimeError, self.zip2.read, "zero") 1204 1205 def test_bad_password(self): 
1206 self.zip.setpassword("perl") 1207 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 1208 self.zip2.setpassword("perl") 1209 self.assertRaises(RuntimeError, self.zip2.read, "zero") 1210 1211 @skipUnless(zlib, "requires zlib") 1212 def test_good_password(self): 1213 self.zip.setpassword("python") 1214 self.assertEqual(self.zip.read("test.txt"), self.plain) 1215 self.zip2.setpassword("12345") 1216 self.assertEqual(self.zip2.read("zero"), self.plain2) 1217 1218 1219class TestsWithRandomBinaryFiles(unittest.TestCase): 1220 def setUp(self): 1221 datacount = randint(16, 64)*1024 + randint(1, 1024) 1222 self.data = ''.join(struct.pack('<f', random()*randint(-1000, 1000)) 1223 for i in xrange(datacount)) 1224 1225 # Make a source file with some lines 1226 with open(TESTFN, "wb") as fp: 1227 fp.write(self.data) 1228 1229 def tearDown(self): 1230 unlink(TESTFN) 1231 unlink(TESTFN2) 1232 1233 def make_test_archive(self, f, compression): 1234 # Create the ZIP archive 1235 with zipfile.ZipFile(f, "w", compression) as zipfp: 1236 zipfp.write(TESTFN, "another.name") 1237 zipfp.write(TESTFN, TESTFN) 1238 1239 def zip_test(self, f, compression): 1240 self.make_test_archive(f, compression) 1241 1242 # Read the ZIP archive 1243 with zipfile.ZipFile(f, "r", compression) as zipfp: 1244 testdata = zipfp.read(TESTFN) 1245 self.assertEqual(len(testdata), len(self.data)) 1246 self.assertEqual(testdata, self.data) 1247 self.assertEqual(zipfp.read("another.name"), self.data) 1248 1249 def test_stored(self): 1250 for f in (TESTFN2, TemporaryFile(), StringIO()): 1251 self.zip_test(f, zipfile.ZIP_STORED) 1252 1253 @skipUnless(zlib, "requires zlib") 1254 def test_deflated(self): 1255 for f in (TESTFN2, TemporaryFile(), io.BytesIO()): 1256 self.zip_test(f, zipfile.ZIP_DEFLATED) 1257 1258 def zip_open_test(self, f, compression): 1259 self.make_test_archive(f, compression) 1260 1261 # Read the ZIP archive 1262 with zipfile.ZipFile(f, "r", compression) as zipfp: 1263 zipdata1 = [] 1264 
with zipfp.open(TESTFN) as zipopen1: 1265 while True: 1266 read_data = zipopen1.read(256) 1267 if not read_data: 1268 break 1269 zipdata1.append(read_data) 1270 1271 zipdata2 = [] 1272 with zipfp.open("another.name") as zipopen2: 1273 while True: 1274 read_data = zipopen2.read(256) 1275 if not read_data: 1276 break 1277 zipdata2.append(read_data) 1278 1279 testdata1 = ''.join(zipdata1) 1280 self.assertEqual(len(testdata1), len(self.data)) 1281 self.assertEqual(testdata1, self.data) 1282 1283 testdata2 = ''.join(zipdata2) 1284 self.assertEqual(len(testdata2), len(self.data)) 1285 self.assertEqual(testdata2, self.data) 1286 1287 def test_open_stored(self): 1288 for f in (TESTFN2, TemporaryFile(), StringIO()): 1289 self.zip_open_test(f, zipfile.ZIP_STORED) 1290 1291 @skipUnless(zlib, "requires zlib") 1292 def test_open_deflated(self): 1293 for f in (TESTFN2, TemporaryFile(), io.BytesIO()): 1294 self.zip_open_test(f, zipfile.ZIP_DEFLATED) 1295 1296 def zip_random_open_test(self, f, compression): 1297 self.make_test_archive(f, compression) 1298 1299 # Read the ZIP archive 1300 with zipfile.ZipFile(f, "r", compression) as zipfp: 1301 zipdata1 = [] 1302 with zipfp.open(TESTFN) as zipopen1: 1303 while True: 1304 read_data = zipopen1.read(randint(1, 1024)) 1305 if not read_data: 1306 break 1307 zipdata1.append(read_data) 1308 1309 testdata = ''.join(zipdata1) 1310 self.assertEqual(len(testdata), len(self.data)) 1311 self.assertEqual(testdata, self.data) 1312 1313 def test_random_open_stored(self): 1314 for f in (TESTFN2, TemporaryFile(), StringIO()): 1315 self.zip_random_open_test(f, zipfile.ZIP_STORED) 1316 1317 @skipUnless(zlib, "requires zlib") 1318 def test_random_open_deflated(self): 1319 for f in (TESTFN2, TemporaryFile(), io.BytesIO()): 1320 self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) 1321 1322 1323@skipUnless(zlib, "requires zlib") 1324class TestsWithMultipleOpens(unittest.TestCase): 1325 def setUp(self): 1326 # Create the ZIP archive 1327 with 
zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: 1328 zipfp.writestr('ones', '1'*FIXEDTEST_SIZE) 1329 zipfp.writestr('twos', '2'*FIXEDTEST_SIZE) 1330 1331 def test_same_file(self): 1332 # Verify that (when the ZipFile is in control of creating file objects) 1333 # multiple open() calls can be made without interfering with each other. 1334 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 1335 zopen1 = zipf.open('ones') 1336 zopen2 = zipf.open('ones') 1337 data1 = zopen1.read(500) 1338 data2 = zopen2.read(500) 1339 data1 += zopen1.read(500) 1340 data2 += zopen2.read(500) 1341 self.assertEqual(data1, data2) 1342 1343 def test_different_file(self): 1344 # Verify that (when the ZipFile is in control of creating file objects) 1345 # multiple open() calls can be made without interfering with each other. 1346 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 1347 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 1348 data1 = zopen1.read(500) 1349 data2 = zopen2.read(500) 1350 data1 += zopen1.read(500) 1351 data2 += zopen2.read(500) 1352 self.assertEqual(data1, '1'*FIXEDTEST_SIZE) 1353 self.assertEqual(data2, '2'*FIXEDTEST_SIZE) 1354 1355 def test_interleaved(self): 1356 # Verify that (when the ZipFile is in control of creating file objects) 1357 # multiple open() calls can be made without interfering with each other. 
1358 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 1359 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 1360 data1 = zopen1.read(500) 1361 data2 = zopen2.read(500) 1362 data1 += zopen1.read(500) 1363 data2 += zopen2.read(500) 1364 self.assertEqual(data1, '1'*FIXEDTEST_SIZE) 1365 self.assertEqual(data2, '2'*FIXEDTEST_SIZE) 1366 1367 def tearDown(self): 1368 unlink(TESTFN2) 1369 1370 1371class TestWithDirectory(unittest.TestCase): 1372 def setUp(self): 1373 os.mkdir(TESTFN2) 1374 1375 def test_extract_dir(self): 1376 with zipfile.ZipFile(findfile("zipdir.zip")) as zipf: 1377 zipf.extractall(TESTFN2) 1378 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 1379 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 1380 self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) 1381 1382 def test_bug_6050(self): 1383 # Extraction should succeed if directories already exist 1384 os.mkdir(os.path.join(TESTFN2, "a")) 1385 self.test_extract_dir() 1386 1387 def test_store_dir(self): 1388 os.mkdir(os.path.join(TESTFN2, "x")) 1389 zipf = zipfile.ZipFile(TESTFN, "w") 1390 zipf.write(os.path.join(TESTFN2, "x"), "x") 1391 self.assertTrue(zipf.filelist[0].filename.endswith("x/")) 1392 1393 def tearDown(self): 1394 shutil.rmtree(TESTFN2) 1395 if os.path.exists(TESTFN): 1396 unlink(TESTFN) 1397 1398 1399class UniversalNewlineTests(unittest.TestCase): 1400 def setUp(self): 1401 self.line_gen = ["Test of zipfile line %d." 
% i 1402 for i in xrange(FIXEDTEST_SIZE)] 1403 self.seps = ('\r', '\r\n', '\n') 1404 self.arcdata, self.arcfiles = {}, {} 1405 for n, s in enumerate(self.seps): 1406 self.arcdata[s] = s.join(self.line_gen) + s 1407 self.arcfiles[s] = '%s-%d' % (TESTFN, n) 1408 open(self.arcfiles[s], "wb").write(self.arcdata[s]) 1409 1410 def make_test_archive(self, f, compression): 1411 # Create the ZIP archive 1412 with zipfile.ZipFile(f, "w", compression) as zipfp: 1413 for fn in self.arcfiles.values(): 1414 zipfp.write(fn, fn) 1415 1416 def read_test(self, f, compression): 1417 self.make_test_archive(f, compression) 1418 1419 # Read the ZIP archive 1420 with zipfile.ZipFile(f, "r") as zipfp: 1421 for sep, fn in self.arcfiles.items(): 1422 with zipfp.open(fn, "rU") as fp: 1423 zipdata = fp.read() 1424 self.assertEqual(self.arcdata[sep], zipdata) 1425 1426 def readline_read_test(self, f, compression): 1427 self.make_test_archive(f, compression) 1428 1429 # Read the ZIP archive 1430 zipfp = zipfile.ZipFile(f, "r") 1431 for sep, fn in self.arcfiles.items(): 1432 with zipfp.open(fn, "rU") as zipopen: 1433 data = '' 1434 while True: 1435 read = zipopen.readline() 1436 if not read: 1437 break 1438 data += read 1439 1440 read = zipopen.read(5) 1441 if not read: 1442 break 1443 data += read 1444 1445 self.assertEqual(data, self.arcdata['\n']) 1446 1447 zipfp.close() 1448 1449 def readline_test(self, f, compression): 1450 self.make_test_archive(f, compression) 1451 1452 # Read the ZIP archive 1453 with zipfile.ZipFile(f, "r") as zipfp: 1454 for sep, fn in self.arcfiles.items(): 1455 with zipfp.open(fn, "rU") as zipopen: 1456 for line in self.line_gen: 1457 linedata = zipopen.readline() 1458 self.assertEqual(linedata, line + '\n') 1459 1460 def readlines_test(self, f, compression): 1461 self.make_test_archive(f, compression) 1462 1463 # Read the ZIP archive 1464 with zipfile.ZipFile(f, "r") as zipfp: 1465 for sep, fn in self.arcfiles.items(): 1466 with zipfp.open(fn, "rU") as fp: 1467 
ziplines = fp.readlines() 1468 for line, zipline in zip(self.line_gen, ziplines): 1469 self.assertEqual(zipline, line + '\n') 1470 1471 def iterlines_test(self, f, compression): 1472 self.make_test_archive(f, compression) 1473 1474 # Read the ZIP archive 1475 with zipfile.ZipFile(f, "r") as zipfp: 1476 for sep, fn in self.arcfiles.items(): 1477 for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")): 1478 self.assertEqual(zipline, line + '\n') 1479 1480 def test_read_stored(self): 1481 for f in (TESTFN2, TemporaryFile(), StringIO()): 1482 self.read_test(f, zipfile.ZIP_STORED) 1483 1484 def test_readline_read_stored(self): 1485 # Issue #7610: calls to readline() interleaved with calls to read(). 1486 for f in (TESTFN2, TemporaryFile(), StringIO()): 1487 self.readline_read_test(f, zipfile.ZIP_STORED) 1488 1489 def test_readline_stored(self): 1490 for f in (TESTFN2, TemporaryFile(), StringIO()): 1491 self.readline_test(f, zipfile.ZIP_STORED) 1492 1493 def test_readlines_stored(self): 1494 for f in (TESTFN2, TemporaryFile(), StringIO()): 1495 self.readlines_test(f, zipfile.ZIP_STORED) 1496 1497 def test_iterlines_stored(self): 1498 for f in (TESTFN2, TemporaryFile(), StringIO()): 1499 self.iterlines_test(f, zipfile.ZIP_STORED) 1500 1501 @skipUnless(zlib, "requires zlib") 1502 def test_read_deflated(self): 1503 for f in (TESTFN2, TemporaryFile(), StringIO()): 1504 self.read_test(f, zipfile.ZIP_DEFLATED) 1505 1506 @skipUnless(zlib, "requires zlib") 1507 def test_readline_read_deflated(self): 1508 # Issue #7610: calls to readline() interleaved with calls to read(). 
1509 for f in (TESTFN2, TemporaryFile(), StringIO()): 1510 self.readline_read_test(f, zipfile.ZIP_DEFLATED) 1511 1512 @skipUnless(zlib, "requires zlib") 1513 def test_readline_deflated(self): 1514 for f in (TESTFN2, TemporaryFile(), StringIO()): 1515 self.readline_test(f, zipfile.ZIP_DEFLATED) 1516 1517 @skipUnless(zlib, "requires zlib") 1518 def test_readlines_deflated(self): 1519 for f in (TESTFN2, TemporaryFile(), StringIO()): 1520 self.readlines_test(f, zipfile.ZIP_DEFLATED) 1521 1522 @skipUnless(zlib, "requires zlib") 1523 def test_iterlines_deflated(self): 1524 for f in (TESTFN2, TemporaryFile(), StringIO()): 1525 self.iterlines_test(f, zipfile.ZIP_DEFLATED) 1526 1527 def tearDown(self): 1528 for sep, fn in self.arcfiles.items(): 1529 os.remove(fn) 1530 unlink(TESTFN) 1531 unlink(TESTFN2) 1532 1533 1534def test_main(): 1535 run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests, 1536 PyZipFileTests, DecryptionTests, TestsWithMultipleOpens, 1537 TestWithDirectory, UniversalNewlineTests, 1538 TestsWithRandomBinaryFiles) 1539 1540if __name__ == "__main__": 1541 test_main() 1542