test_tarfile.py revision 53ad0cd2842b7327bde4ca04ee11c544e522ff43
import sys
import os
import io
import shutil
from hashlib import md5

import unittest
import tarfile

from test import support

# Check for our compression modules.
try:
    import gzip
except ImportError:
    gzip = None
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import lzma
except ImportError:
    lzma = None

def md5sum(data):
    """Return the hexadecimal MD5 digest of the bytes object *data*."""
    return md5(data).hexdigest()

# All scratch files and the compressed copies of the test archive live
# below TEMPDIR.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected MD5 digests of the regular and the sparse test members.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"


class TarTest:
    # Mixin describing the uncompressed archive: which file to use, the
    # compression suffix of the mode string, how to open the raw file and
    # which TarFile constructor matches it.  Subclasses provide "prefix".
    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        # Full tarfile mode string, e.g. "r:" + "gz".
        return self.prefix + self.suffix

@support.requires_gzip
class GzipTest:
    # Same attributes as TarTest, for the gzip-compressed archive.
    tarname = gzipname
    suffix = 'gz'
    open = gzip.GzipFile if gzip else None
    taropen = tarfile.TarFile.gzopen

@support.requires_bz2
class Bz2Test:
    # Same attributes as TarTest, for the bzip2-compressed archive.
    tarname = bz2name
    suffix = 'bz2'
    open = bz2.BZ2File if bz2 else None
    taropen = tarfile.TarFile.bz2open

@support.requires_lzma
class LzmaTest:
    # Same attributes as TarTest, for the xz-compressed archive.
    tarname = xzname
    suffix = 'xz'
    open = lzma.LZMAFile if lzma else None
    taropen = tarfile.TarFile.xzopen


class ReadTest(TarTest):
    # Base class for read tests: opens self.tarname as self.tar before
    # each test and closes it afterwards.

    prefix = "r:"

    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")

    def tearDown(self):
        self.tar.close()


class UstarReadTest(ReadTest, unittest.TestCase):
    # Tests for the file objects returned by TarFile.extractfile().

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(md5sum(data), md5_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # Compare readlines() on the extracted member with readlines()
        # on the same member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell()/read()/readline() on an extracted member
        # and compare against the bytes of the same member on disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as extracted:
            data = extracted.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        # The context manager closes the member file even when one of the
        # assertions below fails.
        with self.tar.extractfile(tarinfo) as fobj:
            # Read everything first so that the seek(0) below starts
            # from the end of the member.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(md5sum(data), md5_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
188 def _test_fileobj_link(self, lnktype, regtype): 189 with self.tar.extractfile(lnktype) as a, \ 190 self.tar.extractfile(regtype) as b: 191 self.assertEqual(a.name, b.name) 192 193 def test_fileobj_link1(self): 194 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 195 196 def test_fileobj_link2(self): 197 self._test_fileobj_link("./ustar/linktest2/lnktype", 198 "ustar/linktest1/regtype") 199 200 def test_fileobj_symlink1(self): 201 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 202 203 def test_fileobj_symlink2(self): 204 self._test_fileobj_link("./ustar/linktest2/symtype", 205 "ustar/linktest1/regtype") 206 207 def test_issue14160(self): 208 self._test_fileobj_link("symtype2", "ustar/regtype") 209 210class GzipUstarReadTest(GzipTest, UstarReadTest): 211 pass 212 213class Bz2UstarReadTest(Bz2Test, UstarReadTest): 214 pass 215 216class LzmaUstarReadTest(LzmaTest, UstarReadTest): 217 pass 218 219 220class CommonReadTest(ReadTest): 221 222 def test_empty_tarfile(self): 223 # Test for issue6123: Allow opening empty archives. 224 # This test checks if tarfile.open() is able to open an empty tar 225 # archive successfully. Note that an empty tar archive is not the 226 # same as an empty file! 227 with tarfile.open(tmpname, self.mode.replace("r", "w")): 228 pass 229 try: 230 tar = tarfile.open(tmpname, self.mode) 231 tar.getnames() 232 except tarfile.ReadError: 233 self.fail("tarfile.open() failed on empty archive") 234 else: 235 self.assertListEqual(tar.getmembers(), []) 236 finally: 237 tar.close() 238 239 def test_non_existent_tarfile(self): 240 # Test for issue11513: prevent non-existent gzipped tarfiles raising 241 # multiple exceptions. 242 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 243 tarfile.open("xxx", self.mode) 244 245 def test_null_tarfile(self): 246 # Test for issue6123: Allow opening empty archives. 247 # This test guarantees that tarfile.open() does not treat an empty 248 # file as an empty tar archive. 
249 with open(tmpname, "wb"): 250 pass 251 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 252 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 253 254 def test_ignore_zeros(self): 255 # Test TarFile's ignore_zeros option. 256 for char in (b'\0', b'a'): 257 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 258 # are ignored correctly. 259 with self.open(tmpname, "w") as fobj: 260 fobj.write(char * 1024) 261 fobj.write(tarfile.TarInfo("foo").tobuf()) 262 263 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 264 try: 265 self.assertListEqual(tar.getnames(), ["foo"], 266 "ignore_zeros=True should have skipped the %r-blocks" % 267 char) 268 finally: 269 tar.close() 270 271 272class MiscReadTestBase(CommonReadTest): 273 def test_no_name_argument(self): 274 with open(self.tarname, "rb") as fobj: 275 tar = tarfile.open(fileobj=fobj, mode=self.mode) 276 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 277 278 def test_no_name_attribute(self): 279 with open(self.tarname, "rb") as fobj: 280 data = fobj.read() 281 fobj = io.BytesIO(data) 282 self.assertRaises(AttributeError, getattr, fobj, "name") 283 tar = tarfile.open(fileobj=fobj, mode=self.mode) 284 self.assertEqual(tar.name, None) 285 286 def test_empty_name_attribute(self): 287 with open(self.tarname, "rb") as fobj: 288 data = fobj.read() 289 fobj = io.BytesIO(data) 290 fobj.name = "" 291 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 292 self.assertEqual(tar.name, None) 293 294 def test_illegal_mode_arg(self): 295 with open(tmpname, 'wb'): 296 pass 297 with self.assertRaisesRegex(ValueError, 'mode must be '): 298 tar = self.taropen(tmpname, 'q') 299 with self.assertRaisesRegex(ValueError, 'mode must be '): 300 tar = self.taropen(tmpname, 'rw') 301 with self.assertRaisesRegex(ValueError, 'mode must be '): 302 tar = self.taropen(tmpname, '') 303 304 def test_fileobj_with_offset(self): 305 # Skip the first member and store values from the second member 306 
# of the testtar. 307 tar = tarfile.open(self.tarname, mode=self.mode) 308 try: 309 tar.next() 310 t = tar.next() 311 name = t.name 312 offset = t.offset 313 with tar.extractfile(t) as f: 314 data = f.read() 315 finally: 316 tar.close() 317 318 # Open the testtar and seek to the offset of the second member. 319 with self.open(self.tarname) as fobj: 320 fobj.seek(offset) 321 322 # Test if the tarfile starts with the second member. 323 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 324 t = tar.next() 325 self.assertEqual(t.name, name) 326 # Read to the end of fileobj and test if seeking back to the 327 # beginning works. 328 tar.getmembers() 329 self.assertEqual(tar.extractfile(t).read(), data, 330 "seek back did not work") 331 tar.close() 332 333 def test_fail_comp(self): 334 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 335 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 336 with open(tarname, "rb") as fobj: 337 self.assertRaises(tarfile.ReadError, tarfile.open, 338 fileobj=fobj, mode=self.mode) 339 340 def test_v7_dirtype(self): 341 # Test old style dirtype member (bug #1336623): 342 # Old V7 tars create directory members using an AREGTYPE 343 # header with a "/" appended to the filename field. 344 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 345 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 346 "v7 dirtype failed") 347 348 def test_xstar_type(self): 349 # The xstar format stores extra atime and ctime fields inside the 350 # space reserved for the prefix field. The prefix field must be 351 # ignored in this case, otherwise it will mess up the name. 
352 try: 353 self.tar.getmember("misc/regtype-xstar") 354 except KeyError: 355 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 356 357 def test_check_members(self): 358 for tarinfo in self.tar: 359 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 360 "wrong mtime for %s" % tarinfo.name) 361 if not tarinfo.name.startswith("ustar/"): 362 continue 363 self.assertEqual(tarinfo.uname, "tarfile", 364 "wrong uname for %s" % tarinfo.name) 365 366 def test_find_members(self): 367 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 368 "could not find all members") 369 370 @unittest.skipUnless(hasattr(os, "link"), 371 "Missing hardlink implementation") 372 @support.skip_unless_symlink 373 def test_extract_hardlink(self): 374 # Test hardlink extraction (e.g. bug #857297). 375 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 376 tar.extract("ustar/regtype", TEMPDIR) 377 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype")) 378 379 tar.extract("ustar/lnktype", TEMPDIR) 380 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype")) 381 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 382 data = f.read() 383 self.assertEqual(md5sum(data), md5_regtype) 384 385 tar.extract("ustar/symtype", TEMPDIR) 386 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype")) 387 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 388 data = f.read() 389 self.assertEqual(md5sum(data), md5_regtype) 390 391 def test_extractall(self): 392 # Test if extractall() correctly restores directory permissions 393 # and times (see issue1735). 
394 tar = tarfile.open(tarname, encoding="iso8859-1") 395 DIR = os.path.join(TEMPDIR, "extractall") 396 os.mkdir(DIR) 397 try: 398 directories = [t for t in tar if t.isdir()] 399 tar.extractall(DIR, directories) 400 for tarinfo in directories: 401 path = os.path.join(DIR, tarinfo.name) 402 if sys.platform != "win32": 403 # Win32 has no support for fine grained permissions. 404 self.assertEqual(tarinfo.mode & 0o777, 405 os.stat(path).st_mode & 0o777) 406 def format_mtime(mtime): 407 if isinstance(mtime, float): 408 return "{} ({})".format(mtime, mtime.hex()) 409 else: 410 return "{!r} (int)".format(mtime) 411 file_mtime = os.path.getmtime(path) 412 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 413 format_mtime(tarinfo.mtime), 414 format_mtime(file_mtime), 415 path) 416 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 417 finally: 418 tar.close() 419 shutil.rmtree(DIR) 420 421 def test_extract_directory(self): 422 dirtype = "ustar/dirtype" 423 DIR = os.path.join(TEMPDIR, "extractdir") 424 os.mkdir(DIR) 425 try: 426 with tarfile.open(tarname, encoding="iso8859-1") as tar: 427 tarinfo = tar.getmember(dirtype) 428 tar.extract(tarinfo, path=DIR) 429 extracted = os.path.join(DIR, dirtype) 430 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 431 if sys.platform != "win32": 432 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 433 finally: 434 shutil.rmtree(DIR) 435 436 def test_init_close_fobj(self): 437 # Issue #7341: Close the internal file object in the TarFile 438 # constructor in case of an error. For the test we rely on 439 # the fact that opening an empty file raises a ReadError. 
440 empty = os.path.join(TEMPDIR, "empty") 441 with open(empty, "wb") as fobj: 442 fobj.write(b"") 443 444 try: 445 tar = object.__new__(tarfile.TarFile) 446 try: 447 tar.__init__(empty) 448 except tarfile.ReadError: 449 self.assertTrue(tar.fileobj.closed) 450 else: 451 self.fail("ReadError not raised") 452 finally: 453 support.unlink(empty) 454 455 def test_parallel_iteration(self): 456 # Issue #16601: Restarting iteration over tarfile continued 457 # from where it left off. 458 with tarfile.open(self.tarname) as tar: 459 for m1, m2 in zip(tar, tar): 460 self.assertEqual(m1.offset, m2.offset) 461 self.assertEqual(m1.get_info(), m2.get_info()) 462 463class MiscReadTest(MiscReadTestBase, unittest.TestCase): 464 test_fail_comp = None 465 466class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 467 pass 468 469class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 470 def test_no_name_argument(self): 471 self.skipTest("BZ2File have no name attribute") 472 473class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 474 def test_no_name_argument(self): 475 self.skipTest("LZMAFile have no name attribute") 476 477 478class StreamReadTest(CommonReadTest, unittest.TestCase): 479 480 prefix="r|" 481 482 def test_read_through(self): 483 # Issue #11224: A poorly designed _FileInFile.read() method 484 # caused seeking errors with stream tar files. 
485 for tarinfo in self.tar: 486 if not tarinfo.isreg(): 487 continue 488 with self.tar.extractfile(tarinfo) as fobj: 489 while True: 490 try: 491 buf = fobj.read(512) 492 except tarfile.StreamError: 493 self.fail("simple read-through using " 494 "TarFile.extractfile() failed") 495 if not buf: 496 break 497 498 def test_fileobj_regular_file(self): 499 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 500 with self.tar.extractfile(tarinfo) as fobj: 501 data = fobj.read() 502 self.assertEqual(len(data), tarinfo.size, 503 "regular file extraction failed") 504 self.assertEqual(md5sum(data), md5_regtype, 505 "regular file extraction failed") 506 507 def test_provoke_stream_error(self): 508 tarinfos = self.tar.getmembers() 509 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 510 self.assertRaises(tarfile.StreamError, f.read) 511 512 def test_compare_members(self): 513 tar1 = tarfile.open(tarname, encoding="iso8859-1") 514 try: 515 tar2 = self.tar 516 517 while True: 518 t1 = tar1.next() 519 t2 = tar2.next() 520 if t1 is None: 521 break 522 self.assertIsNotNone(t2, "stream.next() failed.") 523 524 if t2.islnk() or t2.issym(): 525 with self.assertRaises(tarfile.StreamError): 526 tar2.extractfile(t2) 527 continue 528 529 v1 = tar1.extractfile(t1) 530 v2 = tar2.extractfile(t2) 531 if v1 is None: 532 continue 533 self.assertIsNotNone(v2, "stream.extractfile() failed") 534 self.assertEqual(v1.read(), v2.read(), 535 "stream extraction failed") 536 finally: 537 tar1.close() 538 539class GzipStreamReadTest(GzipTest, StreamReadTest): 540 pass 541 542class Bz2StreamReadTest(Bz2Test, StreamReadTest): 543 pass 544 545class LzmaStreamReadTest(LzmaTest, StreamReadTest): 546 pass 547 548 549class DetectReadTest(TarTest, unittest.TestCase): 550 def _testfunc_file(self, name, mode): 551 try: 552 tar = tarfile.open(name, mode) 553 except tarfile.ReadError as e: 554 self.fail() 555 else: 556 tar.close() 557 558 def _testfunc_fileobj(self, name, mode): 559 
try: 560 with open(name, "rb") as f: 561 tar = tarfile.open(name, mode, fileobj=f) 562 except tarfile.ReadError as e: 563 self.fail() 564 else: 565 tar.close() 566 567 def _test_modes(self, testfunc): 568 if self.suffix: 569 with self.assertRaises(tarfile.ReadError): 570 tarfile.open(tarname, mode="r:" + self.suffix) 571 with self.assertRaises(tarfile.ReadError): 572 tarfile.open(tarname, mode="r|" + self.suffix) 573 with self.assertRaises(tarfile.ReadError): 574 tarfile.open(self.tarname, mode="r:") 575 with self.assertRaises(tarfile.ReadError): 576 tarfile.open(self.tarname, mode="r|") 577 testfunc(self.tarname, "r") 578 testfunc(self.tarname, "r:" + self.suffix) 579 testfunc(self.tarname, "r:*") 580 testfunc(self.tarname, "r|" + self.suffix) 581 testfunc(self.tarname, "r|*") 582 583 def test_detect_file(self): 584 self._test_modes(self._testfunc_file) 585 586 def test_detect_fileobj(self): 587 self._test_modes(self._testfunc_fileobj) 588 589class GzipDetectReadTest(GzipTest, DetectReadTest): 590 pass 591 592class Bz2DetectReadTest(Bz2Test, DetectReadTest): 593 def test_detect_stream_bz2(self): 594 # Originally, tarfile's stream detection looked for the string 595 # "BZh91" at the start of the file. This is incorrect because 596 # the '9' represents the blocksize (900kB). If the file was 597 # compressed using another blocksize autodetection fails. 598 with open(tarname, "rb") as fobj: 599 data = fobj.read() 600 601 # Compress with blocksize 100kB, the file starts with "BZh11". 
602 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 603 fobj.write(data) 604 605 self._testfunc_file(tmpname, "r|*") 606 607class LzmaDetectReadTest(LzmaTest, DetectReadTest): 608 pass 609 610 611class MemberReadTest(ReadTest, unittest.TestCase): 612 613 def _test_member(self, tarinfo, chksum=None, **kwargs): 614 if chksum is not None: 615 with self.tar.extractfile(tarinfo) as f: 616 self.assertEqual(md5sum(f.read()), chksum, 617 "wrong md5sum for %s" % tarinfo.name) 618 619 kwargs["mtime"] = 0o7606136617 620 kwargs["uid"] = 1000 621 kwargs["gid"] = 100 622 if "old-v7" not in tarinfo.name: 623 # V7 tar can't handle alphabetic owners. 624 kwargs["uname"] = "tarfile" 625 kwargs["gname"] = "tarfile" 626 for k, v in kwargs.items(): 627 self.assertEqual(getattr(tarinfo, k), v, 628 "wrong value in %s field of %s" % (k, tarinfo.name)) 629 630 def test_find_regtype(self): 631 tarinfo = self.tar.getmember("ustar/regtype") 632 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 633 634 def test_find_conttype(self): 635 tarinfo = self.tar.getmember("ustar/conttype") 636 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 637 638 def test_find_dirtype(self): 639 tarinfo = self.tar.getmember("ustar/dirtype") 640 self._test_member(tarinfo, size=0) 641 642 def test_find_dirtype_with_size(self): 643 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 644 self._test_member(tarinfo, size=255) 645 646 def test_find_lnktype(self): 647 tarinfo = self.tar.getmember("ustar/lnktype") 648 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 649 650 def test_find_symtype(self): 651 tarinfo = self.tar.getmember("ustar/symtype") 652 self._test_member(tarinfo, size=0, linkname="regtype") 653 654 def test_find_blktype(self): 655 tarinfo = self.tar.getmember("ustar/blktype") 656 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 657 658 def test_find_chrtype(self): 659 tarinfo = self.tar.getmember("ustar/chrtype") 660 self._test_member(tarinfo, size=0, 
devmajor=1, devminor=3) 661 662 def test_find_fifotype(self): 663 tarinfo = self.tar.getmember("ustar/fifotype") 664 self._test_member(tarinfo, size=0) 665 666 def test_find_sparse(self): 667 tarinfo = self.tar.getmember("ustar/sparse") 668 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 669 670 def test_find_gnusparse(self): 671 tarinfo = self.tar.getmember("gnu/sparse") 672 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 673 674 def test_find_gnusparse_00(self): 675 tarinfo = self.tar.getmember("gnu/sparse-0.0") 676 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 677 678 def test_find_gnusparse_01(self): 679 tarinfo = self.tar.getmember("gnu/sparse-0.1") 680 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 681 682 def test_find_gnusparse_10(self): 683 tarinfo = self.tar.getmember("gnu/sparse-1.0") 684 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 685 686 def test_find_umlauts(self): 687 tarinfo = self.tar.getmember("ustar/umlauts-" 688 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 689 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 690 691 def test_find_ustar_longname(self): 692 name = "ustar/" + "12345/" * 39 + "1234567/longname" 693 self.assertIn(name, self.tar.getnames()) 694 695 def test_find_regtype_oldv7(self): 696 tarinfo = self.tar.getmember("misc/regtype-old-v7") 697 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 698 699 def test_find_pax_umlauts(self): 700 self.tar.close() 701 self.tar = tarfile.open(self.tarname, mode=self.mode, 702 encoding="iso8859-1") 703 tarinfo = self.tar.getmember("pax/umlauts-" 704 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 705 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 706 707 708class LongnameTest: 709 710 def test_read_longname(self): 711 # Test reading of longname (bug #1471427). 
712 longname = self.subdir + "/" + "123/" * 125 + "longname" 713 try: 714 tarinfo = self.tar.getmember(longname) 715 except KeyError: 716 self.fail("longname not found") 717 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 718 "read longname as dirtype") 719 720 def test_read_longlink(self): 721 longname = self.subdir + "/" + "123/" * 125 + "longname" 722 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 723 try: 724 tarinfo = self.tar.getmember(longlink) 725 except KeyError: 726 self.fail("longlink not found") 727 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 728 729 def test_truncated_longname(self): 730 longname = self.subdir + "/" + "123/" * 125 + "longname" 731 tarinfo = self.tar.getmember(longname) 732 offset = tarinfo.offset 733 self.tar.fileobj.seek(offset) 734 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 735 with self.assertRaises(tarfile.ReadError): 736 tarfile.open(name="foo.tar", fileobj=fobj) 737 738 def test_header_offset(self): 739 # Test if the start offset of the TarInfo object includes 740 # the preceding extended header. 741 longname = self.subdir + "/" + "123/" * 125 + "longname" 742 offset = self.tar.getmember(longname).offset 743 with open(tarname, "rb") as fobj: 744 fobj.seek(offset) 745 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 746 "iso8859-1", "strict") 747 self.assertEqual(tarinfo.type, self.longnametype) 748 749 750class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 751 752 subdir = "gnu" 753 longnametype = tarfile.GNUTYPE_LONGNAME 754 755 # Since 3.2 tarfile is supposed to accurately restore sparse members and 756 # produce files with holes. This is what we actually want to test here. 757 # Unfortunately, not all platforms/filesystems support sparse files, and 758 # even on platforms that do it is non-trivial to make reliable assertions 759 # about holes in files. 
Therefore, we first do one basic test which works 760 # an all platforms, and after that a test that will work only on 761 # platforms/filesystems that prove to support sparse files. 762 def _test_sparse_file(self, name): 763 self.tar.extract(name, TEMPDIR) 764 filename = os.path.join(TEMPDIR, name) 765 with open(filename, "rb") as fobj: 766 data = fobj.read() 767 self.assertEqual(md5sum(data), md5_sparse, 768 "wrong md5sum for %s" % name) 769 770 if self._fs_supports_holes(): 771 s = os.stat(filename) 772 self.assertLess(s.st_blocks * 512, s.st_size) 773 774 def test_sparse_file_old(self): 775 self._test_sparse_file("gnu/sparse") 776 777 def test_sparse_file_00(self): 778 self._test_sparse_file("gnu/sparse-0.0") 779 780 def test_sparse_file_01(self): 781 self._test_sparse_file("gnu/sparse-0.1") 782 783 def test_sparse_file_10(self): 784 self._test_sparse_file("gnu/sparse-1.0") 785 786 @staticmethod 787 def _fs_supports_holes(): 788 # Return True if the platform knows the st_blocks stat attribute and 789 # uses st_blocks units of 512 bytes, and if the filesystem is able to 790 # store holes in files. 791 if sys.platform.startswith("linux"): 792 # Linux evidentially has 512 byte st_blocks units. 
793 name = os.path.join(TEMPDIR, "sparse-test") 794 with open(name, "wb") as fobj: 795 fobj.seek(4096) 796 fobj.truncate() 797 s = os.stat(name) 798 os.remove(name) 799 return s.st_blocks == 0 800 else: 801 return False 802 803 804class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 805 806 subdir = "pax" 807 longnametype = tarfile.XHDTYPE 808 809 def test_pax_global_headers(self): 810 tar = tarfile.open(tarname, encoding="iso8859-1") 811 try: 812 tarinfo = tar.getmember("pax/regtype1") 813 self.assertEqual(tarinfo.uname, "foo") 814 self.assertEqual(tarinfo.gname, "bar") 815 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 816 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 817 818 tarinfo = tar.getmember("pax/regtype2") 819 self.assertEqual(tarinfo.uname, "") 820 self.assertEqual(tarinfo.gname, "bar") 821 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 822 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 823 824 tarinfo = tar.getmember("pax/regtype3") 825 self.assertEqual(tarinfo.uname, "tarfile") 826 self.assertEqual(tarinfo.gname, "tarfile") 827 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 828 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 829 finally: 830 tar.close() 831 832 def test_pax_number_fields(self): 833 # All following number fields are read from the pax header. 834 tar = tarfile.open(tarname, encoding="iso8859-1") 835 try: 836 tarinfo = tar.getmember("pax/regtype4") 837 self.assertEqual(tarinfo.size, 7011) 838 self.assertEqual(tarinfo.uid, 123) 839 self.assertEqual(tarinfo.gid, 123) 840 self.assertEqual(tarinfo.mtime, 1041808783.0) 841 self.assertEqual(type(tarinfo.mtime), float) 842 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 843 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 844 finally: 845 tar.close() 846 847 848class WriteTestBase(TarTest): 849 # Put all write tests in here that are supposed to be tested 850 # in all possible mode combinations. 
851 852 def test_fileobj_no_close(self): 853 fobj = io.BytesIO() 854 tar = tarfile.open(fileobj=fobj, mode=self.mode) 855 tar.addfile(tarfile.TarInfo("foo")) 856 tar.close() 857 self.assertFalse(fobj.closed, "external fileobjs must never closed") 858 859 860class WriteTest(WriteTestBase, unittest.TestCase): 861 862 prefix = "w:" 863 864 def test_100_char_name(self): 865 # The name field in a tar header stores strings of at most 100 chars. 866 # If a string is shorter than 100 chars it has to be padded with '\0', 867 # which implies that a string of exactly 100 chars is stored without 868 # a trailing '\0'. 869 name = "0123456789" * 10 870 tar = tarfile.open(tmpname, self.mode) 871 try: 872 t = tarfile.TarInfo(name) 873 tar.addfile(t) 874 finally: 875 tar.close() 876 877 tar = tarfile.open(tmpname) 878 try: 879 self.assertEqual(tar.getnames()[0], name, 880 "failed to store 100 char filename") 881 finally: 882 tar.close() 883 884 def test_tar_size(self): 885 # Test for bug #1013882. 886 tar = tarfile.open(tmpname, self.mode) 887 try: 888 path = os.path.join(TEMPDIR, "file") 889 with open(path, "wb") as fobj: 890 fobj.write(b"aaa") 891 tar.add(path) 892 finally: 893 tar.close() 894 self.assertGreater(os.path.getsize(tmpname), 0, 895 "tarfile is empty") 896 897 # The test_*_size tests test for bug #1167128. 
898 def test_file_size(self): 899 tar = tarfile.open(tmpname, self.mode) 900 try: 901 path = os.path.join(TEMPDIR, "file") 902 with open(path, "wb"): 903 pass 904 tarinfo = tar.gettarinfo(path) 905 self.assertEqual(tarinfo.size, 0) 906 907 with open(path, "wb") as fobj: 908 fobj.write(b"aaa") 909 tarinfo = tar.gettarinfo(path) 910 self.assertEqual(tarinfo.size, 3) 911 finally: 912 tar.close() 913 914 def test_directory_size(self): 915 path = os.path.join(TEMPDIR, "directory") 916 os.mkdir(path) 917 try: 918 tar = tarfile.open(tmpname, self.mode) 919 try: 920 tarinfo = tar.gettarinfo(path) 921 self.assertEqual(tarinfo.size, 0) 922 finally: 923 tar.close() 924 finally: 925 os.rmdir(path) 926 927 @unittest.skipUnless(hasattr(os, "link"), 928 "Missing hardlink implementation") 929 def test_link_size(self): 930 link = os.path.join(TEMPDIR, "link") 931 target = os.path.join(TEMPDIR, "link_target") 932 with open(target, "wb") as fobj: 933 fobj.write(b"aaa") 934 os.link(target, link) 935 try: 936 tar = tarfile.open(tmpname, self.mode) 937 try: 938 # Record the link target in the inodes list. 939 tar.gettarinfo(target) 940 tarinfo = tar.gettarinfo(link) 941 self.assertEqual(tarinfo.size, 0) 942 finally: 943 tar.close() 944 finally: 945 os.remove(target) 946 os.remove(link) 947 948 @support.skip_unless_symlink 949 def test_symlink_size(self): 950 path = os.path.join(TEMPDIR, "symlink") 951 os.symlink("link_target", path) 952 try: 953 tar = tarfile.open(tmpname, self.mode) 954 try: 955 tarinfo = tar.gettarinfo(path) 956 self.assertEqual(tarinfo.size, 0) 957 finally: 958 tar.close() 959 finally: 960 os.remove(path) 961 962 def test_add_self(self): 963 # Test for #1257255. 
964 dstname = os.path.abspath(tmpname) 965 tar = tarfile.open(tmpname, self.mode) 966 try: 967 self.assertEqual(tar.name, dstname, 968 "archive name must be absolute") 969 tar.add(dstname) 970 self.assertEqual(tar.getnames(), [], 971 "added the archive to itself") 972 973 cwd = os.getcwd() 974 os.chdir(TEMPDIR) 975 tar.add(dstname) 976 os.chdir(cwd) 977 self.assertEqual(tar.getnames(), [], 978 "added the archive to itself") 979 finally: 980 tar.close() 981 982 def test_exclude(self): 983 tempdir = os.path.join(TEMPDIR, "exclude") 984 os.mkdir(tempdir) 985 try: 986 for name in ("foo", "bar", "baz"): 987 name = os.path.join(tempdir, name) 988 support.create_empty_file(name) 989 990 exclude = os.path.isfile 991 992 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 993 try: 994 with support.check_warnings(("use the filter argument", 995 DeprecationWarning)): 996 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 997 finally: 998 tar.close() 999 1000 tar = tarfile.open(tmpname, "r") 1001 try: 1002 self.assertEqual(len(tar.getmembers()), 1) 1003 self.assertEqual(tar.getnames()[0], "empty_dir") 1004 finally: 1005 tar.close() 1006 finally: 1007 shutil.rmtree(tempdir) 1008 1009 def test_filter(self): 1010 tempdir = os.path.join(TEMPDIR, "filter") 1011 os.mkdir(tempdir) 1012 try: 1013 for name in ("foo", "bar", "baz"): 1014 name = os.path.join(tempdir, name) 1015 support.create_empty_file(name) 1016 1017 def filter(tarinfo): 1018 if os.path.basename(tarinfo.name) == "bar": 1019 return 1020 tarinfo.uid = 123 1021 tarinfo.uname = "foo" 1022 return tarinfo 1023 1024 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1025 try: 1026 tar.add(tempdir, arcname="empty_dir", filter=filter) 1027 finally: 1028 tar.close() 1029 1030 # Verify that filter is a keyword-only argument 1031 with self.assertRaises(TypeError): 1032 tar.add(tempdir, "empty_dir", True, None, filter) 1033 1034 tar = tarfile.open(tmpname, "r") 1035 try: 1036 for tarinfo in tar: 1037 
self.assertEqual(tarinfo.uid, 123) 1038 self.assertEqual(tarinfo.uname, "foo") 1039 self.assertEqual(len(tar.getmembers()), 3) 1040 finally: 1041 tar.close() 1042 finally: 1043 shutil.rmtree(tempdir) 1044 1045 # Guarantee that stored pathnames are not modified. Don't 1046 # remove ./ or ../ or double slashes. Still make absolute 1047 # pathnames relative. 1048 # For details see bug #6054. 1049 def _test_pathname(self, path, cmp_path=None, dir=False): 1050 # Create a tarfile with an empty member named path 1051 # and compare the stored name with the original. 1052 foo = os.path.join(TEMPDIR, "foo") 1053 if not dir: 1054 support.create_empty_file(foo) 1055 else: 1056 os.mkdir(foo) 1057 1058 tar = tarfile.open(tmpname, self.mode) 1059 try: 1060 tar.add(foo, arcname=path) 1061 finally: 1062 tar.close() 1063 1064 tar = tarfile.open(tmpname, "r") 1065 try: 1066 t = tar.next() 1067 finally: 1068 tar.close() 1069 1070 if not dir: 1071 os.remove(foo) 1072 else: 1073 os.rmdir(foo) 1074 1075 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1076 1077 1078 @support.skip_unless_symlink 1079 def test_extractall_symlinks(self): 1080 # Test if extractall works properly when tarfile contains symlinks 1081 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1082 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1083 os.mkdir(tempdir) 1084 try: 1085 source_file = os.path.join(tempdir,'source') 1086 target_file = os.path.join(tempdir,'symlink') 1087 with open(source_file,'w') as f: 1088 f.write('something\n') 1089 os.symlink(source_file, target_file) 1090 tar = tarfile.open(temparchive,'w') 1091 tar.add(source_file) 1092 tar.add(target_file) 1093 tar.close() 1094 # Let's extract it to the location which contains the symlink 1095 tar = tarfile.open(temparchive,'r') 1096 # this should not raise OSError: [Errno 17] File exists 1097 try: 1098 tar.extractall(path=tempdir) 1099 except OSError: 1100 self.fail("extractall failed with symlinked files") 1101 finally: 1102 
tar.close() 1103 finally: 1104 os.unlink(temparchive) 1105 shutil.rmtree(tempdir) 1106 1107 def test_pathnames(self): 1108 self._test_pathname("foo") 1109 self._test_pathname(os.path.join("foo", ".", "bar")) 1110 self._test_pathname(os.path.join("foo", "..", "bar")) 1111 self._test_pathname(os.path.join(".", "foo")) 1112 self._test_pathname(os.path.join(".", "foo", ".")) 1113 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1114 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1115 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1116 self._test_pathname(os.path.join("..", "foo")) 1117 self._test_pathname(os.path.join("..", "foo", "..")) 1118 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1119 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1120 1121 self._test_pathname("foo" + os.sep + os.sep + "bar") 1122 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1123 1124 def test_abs_pathnames(self): 1125 if sys.platform == "win32": 1126 self._test_pathname("C:\\foo", "foo") 1127 else: 1128 self._test_pathname("/foo", "foo") 1129 self._test_pathname("///foo", "foo") 1130 1131 def test_cwd(self): 1132 # Test adding the current working directory. 1133 cwd = os.getcwd() 1134 os.chdir(TEMPDIR) 1135 try: 1136 tar = tarfile.open(tmpname, self.mode) 1137 try: 1138 tar.add(".") 1139 finally: 1140 tar.close() 1141 1142 tar = tarfile.open(tmpname, "r") 1143 try: 1144 for t in tar: 1145 if t.name != ".": 1146 self.assertTrue(t.name.startswith("./"), t.name) 1147 finally: 1148 tar.close() 1149 finally: 1150 os.chdir(cwd) 1151 1152class GzipWriteTest(GzipTest, WriteTest): 1153 pass 1154 1155class Bz2WriteTest(Bz2Test, WriteTest): 1156 pass 1157 1158class LzmaWriteTest(LzmaTest, WriteTest): 1159 pass 1160 1161 1162class StreamWriteTest(WriteTestBase, unittest.TestCase): 1163 1164 prefix = "w|" 1165 decompressor = None 1166 1167 def test_stream_padding(self): 1168 # Test for bug #1543303. 
1169 tar = tarfile.open(tmpname, self.mode) 1170 tar.close() 1171 if self.decompressor: 1172 dec = self.decompressor() 1173 with open(tmpname, "rb") as fobj: 1174 data = fobj.read() 1175 data = dec.decompress(data) 1176 self.assertFalse(dec.unused_data, "found trailing data") 1177 else: 1178 with self.open(tmpname) as fobj: 1179 data = fobj.read() 1180 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1181 "incorrect zero padding") 1182 1183 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1184 "Missing umask implementation") 1185 def test_file_mode(self): 1186 # Test for issue #8464: Create files with correct 1187 # permissions. 1188 if os.path.exists(tmpname): 1189 os.remove(tmpname) 1190 1191 original_umask = os.umask(0o022) 1192 try: 1193 tar = tarfile.open(tmpname, self.mode) 1194 tar.close() 1195 mode = os.stat(tmpname).st_mode & 0o777 1196 self.assertEqual(mode, 0o644, "wrong file permissions") 1197 finally: 1198 os.umask(original_umask) 1199 1200class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1201 pass 1202 1203class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1204 decompressor = bz2.BZ2Decompressor if bz2 else None 1205 1206class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1207 decompressor = lzma.LZMADecompressor if lzma else None 1208 1209 1210class GNUWriteTest(unittest.TestCase): 1211 # This testcase checks for correct creation of GNU Longname 1212 # and Longlink extended headers (cp. bug #812325). 
1213 1214 def _length(self, s): 1215 blocks = len(s) // 512 + 1 1216 return blocks * 512 1217 1218 def _calc_size(self, name, link=None): 1219 # Initial tar header 1220 count = 512 1221 1222 if len(name) > tarfile.LENGTH_NAME: 1223 # GNU longname extended header + longname 1224 count += 512 1225 count += self._length(name) 1226 if link is not None and len(link) > tarfile.LENGTH_LINK: 1227 # GNU longlink extended header + longlink 1228 count += 512 1229 count += self._length(link) 1230 return count 1231 1232 def _test(self, name, link=None): 1233 tarinfo = tarfile.TarInfo(name) 1234 if link: 1235 tarinfo.linkname = link 1236 tarinfo.type = tarfile.LNKTYPE 1237 1238 tar = tarfile.open(tmpname, "w") 1239 try: 1240 tar.format = tarfile.GNU_FORMAT 1241 tar.addfile(tarinfo) 1242 1243 v1 = self._calc_size(name, link) 1244 v2 = tar.offset 1245 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1246 finally: 1247 tar.close() 1248 1249 tar = tarfile.open(tmpname) 1250 try: 1251 member = tar.next() 1252 self.assertIsNotNone(member, 1253 "unable to read longname member") 1254 self.assertEqual(tarinfo.name, member.name, 1255 "unable to read longname member") 1256 self.assertEqual(tarinfo.linkname, member.linkname, 1257 "unable to read longname member") 1258 finally: 1259 tar.close() 1260 1261 def test_longname_1023(self): 1262 self._test(("longnam/" * 127) + "longnam") 1263 1264 def test_longname_1024(self): 1265 self._test(("longnam/" * 127) + "longname") 1266 1267 def test_longname_1025(self): 1268 self._test(("longnam/" * 127) + "longname_") 1269 1270 def test_longlink_1023(self): 1271 self._test("name", ("longlnk/" * 127) + "longlnk") 1272 1273 def test_longlink_1024(self): 1274 self._test("name", ("longlnk/" * 127) + "longlink") 1275 1276 def test_longlink_1025(self): 1277 self._test("name", ("longlnk/" * 127) + "longlink_") 1278 1279 def test_longnamelink_1023(self): 1280 self._test(("longnam/" * 127) + "longnam", 1281 ("longlnk/" * 127) + "longlnk") 1282 
1283 def test_longnamelink_1024(self): 1284 self._test(("longnam/" * 127) + "longname", 1285 ("longlnk/" * 127) + "longlink") 1286 1287 def test_longnamelink_1025(self): 1288 self._test(("longnam/" * 127) + "longname_", 1289 ("longlnk/" * 127) + "longlink_") 1290 1291 1292@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1293class HardlinkTest(unittest.TestCase): 1294 # Test the creation of LNKTYPE (hardlink) members in an archive. 1295 1296 def setUp(self): 1297 self.foo = os.path.join(TEMPDIR, "foo") 1298 self.bar = os.path.join(TEMPDIR, "bar") 1299 1300 with open(self.foo, "wb") as fobj: 1301 fobj.write(b"foo") 1302 1303 os.link(self.foo, self.bar) 1304 1305 self.tar = tarfile.open(tmpname, "w") 1306 self.tar.add(self.foo) 1307 1308 def tearDown(self): 1309 self.tar.close() 1310 support.unlink(self.foo) 1311 support.unlink(self.bar) 1312 1313 def test_add_twice(self): 1314 # The same name will be added as a REGTYPE every 1315 # time regardless of st_nlink. 1316 tarinfo = self.tar.gettarinfo(self.foo) 1317 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1318 "add file as regular failed") 1319 1320 def test_add_hardlink(self): 1321 tarinfo = self.tar.gettarinfo(self.bar) 1322 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1323 "add file as hardlink failed") 1324 1325 def test_dereference_hardlink(self): 1326 self.tar.dereference = True 1327 tarinfo = self.tar.gettarinfo(self.bar) 1328 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1329 "dereferencing hardlink failed") 1330 1331 1332class PaxWriteTest(GNUWriteTest): 1333 1334 def _test(self, name, link=None): 1335 # See GNUWriteTest. 
1336 tarinfo = tarfile.TarInfo(name) 1337 if link: 1338 tarinfo.linkname = link 1339 tarinfo.type = tarfile.LNKTYPE 1340 1341 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1342 try: 1343 tar.addfile(tarinfo) 1344 finally: 1345 tar.close() 1346 1347 tar = tarfile.open(tmpname) 1348 try: 1349 if link: 1350 l = tar.getmembers()[0].linkname 1351 self.assertEqual(link, l, "PAX longlink creation failed") 1352 else: 1353 n = tar.getmembers()[0].name 1354 self.assertEqual(name, n, "PAX longname creation failed") 1355 finally: 1356 tar.close() 1357 1358 def test_pax_global_header(self): 1359 pax_headers = { 1360 "foo": "bar", 1361 "uid": "0", 1362 "mtime": "1.23", 1363 "test": "\xe4\xf6\xfc", 1364 "\xe4\xf6\xfc": "test"} 1365 1366 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1367 pax_headers=pax_headers) 1368 try: 1369 tar.addfile(tarfile.TarInfo("test")) 1370 finally: 1371 tar.close() 1372 1373 # Test if the global header was written correctly. 1374 tar = tarfile.open(tmpname, encoding="iso8859-1") 1375 try: 1376 self.assertEqual(tar.pax_headers, pax_headers) 1377 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1378 # Test if all the fields are strings. 1379 for key, val in tar.pax_headers.items(): 1380 self.assertIsNot(type(key), bytes) 1381 self.assertIsNot(type(val), bytes) 1382 if key in tarfile.PAX_NUMBER_FIELDS: 1383 try: 1384 tarfile.PAX_NUMBER_FIELDS[key](val) 1385 except (TypeError, ValueError): 1386 self.fail("unable to convert pax header field") 1387 finally: 1388 tar.close() 1389 1390 def test_pax_extended_header(self): 1391 # The fields from the pax header have priority over the 1392 # TarInfo. 
1393 pax_headers = {"path": "foo", "uid": "123"} 1394 1395 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1396 encoding="iso8859-1") 1397 try: 1398 t = tarfile.TarInfo() 1399 t.name = "\xe4\xf6\xfc" # non-ASCII 1400 t.uid = 8**8 # too large 1401 t.pax_headers = pax_headers 1402 tar.addfile(t) 1403 finally: 1404 tar.close() 1405 1406 tar = tarfile.open(tmpname, encoding="iso8859-1") 1407 try: 1408 t = tar.getmembers()[0] 1409 self.assertEqual(t.pax_headers, pax_headers) 1410 self.assertEqual(t.name, "foo") 1411 self.assertEqual(t.uid, 123) 1412 finally: 1413 tar.close() 1414 1415 1416class UstarUnicodeTest(unittest.TestCase): 1417 1418 format = tarfile.USTAR_FORMAT 1419 1420 def test_iso8859_1_filename(self): 1421 self._test_unicode_filename("iso8859-1") 1422 1423 def test_utf7_filename(self): 1424 self._test_unicode_filename("utf7") 1425 1426 def test_utf8_filename(self): 1427 self._test_unicode_filename("utf-8") 1428 1429 def _test_unicode_filename(self, encoding): 1430 tar = tarfile.open(tmpname, "w", format=self.format, 1431 encoding=encoding, errors="strict") 1432 try: 1433 name = "\xe4\xf6\xfc" 1434 tar.addfile(tarfile.TarInfo(name)) 1435 finally: 1436 tar.close() 1437 1438 tar = tarfile.open(tmpname, encoding=encoding) 1439 try: 1440 self.assertEqual(tar.getmembers()[0].name, name) 1441 finally: 1442 tar.close() 1443 1444 def test_unicode_filename_error(self): 1445 tar = tarfile.open(tmpname, "w", format=self.format, 1446 encoding="ascii", errors="strict") 1447 try: 1448 tarinfo = tarfile.TarInfo() 1449 1450 tarinfo.name = "\xe4\xf6\xfc" 1451 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1452 1453 tarinfo.name = "foo" 1454 tarinfo.uname = "\xe4\xf6\xfc" 1455 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1456 finally: 1457 tar.close() 1458 1459 def test_unicode_argument(self): 1460 tar = tarfile.open(tarname, "r", 1461 encoding="iso8859-1", errors="strict") 1462 try: 1463 for t in tar: 1464 self.assertIs(type(t.name), str) 1465 
self.assertIs(type(t.linkname), str) 1466 self.assertIs(type(t.uname), str) 1467 self.assertIs(type(t.gname), str) 1468 finally: 1469 tar.close() 1470 1471 def test_uname_unicode(self): 1472 t = tarfile.TarInfo("foo") 1473 t.uname = "\xe4\xf6\xfc" 1474 t.gname = "\xe4\xf6\xfc" 1475 1476 tar = tarfile.open(tmpname, mode="w", format=self.format, 1477 encoding="iso8859-1") 1478 try: 1479 tar.addfile(t) 1480 finally: 1481 tar.close() 1482 1483 tar = tarfile.open(tmpname, encoding="iso8859-1") 1484 try: 1485 t = tar.getmember("foo") 1486 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1487 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1488 1489 if self.format != tarfile.PAX_FORMAT: 1490 tar.close() 1491 tar = tarfile.open(tmpname, encoding="ascii") 1492 t = tar.getmember("foo") 1493 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1494 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1495 finally: 1496 tar.close() 1497 1498 1499class GNUUnicodeTest(UstarUnicodeTest): 1500 1501 format = tarfile.GNU_FORMAT 1502 1503 def test_bad_pax_header(self): 1504 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1505 # without a hdrcharset=BINARY header. 1506 for encoding, name in ( 1507 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1508 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1509 with tarfile.open(tarname, encoding=encoding, 1510 errors="surrogateescape") as tar: 1511 try: 1512 t = tar.getmember(name) 1513 except KeyError: 1514 self.fail("unable to read bad GNU tar pax header") 1515 1516 1517class PAXUnicodeTest(UstarUnicodeTest): 1518 1519 format = tarfile.PAX_FORMAT 1520 1521 # PAX_FORMAT ignores encoding in write mode. 1522 test_unicode_filename_error = None 1523 1524 def test_binary_header(self): 1525 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 
1526 for encoding, name in ( 1527 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1528 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1529 with tarfile.open(tarname, encoding=encoding, 1530 errors="surrogateescape") as tar: 1531 try: 1532 t = tar.getmember(name) 1533 except KeyError: 1534 self.fail("unable to read POSIX.1-2008 binary header") 1535 1536 1537class AppendTestBase: 1538 # Test append mode (cp. patch #1652681). 1539 1540 def setUp(self): 1541 self.tarname = tmpname 1542 if os.path.exists(self.tarname): 1543 os.remove(self.tarname) 1544 1545 def _create_testtar(self, mode="w:"): 1546 with tarfile.open(tarname, encoding="iso8859-1") as src: 1547 t = src.getmember("ustar/regtype") 1548 t.name = "foo" 1549 with src.extractfile(t) as f: 1550 with tarfile.open(self.tarname, mode) as tar: 1551 tar.addfile(t, f) 1552 1553 def test_append_compressed(self): 1554 self._create_testtar("w:" + self.suffix) 1555 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1556 1557class AppendTest(AppendTestBase, unittest.TestCase): 1558 test_append_compressed = None 1559 1560 def _add_testfile(self, fileobj=None): 1561 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1562 tar.addfile(tarfile.TarInfo("bar")) 1563 1564 def _test(self, names=["bar"], fileobj=None): 1565 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1566 self.assertEqual(tar.getnames(), names) 1567 1568 def test_non_existing(self): 1569 self._add_testfile() 1570 self._test() 1571 1572 def test_empty(self): 1573 tarfile.open(self.tarname, "w:").close() 1574 self._add_testfile() 1575 self._test() 1576 1577 def test_empty_fileobj(self): 1578 fobj = io.BytesIO(b"\0" * 1024) 1579 self._add_testfile(fobj) 1580 fobj.seek(0) 1581 self._test(fileobj=fobj) 1582 1583 def test_fileobj(self): 1584 self._create_testtar() 1585 with open(self.tarname, "rb") as fobj: 1586 data = fobj.read() 1587 fobj = io.BytesIO(data) 1588 self._add_testfile(fobj) 1589 fobj.seek(0) 1590 
self._test(names=["foo", "bar"], fileobj=fobj) 1591 1592 def test_existing(self): 1593 self._create_testtar() 1594 self._add_testfile() 1595 self._test(names=["foo", "bar"]) 1596 1597 # Append mode is supposed to fail if the tarfile to append to 1598 # does not end with a zero block. 1599 def _test_error(self, data): 1600 with open(self.tarname, "wb") as fobj: 1601 fobj.write(data) 1602 self.assertRaises(tarfile.ReadError, self._add_testfile) 1603 1604 def test_null(self): 1605 self._test_error(b"") 1606 1607 def test_incomplete(self): 1608 self._test_error(b"\0" * 13) 1609 1610 def test_premature_eof(self): 1611 data = tarfile.TarInfo("foo").tobuf() 1612 self._test_error(data) 1613 1614 def test_trailing_garbage(self): 1615 data = tarfile.TarInfo("foo").tobuf() 1616 self._test_error(data + b"\0" * 13) 1617 1618 def test_invalid(self): 1619 self._test_error(b"a" * 512) 1620 1621class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 1622 pass 1623 1624class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 1625 pass 1626 1627class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 1628 pass 1629 1630 1631class LimitsTest(unittest.TestCase): 1632 1633 def test_ustar_limits(self): 1634 # 100 char name 1635 tarinfo = tarfile.TarInfo("0123456789" * 10) 1636 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1637 1638 # 101 char name that cannot be stored 1639 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1640 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1641 1642 # 256 char name with a slash at pos 156 1643 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1644 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1645 1646 # 256 char name that cannot be stored 1647 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1648 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1649 1650 # 512 char name 1651 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1652 self.assertRaises(ValueError, tarinfo.tobuf, 
tarfile.USTAR_FORMAT) 1653 1654 # 512 char linkname 1655 tarinfo = tarfile.TarInfo("longlink") 1656 tarinfo.linkname = "123/" * 126 + "longname" 1657 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1658 1659 # uid > 8 digits 1660 tarinfo = tarfile.TarInfo("name") 1661 tarinfo.uid = 0o10000000 1662 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1663 1664 def test_gnu_limits(self): 1665 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1666 tarinfo.tobuf(tarfile.GNU_FORMAT) 1667 1668 tarinfo = tarfile.TarInfo("longlink") 1669 tarinfo.linkname = "123/" * 126 + "longname" 1670 tarinfo.tobuf(tarfile.GNU_FORMAT) 1671 1672 # uid >= 256 ** 7 1673 tarinfo = tarfile.TarInfo("name") 1674 tarinfo.uid = 0o4000000000000000000 1675 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1676 1677 def test_pax_limits(self): 1678 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1679 tarinfo.tobuf(tarfile.PAX_FORMAT) 1680 1681 tarinfo = tarfile.TarInfo("longlink") 1682 tarinfo.linkname = "123/" * 126 + "longname" 1683 tarinfo.tobuf(tarfile.PAX_FORMAT) 1684 1685 tarinfo = tarfile.TarInfo("name") 1686 tarinfo.uid = 0o4000000000000000000 1687 tarinfo.tobuf(tarfile.PAX_FORMAT) 1688 1689 1690class MiscTest(unittest.TestCase): 1691 1692 def test_char_fields(self): 1693 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 1694 b"foo\0\0\0\0\0") 1695 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 1696 b"foo") 1697 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 1698 "foo") 1699 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 1700 "foo") 1701 1702 def test_read_number_fields(self): 1703 # Issue 13158: Test if GNU tar specific base-256 number fields 1704 # are decoded correctly. 
1705 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 1706 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 1707 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 1708 0o10000000) 1709 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 1710 0xffffffff) 1711 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 1712 -1) 1713 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 1714 -100) 1715 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 1716 -0x100000000000000) 1717 1718 def test_write_number_fields(self): 1719 self.assertEqual(tarfile.itn(1), b"0000001\x00") 1720 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 1721 self.assertEqual(tarfile.itn(0o10000000), 1722 b"\x80\x00\x00\x00\x00\x20\x00\x00") 1723 self.assertEqual(tarfile.itn(0xffffffff), 1724 b"\x80\x00\x00\x00\xff\xff\xff\xff") 1725 self.assertEqual(tarfile.itn(-1), 1726 b"\xff\xff\xff\xff\xff\xff\xff\xff") 1727 self.assertEqual(tarfile.itn(-100), 1728 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 1729 self.assertEqual(tarfile.itn(-0x100000000000000), 1730 b"\xff\x00\x00\x00\x00\x00\x00\x00") 1731 1732 def test_number_field_limits(self): 1733 with self.assertRaises(ValueError): 1734 tarfile.itn(-1, 8, tarfile.USTAR_FORMAT) 1735 with self.assertRaises(ValueError): 1736 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 1737 with self.assertRaises(ValueError): 1738 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 1739 with self.assertRaises(ValueError): 1740 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 1741 1742 1743class ContextManagerTest(unittest.TestCase): 1744 1745 def test_basic(self): 1746 with tarfile.open(tarname) as tar: 1747 self.assertFalse(tar.closed, "closed inside runtime context") 1748 self.assertTrue(tar.closed, "context manager failed") 1749 1750 def test_closed(self): 1751 # The __enter__() method is supposed to raise IOError 1752 # if the TarFile object is already closed. 
1753 tar = tarfile.open(tarname) 1754 tar.close() 1755 with self.assertRaises(IOError): 1756 with tar: 1757 pass 1758 1759 def test_exception(self): 1760 # Test if the IOError exception is passed through properly. 1761 with self.assertRaises(Exception) as exc: 1762 with tarfile.open(tarname) as tar: 1763 raise IOError 1764 self.assertIsInstance(exc.exception, IOError, 1765 "wrong exception raised in context manager") 1766 self.assertTrue(tar.closed, "context manager failed") 1767 1768 def test_no_eof(self): 1769 # __exit__() must not write end-of-archive blocks if an 1770 # exception was raised. 1771 try: 1772 with tarfile.open(tmpname, "w") as tar: 1773 raise Exception 1774 except: 1775 pass 1776 self.assertEqual(os.path.getsize(tmpname), 0, 1777 "context manager wrote an end-of-archive block") 1778 self.assertTrue(tar.closed, "context manager failed") 1779 1780 def test_eof(self): 1781 # __exit__() must write end-of-archive blocks, i.e. call 1782 # TarFile.close() if there was no error. 1783 with tarfile.open(tmpname, "w"): 1784 pass 1785 self.assertNotEqual(os.path.getsize(tmpname), 0, 1786 "context manager wrote no end-of-archive block") 1787 1788 def test_fileobj(self): 1789 # Test that __exit__() did not close the external file 1790 # object. 1791 with open(tmpname, "wb") as fobj: 1792 try: 1793 with tarfile.open(fileobj=fobj, mode="w") as tar: 1794 raise Exception 1795 except: 1796 pass 1797 self.assertFalse(fobj.closed, "external file object was closed") 1798 self.assertTrue(tar.closed, "context manager failed") 1799 1800 1801@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 1802class LinkEmulationTest(ReadTest, unittest.TestCase): 1803 1804 # Test for issue #8741 regression. On platforms that do not support 1805 # symbolic or hard links tarfile tries to extract these types of members 1806 # as the regular files they point to. 
1807 def _test_link_extraction(self, name): 1808 self.tar.extract(name, TEMPDIR) 1809 with open(os.path.join(TEMPDIR, name), "rb") as f: 1810 data = f.read() 1811 self.assertEqual(md5sum(data), md5_regtype) 1812 1813 # See issues #1578269, #8879, and #17689 for some history on these skips 1814 @unittest.skipIf(hasattr(os.path, "islink"), 1815 "Skip emulation - has os.path.islink but not os.link") 1816 def test_hardlink_extraction1(self): 1817 self._test_link_extraction("ustar/lnktype") 1818 1819 @unittest.skipIf(hasattr(os.path, "islink"), 1820 "Skip emulation - has os.path.islink but not os.link") 1821 def test_hardlink_extraction2(self): 1822 self._test_link_extraction("./ustar/linktest2/lnktype") 1823 1824 @unittest.skipIf(hasattr(os, "symlink"), 1825 "Skip emulation if symlink exists") 1826 def test_symlink_extraction1(self): 1827 self._test_link_extraction("ustar/symtype") 1828 1829 @unittest.skipIf(hasattr(os, "symlink"), 1830 "Skip emulation if symlink exists") 1831 def test_symlink_extraction2(self): 1832 self._test_link_extraction("./ustar/linktest2/symtype") 1833 1834 1835class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 1836 # Issue5068: The _BZ2Proxy.read() method loops forever 1837 # on an empty or partial bzipped file. 
1838 1839 def _test_partial_input(self, mode): 1840 class MyBytesIO(io.BytesIO): 1841 hit_eof = False 1842 def read(self, n): 1843 if self.hit_eof: 1844 raise AssertionError("infinite loop detected in " 1845 "tarfile.open()") 1846 self.hit_eof = self.tell() == len(self.getvalue()) 1847 return super(MyBytesIO, self).read(n) 1848 def seek(self, *args): 1849 self.hit_eof = False 1850 return super(MyBytesIO, self).seek(*args) 1851 1852 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1853 for x in range(len(data) + 1): 1854 try: 1855 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 1856 except tarfile.ReadError: 1857 pass # we have no interest in ReadErrors 1858 1859 def test_partial_input(self): 1860 self._test_partial_input("r") 1861 1862 def test_partial_input_bz2(self): 1863 self._test_partial_input("r:bz2") 1864 1865 1866def setUpModule(): 1867 support.unlink(TEMPDIR) 1868 os.makedirs(TEMPDIR) 1869 1870 with open(tarname, "rb") as fobj: 1871 data = fobj.read() 1872 1873 # Create compressed tarfiles. 1874 for c in GzipTest, Bz2Test, LzmaTest: 1875 if c.open: 1876 support.unlink(c.tarname) 1877 with c.open(c.tarname, "wb") as tar: 1878 tar.write(data) 1879 1880def tearDownModule(): 1881 if os.path.exists(TEMPDIR): 1882 shutil.rmtree(TEMPDIR) 1883 1884if __name__ == "__main__": 1885 unittest.main() 1886