test_tarfile.py revision 0a9dd2f11db2a52fbc2cabaf0755aa33ad9372e5
1import sys 2import os 3import io 4import shutil 5import io 6from hashlib import md5 7import errno 8 9import unittest 10import tarfile 11 12from test import support 13 14# Check for our compression modules. 15try: 16 import gzip 17 gzip.GzipFile 18except (ImportError, AttributeError): 19 gzip = None 20try: 21 import bz2 22except ImportError: 23 bz2 = None 24try: 25 import lzma 26except ImportError: 27 lzma = None 28 29def md5sum(data): 30 return md5(data).hexdigest() 31 32TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 33tarname = support.findfile("testtar.tar") 34gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 35bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 36xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 37tmpname = os.path.join(TEMPDIR, "tmp.tar") 38 39md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 40md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 41 42 43class ReadTest(unittest.TestCase): 44 45 tarname = tarname 46 mode = "r:" 47 48 def setUp(self): 49 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 50 51 def tearDown(self): 52 self.tar.close() 53 54 55class UstarReadTest(ReadTest): 56 57 def test_fileobj_regular_file(self): 58 tarinfo = self.tar.getmember("ustar/regtype") 59 fobj = self.tar.extractfile(tarinfo) 60 try: 61 data = fobj.read() 62 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 63 "regular file extraction failed") 64 finally: 65 fobj.close() 66 67 def test_fileobj_readlines(self): 68 self.tar.extract("ustar/regtype", TEMPDIR) 69 tarinfo = self.tar.getmember("ustar/regtype") 70 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 71 lines1 = fobj1.readlines() 72 73 fobj = self.tar.extractfile(tarinfo) 74 try: 75 fobj2 = io.TextIOWrapper(fobj) 76 lines2 = fobj2.readlines() 77 self.assertTrue(lines1 == lines2, 78 "fileobj.readlines() failed") 79 self.assertTrue(len(lines2) == 114, 80 "fileobj.readlines() failed") 81 self.assertTrue(lines2[83] == 82 "I will gladly admit that Python is not the fastest running scripting language.\n", 83 "fileobj.readlines() failed") 84 finally: 85 fobj.close() 86 87 def test_fileobj_iter(self): 88 self.tar.extract("ustar/regtype", TEMPDIR) 89 tarinfo = self.tar.getmember("ustar/regtype") 90 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 91 lines1 = fobj1.readlines() 92 fobj2 = self.tar.extractfile(tarinfo) 93 try: 94 lines2 = list(io.TextIOWrapper(fobj2)) 95 self.assertTrue(lines1 == lines2, 96 "fileobj.__iter__() failed") 97 finally: 98 fobj2.close() 99 100 def test_fileobj_seek(self): 101 self.tar.extract("ustar/regtype", TEMPDIR) 102 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 103 data = fobj.read() 104 105 tarinfo = self.tar.getmember("ustar/regtype") 106 fobj = self.tar.extractfile(tarinfo) 107 108 text = fobj.read() 109 fobj.seek(0) 110 self.assertEqual(0, fobj.tell(), 111 "seek() to file's start failed") 112 fobj.seek(2048, 0) 113 self.assertEqual(2048, fobj.tell(), 114 "seek() to absolute position failed") 115 fobj.seek(-1024, 1) 116 self.assertEqual(1024, fobj.tell(), 117 "seek() to negative relative position failed") 118 fobj.seek(1024, 1) 119 self.assertEqual(2048, fobj.tell(), 120 "seek() to positive relative position failed") 121 s = fobj.read(10) 122 self.assertTrue(s == data[2048:2058], 123 "read() after seek failed") 124 fobj.seek(0, 2) 125 self.assertEqual(tarinfo.size, fobj.tell(), 126 "seek() to file's end failed") 127 self.assertTrue(fobj.read() == b"", 128 "read() at file's end did not return empty string") 129 fobj.seek(-tarinfo.size, 2) 130 self.assertEqual(0, fobj.tell(), 131 "relative seek() to file's end failed") 132 fobj.seek(512) 133 s1 = fobj.readlines() 134 fobj.seek(512) 135 s2 = fobj.readlines() 136 self.assertTrue(s1 == s2, 137 "readlines() after seek failed") 138 fobj.seek(0) 139 self.assertEqual(len(fobj.readline()), fobj.tell(), 140 "tell() after readline() failed") 141 fobj.seek(512) 142 self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), 143 "tell() after seek() and readline() failed") 144 fobj.seek(0) 145 line = fobj.readline() 146 self.assertEqual(fobj.read(), data[len(line):], 147 "read() after readline() failed") 148 fobj.close() 149 150 # Test if symbolic and hard links are resolved by extractfile(). The 151 # test link members each point to a regular member whose data is 152 # supposed to be exported. 153 def _test_fileobj_link(self, lnktype, regtype): 154 a = self.tar.extractfile(lnktype) 155 b = self.tar.extractfile(regtype) 156 try: 157 self.assertEqual(a.name, b.name) 158 finally: 159 a.close() 160 b.close() 161 162 def test_fileobj_link1(self): 163 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 164 165 def test_fileobj_link2(self): 166 self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype") 167 168 def test_fileobj_symlink1(self): 169 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 170 171 def test_fileobj_symlink2(self): 172 self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype") 173 174 175class CommonReadTest(ReadTest): 176 177 def test_empty_tarfile(self): 178 # Test for issue6123: Allow opening empty archives. 179 # This test checks if tarfile.open() is able to open an empty tar 180 # archive successfully. Note that an empty tar archive is not the 181 # same as an empty file! 182 with tarfile.open(tmpname, self.mode.replace("r", "w")): 183 pass 184 try: 185 tar = tarfile.open(tmpname, self.mode) 186 tar.getnames() 187 except tarfile.ReadError: 188 self.fail("tarfile.open() failed on empty archive") 189 else: 190 self.assertListEqual(tar.getmembers(), []) 191 finally: 192 tar.close() 193 194 def test_null_tarfile(self): 195 # Test for issue6123: Allow opening empty archives. 196 # This test guarantees that tarfile.open() does not treat an empty 197 # file as an empty tar archive. 198 with open(tmpname, "wb"): 199 pass 200 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 201 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 202 203 def test_ignore_zeros(self): 204 # Test TarFile's ignore_zeros option. 205 if self.mode.endswith(":gz"): 206 _open = gzip.GzipFile 207 elif self.mode.endswith(":bz2"): 208 _open = bz2.BZ2File 209 elif self.mode.endswith(":xz"): 210 _open = lzma.LZMAFile 211 else: 212 _open = io.FileIO 213 214 for char in (b'\0', b'a'): 215 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 216 # are ignored correctly. 217 with _open(tmpname, "w") as fobj: 218 fobj.write(char * 1024) 219 fobj.write(tarfile.TarInfo("foo").tobuf()) 220 221 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 222 try: 223 self.assertListEqual(tar.getnames(), ["foo"], 224 "ignore_zeros=True should have skipped the %r-blocks" % char) 225 finally: 226 tar.close() 227 228 229class MiscReadTest(CommonReadTest): 230 231 def test_no_name_argument(self): 232 if self.mode.endswith(("bz2", "xz")): 233 # BZ2File and LZMAFile have no name attribute. 234 self.skipTest("no name attribute") 235 236 with open(self.tarname, "rb") as fobj: 237 tar = tarfile.open(fileobj=fobj, mode=self.mode) 238 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 239 240 def test_no_name_attribute(self): 241 with open(self.tarname, "rb") as fobj: 242 data = fobj.read() 243 fobj = io.BytesIO(data) 244 self.assertRaises(AttributeError, getattr, fobj, "name") 245 tar = tarfile.open(fileobj=fobj, mode=self.mode) 246 self.assertEqual(tar.name, None) 247 248 def test_empty_name_attribute(self): 249 with open(self.tarname, "rb") as fobj: 250 data = fobj.read() 251 fobj = io.BytesIO(data) 252 fobj.name = "" 253 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 254 self.assertEqual(tar.name, None) 255 256 def test_fileobj_with_offset(self): 257 # Skip the first member and store values from the second member 258 # of the testtar. 259 tar = tarfile.open(self.tarname, mode=self.mode) 260 try: 261 tar.next() 262 t = tar.next() 263 name = t.name 264 offset = t.offset 265 f = tar.extractfile(t) 266 data = f.read() 267 f.close() 268 finally: 269 tar.close() 270 271 # Open the testtar and seek to the offset of the second member. 272 if self.mode.endswith(":gz"): 273 _open = gzip.GzipFile 274 elif self.mode.endswith(":bz2"): 275 _open = bz2.BZ2File 276 elif self.mode.endswith(":xz"): 277 _open = lzma.LZMAFile 278 else: 279 _open = io.FileIO 280 281 with _open(self.tarname) as fobj: 282 fobj.seek(offset) 283 284 # Test if the tarfile starts with the second member. 285 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 286 t = tar.next() 287 self.assertEqual(t.name, name) 288 # Read to the end of fileobj and test if seeking back to the 289 # beginning works. 290 tar.getmembers() 291 self.assertEqual(tar.extractfile(t).read(), data, 292 "seek back did not work") 293 tar.close() 294 295 def test_fail_comp(self): 296 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 297 if self.mode == "r:": 298 return 299 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 300 with open(tarname, "rb") as fobj: 301 self.assertRaises(tarfile.ReadError, tarfile.open, 302 fileobj=fobj, mode=self.mode) 303 304 def test_v7_dirtype(self): 305 # Test old style dirtype member (bug #1336623): 306 # Old V7 tars create directory members using an AREGTYPE 307 # header with a "/" appended to the filename field. 308 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 309 self.assertTrue(tarinfo.type == tarfile.DIRTYPE, 310 "v7 dirtype failed") 311 312 def test_xstar_type(self): 313 # The xstar format stores extra atime and ctime fields inside the 314 # space reserved for the prefix field. The prefix field must be 315 # ignored in this case, otherwise it will mess up the name. 316 try: 317 self.tar.getmember("misc/regtype-xstar") 318 except KeyError: 319 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 320 321 def test_check_members(self): 322 for tarinfo in self.tar: 323 self.assertTrue(int(tarinfo.mtime) == 0o7606136617, 324 "wrong mtime for %s" % tarinfo.name) 325 if not tarinfo.name.startswith("ustar/"): 326 continue 327 self.assertTrue(tarinfo.uname == "tarfile", 328 "wrong uname for %s" % tarinfo.name) 329 330 def test_find_members(self): 331 self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", 332 "could not find all members") 333 334 @unittest.skipUnless(hasattr(os, "link"), 335 "Missing hardlink implementation") 336 @support.skip_unless_symlink 337 def test_extract_hardlink(self): 338 # Test hardlink extraction (e.g. bug #857297). 339 tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") 340 341 try: 342 tar.extract("ustar/regtype", TEMPDIR) 343 try: 344 tar.extract("ustar/lnktype", TEMPDIR) 345 except EnvironmentError as e: 346 if e.errno == errno.ENOENT: 347 self.fail("hardlink not extracted properly") 348 349 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 350 data = f.read() 351 self.assertEqual(md5sum(data), md5_regtype) 352 353 try: 354 tar.extract("ustar/symtype", TEMPDIR) 355 except EnvironmentError as e: 356 if e.errno == errno.ENOENT: 357 self.fail("symlink not extracted properly") 358 359 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 360 data = f.read() 361 self.assertEqual(md5sum(data), md5_regtype) 362 finally: 363 tar.close() 364 365 def test_extractall(self): 366 # Test if extractall() correctly restores directory permissions 367 # and times (see issue1735). 368 tar = tarfile.open(tarname, encoding="iso8859-1") 369 DIR = os.path.join(TEMPDIR, "extractall") 370 os.mkdir(DIR) 371 try: 372 directories = [t for t in tar if t.isdir()] 373 tar.extractall(DIR, directories) 374 for tarinfo in directories: 375 path = os.path.join(DIR, tarinfo.name) 376 if sys.platform != "win32": 377 # Win32 has no support for fine grained permissions. 378 self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777) 379 def format_mtime(mtime): 380 if isinstance(mtime, float): 381 return "{} ({})".format(mtime, mtime.hex()) 382 else: 383 return "{!r} (int)".format(mtime) 384 file_mtime = os.path.getmtime(path) 385 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 386 format_mtime(tarinfo.mtime), 387 format_mtime(file_mtime), 388 path) 389 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 390 finally: 391 tar.close() 392 shutil.rmtree(DIR) 393 394 def test_extract_directory(self): 395 dirtype = "ustar/dirtype" 396 DIR = os.path.join(TEMPDIR, "extractdir") 397 os.mkdir(DIR) 398 try: 399 with tarfile.open(tarname, encoding="iso8859-1") as tar: 400 tarinfo = tar.getmember(dirtype) 401 tar.extract(tarinfo, path=DIR) 402 extracted = os.path.join(DIR, dirtype) 403 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 404 if sys.platform != "win32": 405 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 406 finally: 407 shutil.rmtree(DIR) 408 409 def test_init_close_fobj(self): 410 # Issue #7341: Close the internal file object in the TarFile 411 # constructor in case of an error. For the test we rely on 412 # the fact that opening an empty file raises a ReadError. 413 empty = os.path.join(TEMPDIR, "empty") 414 with open(empty, "wb") as fobj: 415 fobj.write(b"") 416 417 try: 418 tar = object.__new__(tarfile.TarFile) 419 try: 420 tar.__init__(empty) 421 except tarfile.ReadError: 422 self.assertTrue(tar.fileobj.closed) 423 else: 424 self.fail("ReadError not raised") 425 finally: 426 support.unlink(empty) 427 428 429class StreamReadTest(CommonReadTest): 430 431 mode="r|" 432 433 def test_read_through(self): 434 # Issue #11224: A poorly designed _FileInFile.read() method 435 # caused seeking errors with stream tar files. 436 for tarinfo in self.tar: 437 if not tarinfo.isreg(): 438 continue 439 fobj = self.tar.extractfile(tarinfo) 440 while True: 441 try: 442 buf = fobj.read(512) 443 except tarfile.StreamError: 444 self.fail("simple read-through using TarFile.extractfile() failed") 445 if not buf: 446 break 447 fobj.close() 448 449 def test_fileobj_regular_file(self): 450 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 451 fobj = self.tar.extractfile(tarinfo) 452 data = fobj.read() 453 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 454 "regular file extraction failed") 455 456 def test_provoke_stream_error(self): 457 tarinfos = self.tar.getmembers() 458 f = self.tar.extractfile(tarinfos[0]) # read the first member 459 self.assertRaises(tarfile.StreamError, f.read) 460 461 def test_compare_members(self): 462 tar1 = tarfile.open(tarname, encoding="iso8859-1") 463 try: 464 tar2 = self.tar 465 466 while True: 467 t1 = tar1.next() 468 t2 = tar2.next() 469 if t1 is None: 470 break 471 self.assertTrue(t2 is not None, "stream.next() failed.") 472 473 if t2.islnk() or t2.issym(): 474 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) 475 continue 476 477 v1 = tar1.extractfile(t1) 478 v2 = tar2.extractfile(t2) 479 if v1 is None: 480 continue 481 self.assertTrue(v2 is not None, "stream.extractfile() failed") 482 self.assertEqual(v1.read(), v2.read(), "stream extraction failed") 483 finally: 484 tar1.close() 485 486 487class DetectReadTest(unittest.TestCase): 488 489 def _testfunc_file(self, name, mode): 490 try: 491 tar = tarfile.open(name, mode) 492 except tarfile.ReadError as e: 493 self.fail() 494 else: 495 tar.close() 496 497 def _testfunc_fileobj(self, name, mode): 498 try: 499 with open(name, "rb") as f: 500 tar = tarfile.open(name, mode, fileobj=f) 501 except tarfile.ReadError as e: 502 self.fail() 503 else: 504 tar.close() 505 506 def _test_modes(self, testfunc): 507 testfunc(tarname, "r") 508 testfunc(tarname, "r:") 509 testfunc(tarname, "r:*") 510 testfunc(tarname, "r|") 511 testfunc(tarname, "r|*") 512 513 if gzip: 514 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") 515 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") 516 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") 517 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") 518 519 testfunc(gzipname, "r") 520 testfunc(gzipname, "r:*") 521 testfunc(gzipname, "r:gz") 522 testfunc(gzipname, "r|*") 523 testfunc(gzipname, "r|gz") 524 525 if bz2: 526 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") 527 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") 528 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") 529 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") 530 531 testfunc(bz2name, "r") 532 testfunc(bz2name, "r:*") 533 testfunc(bz2name, "r:bz2") 534 testfunc(bz2name, "r|*") 535 testfunc(bz2name, "r|bz2") 536 537 if lzma: 538 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:xz") 539 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|xz") 540 self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r:") 541 self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r|") 542 543 testfunc(xzname, "r") 544 testfunc(xzname, "r:*") 545 testfunc(xzname, "r:xz") 546 testfunc(xzname, "r|*") 547 testfunc(xzname, "r|xz") 548 549 def test_detect_file(self): 550 self._test_modes(self._testfunc_file) 551 552 def test_detect_fileobj(self): 553 self._test_modes(self._testfunc_fileobj) 554 555 def test_detect_stream_bz2(self): 556 # Originally, tarfile's stream detection looked for the string 557 # "BZh91" at the start of the file. This is incorrect because 558 # the '9' represents the blocksize (900kB). If the file was 559 # compressed using another blocksize autodetection fails. 560 if not bz2: 561 return 562 563 with open(tarname, "rb") as fobj: 564 data = fobj.read() 565 566 # Compress with blocksize 100kB, the file starts with "BZh11". 567 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 568 fobj.write(data) 569 570 self._testfunc_file(tmpname, "r|*") 571 572 573class MemberReadTest(ReadTest): 574 575 def _test_member(self, tarinfo, chksum=None, **kwargs): 576 if chksum is not None: 577 self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, 578 "wrong md5sum for %s" % tarinfo.name) 579 580 kwargs["mtime"] = 0o7606136617 581 kwargs["uid"] = 1000 582 kwargs["gid"] = 100 583 if "old-v7" not in tarinfo.name: 584 # V7 tar can't handle alphabetic owners. 585 kwargs["uname"] = "tarfile" 586 kwargs["gname"] = "tarfile" 587 for k, v in kwargs.items(): 588 self.assertTrue(getattr(tarinfo, k) == v, 589 "wrong value in %s field of %s" % (k, tarinfo.name)) 590 591 def test_find_regtype(self): 592 tarinfo = self.tar.getmember("ustar/regtype") 593 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 594 595 def test_find_conttype(self): 596 tarinfo = self.tar.getmember("ustar/conttype") 597 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 598 599 def test_find_dirtype(self): 600 tarinfo = self.tar.getmember("ustar/dirtype") 601 self._test_member(tarinfo, size=0) 602 603 def test_find_dirtype_with_size(self): 604 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 605 self._test_member(tarinfo, size=255) 606 607 def test_find_lnktype(self): 608 tarinfo = self.tar.getmember("ustar/lnktype") 609 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 610 611 def test_find_symtype(self): 612 tarinfo = self.tar.getmember("ustar/symtype") 613 self._test_member(tarinfo, size=0, linkname="regtype") 614 615 def test_find_blktype(self): 616 tarinfo = self.tar.getmember("ustar/blktype") 617 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 618 619 def test_find_chrtype(self): 620 tarinfo = self.tar.getmember("ustar/chrtype") 621 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 622 623 def test_find_fifotype(self): 624 tarinfo = self.tar.getmember("ustar/fifotype") 625 self._test_member(tarinfo, size=0) 626 627 def test_find_sparse(self): 628 tarinfo = self.tar.getmember("ustar/sparse") 629 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 630 631 def test_find_gnusparse(self): 632 tarinfo = self.tar.getmember("gnu/sparse") 633 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 634 635 def test_find_gnusparse_00(self): 636 tarinfo = self.tar.getmember("gnu/sparse-0.0") 637 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 638 639 def test_find_gnusparse_01(self): 640 tarinfo = self.tar.getmember("gnu/sparse-0.1") 641 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 642 643 def test_find_gnusparse_10(self): 644 tarinfo = self.tar.getmember("gnu/sparse-1.0") 645 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 646 647 def test_find_umlauts(self): 648 tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 649 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 650 651 def test_find_ustar_longname(self): 652 name = "ustar/" + "12345/" * 39 + "1234567/longname" 653 self.assertIn(name, self.tar.getnames()) 654 655 def test_find_regtype_oldv7(self): 656 tarinfo = self.tar.getmember("misc/regtype-old-v7") 657 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 658 659 def test_find_pax_umlauts(self): 660 self.tar.close() 661 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 662 tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 663 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 664 665 666class LongnameTest(ReadTest): 667 668 def test_read_longname(self): 669 # Test reading of longname (bug #1471427). 670 longname = self.subdir + "/" + "123/" * 125 + "longname" 671 try: 672 tarinfo = self.tar.getmember(longname) 673 except KeyError: 674 self.fail("longname not found") 675 self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") 676 677 def test_read_longlink(self): 678 longname = self.subdir + "/" + "123/" * 125 + "longname" 679 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 680 try: 681 tarinfo = self.tar.getmember(longlink) 682 except KeyError: 683 self.fail("longlink not found") 684 self.assertTrue(tarinfo.linkname == longname, "linkname wrong") 685 686 def test_truncated_longname(self): 687 longname = self.subdir + "/" + "123/" * 125 + "longname" 688 tarinfo = self.tar.getmember(longname) 689 offset = tarinfo.offset 690 self.tar.fileobj.seek(offset) 691 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 692 self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) 693 694 def test_header_offset(self): 695 # Test if the start offset of the TarInfo object includes 696 # the preceding extended header. 697 longname = self.subdir + "/" + "123/" * 125 + "longname" 698 offset = self.tar.getmember(longname).offset 699 with open(tarname, "rb") as fobj: 700 fobj.seek(offset) 701 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), "iso8859-1", "strict") 702 self.assertEqual(tarinfo.type, self.longnametype) 703 704 705class GNUReadTest(LongnameTest): 706 707 subdir = "gnu" 708 longnametype = tarfile.GNUTYPE_LONGNAME 709 710 # Since 3.2 tarfile is supposed to accurately restore sparse members and 711 # produce files with holes. This is what we actually want to test here. 712 # Unfortunately, not all platforms/filesystems support sparse files, and 713 # even on platforms that do it is non-trivial to make reliable assertions 714 # about holes in files. Therefore, we first do one basic test which works 715 # an all platforms, and after that a test that will work only on 716 # platforms/filesystems that prove to support sparse files. 717 def _test_sparse_file(self, name): 718 self.tar.extract(name, TEMPDIR) 719 filename = os.path.join(TEMPDIR, name) 720 with open(filename, "rb") as fobj: 721 data = fobj.read() 722 self.assertEqual(md5sum(data), md5_sparse, 723 "wrong md5sum for %s" % name) 724 725 if self._fs_supports_holes(): 726 s = os.stat(filename) 727 self.assertTrue(s.st_blocks * 512 < s.st_size) 728 729 def test_sparse_file_old(self): 730 self._test_sparse_file("gnu/sparse") 731 732 def test_sparse_file_00(self): 733 self._test_sparse_file("gnu/sparse-0.0") 734 735 def test_sparse_file_01(self): 736 self._test_sparse_file("gnu/sparse-0.1") 737 738 def test_sparse_file_10(self): 739 self._test_sparse_file("gnu/sparse-1.0") 740 741 @staticmethod 742 def _fs_supports_holes(): 743 # Return True if the platform knows the st_blocks stat attribute and 744 # uses st_blocks units of 512 bytes, and if the filesystem is able to 745 # store holes in files. 746 if sys.platform.startswith("linux"): 747 # Linux evidentially has 512 byte st_blocks units. 748 name = os.path.join(TEMPDIR, "sparse-test") 749 with open(name, "wb") as fobj: 750 fobj.seek(4096) 751 fobj.truncate() 752 s = os.stat(name) 753 os.remove(name) 754 return s.st_blocks == 0 755 else: 756 return False 757 758 759class PaxReadTest(LongnameTest): 760 761 subdir = "pax" 762 longnametype = tarfile.XHDTYPE 763 764 def test_pax_global_headers(self): 765 tar = tarfile.open(tarname, encoding="iso8859-1") 766 try: 767 tarinfo = tar.getmember("pax/regtype1") 768 self.assertEqual(tarinfo.uname, "foo") 769 self.assertEqual(tarinfo.gname, "bar") 770 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 771 772 tarinfo = tar.getmember("pax/regtype2") 773 self.assertEqual(tarinfo.uname, "") 774 self.assertEqual(tarinfo.gname, "bar") 775 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 776 777 tarinfo = tar.getmember("pax/regtype3") 778 self.assertEqual(tarinfo.uname, "tarfile") 779 self.assertEqual(tarinfo.gname, "tarfile") 780 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 781 finally: 782 tar.close() 783 784 def test_pax_number_fields(self): 785 # All following number fields are read from the pax header. 786 tar = tarfile.open(tarname, encoding="iso8859-1") 787 try: 788 tarinfo = tar.getmember("pax/regtype4") 789 self.assertEqual(tarinfo.size, 7011) 790 self.assertEqual(tarinfo.uid, 123) 791 self.assertEqual(tarinfo.gid, 123) 792 self.assertEqual(tarinfo.mtime, 1041808783.0) 793 self.assertEqual(type(tarinfo.mtime), float) 794 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 795 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 796 finally: 797 tar.close() 798 799 800class WriteTestBase(unittest.TestCase): 801 # Put all write tests in here that are supposed to be tested 802 # in all possible mode combinations. 803 804 def test_fileobj_no_close(self): 805 fobj = io.BytesIO() 806 tar = tarfile.open(fileobj=fobj, mode=self.mode) 807 tar.addfile(tarfile.TarInfo("foo")) 808 tar.close() 809 self.assertTrue(fobj.closed is False, "external fileobjs must never closed") 810 811 812class WriteTest(WriteTestBase): 813 814 mode = "w:" 815 816 def test_100_char_name(self): 817 # The name field in a tar header stores strings of at most 100 chars. 818 # If a string is shorter than 100 chars it has to be padded with '\0', 819 # which implies that a string of exactly 100 chars is stored without 820 # a trailing '\0'. 821 name = "0123456789" * 10 822 tar = tarfile.open(tmpname, self.mode) 823 try: 824 t = tarfile.TarInfo(name) 825 tar.addfile(t) 826 finally: 827 tar.close() 828 829 tar = tarfile.open(tmpname) 830 try: 831 self.assertTrue(tar.getnames()[0] == name, 832 "failed to store 100 char filename") 833 finally: 834 tar.close() 835 836 def test_tar_size(self): 837 # Test for bug #1013882. 838 tar = tarfile.open(tmpname, self.mode) 839 try: 840 path = os.path.join(TEMPDIR, "file") 841 with open(path, "wb") as fobj: 842 fobj.write(b"aaa") 843 tar.add(path) 844 finally: 845 tar.close() 846 self.assertTrue(os.path.getsize(tmpname) > 0, 847 "tarfile is empty") 848 849 # The test_*_size tests test for bug #1167128. 850 def test_file_size(self): 851 tar = tarfile.open(tmpname, self.mode) 852 try: 853 path = os.path.join(TEMPDIR, "file") 854 with open(path, "wb"): 855 pass 856 tarinfo = tar.gettarinfo(path) 857 self.assertEqual(tarinfo.size, 0) 858 859 with open(path, "wb") as fobj: 860 fobj.write(b"aaa") 861 tarinfo = tar.gettarinfo(path) 862 self.assertEqual(tarinfo.size, 3) 863 finally: 864 tar.close() 865 866 def test_directory_size(self): 867 path = os.path.join(TEMPDIR, "directory") 868 os.mkdir(path) 869 try: 870 tar = tarfile.open(tmpname, self.mode) 871 try: 872 tarinfo = tar.gettarinfo(path) 873 self.assertEqual(tarinfo.size, 0) 874 finally: 875 tar.close() 876 finally: 877 os.rmdir(path) 878 879 def test_link_size(self): 880 if hasattr(os, "link"): 881 link = os.path.join(TEMPDIR, "link") 882 target = os.path.join(TEMPDIR, "link_target") 883 with open(target, "wb") as fobj: 884 fobj.write(b"aaa") 885 os.link(target, link) 886 try: 887 tar = tarfile.open(tmpname, self.mode) 888 try: 889 # Record the link target in the inodes list. 890 tar.gettarinfo(target) 891 tarinfo = tar.gettarinfo(link) 892 self.assertEqual(tarinfo.size, 0) 893 finally: 894 tar.close() 895 finally: 896 os.remove(target) 897 os.remove(link) 898 899 @support.skip_unless_symlink 900 def test_symlink_size(self): 901 path = os.path.join(TEMPDIR, "symlink") 902 os.symlink("link_target", path) 903 try: 904 tar = tarfile.open(tmpname, self.mode) 905 try: 906 tarinfo = tar.gettarinfo(path) 907 self.assertEqual(tarinfo.size, 0) 908 finally: 909 tar.close() 910 finally: 911 os.remove(path) 912 913 def test_add_self(self): 914 # Test for #1257255. 915 dstname = os.path.abspath(tmpname) 916 tar = tarfile.open(tmpname, self.mode) 917 try: 918 self.assertTrue(tar.name == dstname, "archive name must be absolute") 919 tar.add(dstname) 920 self.assertTrue(tar.getnames() == [], "added the archive to itself") 921 922 cwd = os.getcwd() 923 os.chdir(TEMPDIR) 924 tar.add(dstname) 925 os.chdir(cwd) 926 self.assertTrue(tar.getnames() == [], "added the archive to itself") 927 finally: 928 tar.close() 929 930 def test_exclude(self): 931 tempdir = os.path.join(TEMPDIR, "exclude") 932 os.mkdir(tempdir) 933 try: 934 for name in ("foo", "bar", "baz"): 935 name = os.path.join(tempdir, name) 936 support.create_empty_file(name) 937 938 exclude = os.path.isfile 939 940 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 941 try: 942 with support.check_warnings(("use the filter argument", 943 DeprecationWarning)): 944 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 945 finally: 946 tar.close() 947 948 tar = tarfile.open(tmpname, "r") 949 try: 950 self.assertEqual(len(tar.getmembers()), 1) 951 self.assertEqual(tar.getnames()[0], "empty_dir") 952 finally: 953 tar.close() 954 finally: 955 shutil.rmtree(tempdir) 956 957 def test_filter(self): 958 tempdir = os.path.join(TEMPDIR, "filter") 959 os.mkdir(tempdir) 960 try: 961 for name in ("foo", "bar", "baz"): 962 name = os.path.join(tempdir, name) 963 support.create_empty_file(name) 964 965 def filter(tarinfo): 966 if os.path.basename(tarinfo.name) == "bar": 967 return 968 tarinfo.uid = 123 969 tarinfo.uname = "foo" 970 return tarinfo 971 972 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 973 try: 974 tar.add(tempdir, arcname="empty_dir", filter=filter) 975 finally: 976 tar.close() 977 978 # Verify that filter is a keyword-only argument 979 with self.assertRaises(TypeError): 980 tar.add(tempdir, "empty_dir", True, None, filter) 981 982 tar = tarfile.open(tmpname, "r") 983 try: 984 for tarinfo in tar: 985 self.assertEqual(tarinfo.uid, 123) 986 self.assertEqual(tarinfo.uname, "foo") 987 self.assertEqual(len(tar.getmembers()), 3) 988 finally: 989 tar.close() 990 finally: 991 shutil.rmtree(tempdir) 992 993 # Guarantee that stored pathnames are not modified. Don't 994 # remove ./ or ../ or double slashes. Still make absolute 995 # pathnames relative. 996 # For details see bug #6054. 997 def _test_pathname(self, path, cmp_path=None, dir=False): 998 # Create a tarfile with an empty member named path 999 # and compare the stored name with the original. 1000 foo = os.path.join(TEMPDIR, "foo") 1001 if not dir: 1002 support.create_empty_file(foo) 1003 else: 1004 os.mkdir(foo) 1005 1006 tar = tarfile.open(tmpname, self.mode) 1007 try: 1008 tar.add(foo, arcname=path) 1009 finally: 1010 tar.close() 1011 1012 tar = tarfile.open(tmpname, "r") 1013 try: 1014 t = tar.next() 1015 finally: 1016 tar.close() 1017 1018 if not dir: 1019 os.remove(foo) 1020 else: 1021 os.rmdir(foo) 1022 1023 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1024 1025 1026 @support.skip_unless_symlink 1027 def test_extractall_symlinks(self): 1028 # Test if extractall works properly when tarfile contains symlinks 1029 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1030 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1031 os.mkdir(tempdir) 1032 try: 1033 source_file = os.path.join(tempdir,'source') 1034 target_file = os.path.join(tempdir,'symlink') 1035 with open(source_file,'w') as f: 1036 f.write('something\n') 1037 os.symlink(source_file, target_file) 1038 tar = tarfile.open(temparchive,'w') 1039 tar.add(source_file) 1040 tar.add(target_file) 1041 tar.close() 1042 # Let's extract it to the location which contains the symlink 1043 tar = tarfile.open(temparchive,'r') 1044 # this should not raise OSError: [Errno 17] File exists 1045 try: 1046 tar.extractall(path=tempdir) 1047 except OSError: 1048 self.fail("extractall failed with symlinked files") 1049 finally: 1050 tar.close() 1051 finally: 1052 os.unlink(temparchive) 1053 shutil.rmtree(tempdir) 1054 1055 def test_pathnames(self): 1056 self._test_pathname("foo") 1057 self._test_pathname(os.path.join("foo", ".", "bar")) 1058 self._test_pathname(os.path.join("foo", "..", "bar")) 1059 self._test_pathname(os.path.join(".", "foo")) 1060 self._test_pathname(os.path.join(".", "foo", ".")) 1061 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1062 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1063 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1064 self._test_pathname(os.path.join("..", "foo")) 1065 self._test_pathname(os.path.join("..", "foo", "..")) 1066 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1067 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1068 1069 self._test_pathname("foo" + os.sep + os.sep + "bar") 1070 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1071 1072 def test_abs_pathnames(self): 1073 if sys.platform == "win32": 1074 self._test_pathname("C:\\foo", "foo") 1075 else: 1076 self._test_pathname("/foo", "foo") 1077 self._test_pathname("///foo", "foo") 1078 1079 def test_cwd(self): 1080 # Test adding the current working directory. 1081 cwd = os.getcwd() 1082 os.chdir(TEMPDIR) 1083 try: 1084 tar = tarfile.open(tmpname, self.mode) 1085 try: 1086 tar.add(".") 1087 finally: 1088 tar.close() 1089 1090 tar = tarfile.open(tmpname, "r") 1091 try: 1092 for t in tar: 1093 self.assertTrue(t.name == "." or t.name.startswith("./")) 1094 finally: 1095 tar.close() 1096 finally: 1097 os.chdir(cwd) 1098 1099 1100class StreamWriteTest(WriteTestBase): 1101 1102 mode = "w|" 1103 1104 def test_stream_padding(self): 1105 # Test for bug #1543303. 1106 tar = tarfile.open(tmpname, self.mode) 1107 tar.close() 1108 1109 if self.mode.endswith("gz"): 1110 with gzip.GzipFile(tmpname) as fobj: 1111 data = fobj.read() 1112 elif self.mode.endswith("bz2"): 1113 dec = bz2.BZ2Decompressor() 1114 with open(tmpname, "rb") as fobj: 1115 data = fobj.read() 1116 data = dec.decompress(data) 1117 self.assertTrue(len(dec.unused_data) == 0, 1118 "found trailing data") 1119 elif self.mode.endswith("xz"): 1120 with lzma.LZMAFile(tmpname) as fobj: 1121 data = fobj.read() 1122 else: 1123 with open(tmpname, "rb") as fobj: 1124 data = fobj.read() 1125 1126 self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE, 1127 "incorrect zero padding") 1128 1129 def test_file_mode(self): 1130 # Test for issue #8464: Create files with correct 1131 # permissions. 1132 if sys.platform == "win32" or not hasattr(os, "umask"): 1133 return 1134 1135 if os.path.exists(tmpname): 1136 os.remove(tmpname) 1137 1138 original_umask = os.umask(0o022) 1139 try: 1140 tar = tarfile.open(tmpname, self.mode) 1141 tar.close() 1142 mode = os.stat(tmpname).st_mode & 0o777 1143 self.assertEqual(mode, 0o644, "wrong file permissions") 1144 finally: 1145 os.umask(original_umask) 1146 1147 1148class GNUWriteTest(unittest.TestCase): 1149 # This testcase checks for correct creation of GNU Longname 1150 # and Longlink extended headers (cp. bug #812325). 1151 1152 def _length(self, s): 1153 blocks, remainder = divmod(len(s) + 1, 512) 1154 if remainder: 1155 blocks += 1 1156 return blocks * 512 1157 1158 def _calc_size(self, name, link=None): 1159 # Initial tar header 1160 count = 512 1161 1162 if len(name) > tarfile.LENGTH_NAME: 1163 # GNU longname extended header + longname 1164 count += 512 1165 count += self._length(name) 1166 if link is not None and len(link) > tarfile.LENGTH_LINK: 1167 # GNU longlink extended header + longlink 1168 count += 512 1169 count += self._length(link) 1170 return count 1171 1172 def _test(self, name, link=None): 1173 tarinfo = tarfile.TarInfo(name) 1174 if link: 1175 tarinfo.linkname = link 1176 tarinfo.type = tarfile.LNKTYPE 1177 1178 tar = tarfile.open(tmpname, "w") 1179 try: 1180 tar.format = tarfile.GNU_FORMAT 1181 tar.addfile(tarinfo) 1182 1183 v1 = self._calc_size(name, link) 1184 v2 = tar.offset 1185 self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") 1186 finally: 1187 tar.close() 1188 1189 tar = tarfile.open(tmpname) 1190 try: 1191 member = tar.next() 1192 self.assertIsNotNone(member, 1193 "unable to read longname member") 1194 self.assertEqual(tarinfo.name, member.name, 1195 "unable to read longname member") 1196 self.assertEqual(tarinfo.linkname, member.linkname, 1197 "unable to read longname member") 1198 finally: 1199 tar.close() 1200 1201 def test_longname_1023(self): 1202 self._test(("longnam/" * 127) + "longnam") 1203 1204 def test_longname_1024(self): 1205 self._test(("longnam/" * 127) + "longname") 1206 1207 def test_longname_1025(self): 1208 self._test(("longnam/" * 127) + "longname_") 1209 1210 def test_longlink_1023(self): 1211 self._test("name", ("longlnk/" * 127) + "longlnk") 1212 1213 def test_longlink_1024(self): 1214 self._test("name", ("longlnk/" * 127) + "longlink") 1215 1216 def test_longlink_1025(self): 1217 self._test("name", ("longlnk/" * 127) + "longlink_") 1218 1219 def test_longnamelink_1023(self): 1220 self._test(("longnam/" * 127) + "longnam", 1221 ("longlnk/" * 127) + "longlnk") 1222 1223 def test_longnamelink_1024(self): 1224 self._test(("longnam/" * 127) + "longname", 1225 ("longlnk/" * 127) + "longlink") 1226 1227 def test_longnamelink_1025(self): 1228 self._test(("longnam/" * 127) + "longname_", 1229 ("longlnk/" * 127) + "longlink_") 1230 1231 1232class HardlinkTest(unittest.TestCase): 1233 # Test the creation of LNKTYPE (hardlink) members in an archive. 1234 1235 def setUp(self): 1236 self.foo = os.path.join(TEMPDIR, "foo") 1237 self.bar = os.path.join(TEMPDIR, "bar") 1238 1239 with open(self.foo, "wb") as fobj: 1240 fobj.write(b"foo") 1241 1242 os.link(self.foo, self.bar) 1243 1244 self.tar = tarfile.open(tmpname, "w") 1245 self.tar.add(self.foo) 1246 1247 def tearDown(self): 1248 self.tar.close() 1249 support.unlink(self.foo) 1250 support.unlink(self.bar) 1251 1252 def test_add_twice(self): 1253 # The same name will be added as a REGTYPE every 1254 # time regardless of st_nlink. 1255 tarinfo = self.tar.gettarinfo(self.foo) 1256 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1257 "add file as regular failed") 1258 1259 def test_add_hardlink(self): 1260 tarinfo = self.tar.gettarinfo(self.bar) 1261 self.assertTrue(tarinfo.type == tarfile.LNKTYPE, 1262 "add file as hardlink failed") 1263 1264 def test_dereference_hardlink(self): 1265 self.tar.dereference = True 1266 tarinfo = self.tar.gettarinfo(self.bar) 1267 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1268 "dereferencing hardlink failed") 1269 1270 1271class PaxWriteTest(GNUWriteTest): 1272 1273 def _test(self, name, link=None): 1274 # See GNUWriteTest. 1275 tarinfo = tarfile.TarInfo(name) 1276 if link: 1277 tarinfo.linkname = link 1278 tarinfo.type = tarfile.LNKTYPE 1279 1280 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1281 try: 1282 tar.addfile(tarinfo) 1283 finally: 1284 tar.close() 1285 1286 tar = tarfile.open(tmpname) 1287 try: 1288 if link: 1289 l = tar.getmembers()[0].linkname 1290 self.assertTrue(link == l, "PAX longlink creation failed") 1291 else: 1292 n = tar.getmembers()[0].name 1293 self.assertTrue(name == n, "PAX longname creation failed") 1294 finally: 1295 tar.close() 1296 1297 def test_pax_global_header(self): 1298 pax_headers = { 1299 "foo": "bar", 1300 "uid": "0", 1301 "mtime": "1.23", 1302 "test": "\xe4\xf6\xfc", 1303 "\xe4\xf6\xfc": "test"} 1304 1305 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1306 pax_headers=pax_headers) 1307 try: 1308 tar.addfile(tarfile.TarInfo("test")) 1309 finally: 1310 tar.close() 1311 1312 # Test if the global header was written correctly. 1313 tar = tarfile.open(tmpname, encoding="iso8859-1") 1314 try: 1315 self.assertEqual(tar.pax_headers, pax_headers) 1316 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1317 # Test if all the fields are strings. 1318 for key, val in tar.pax_headers.items(): 1319 self.assertTrue(type(key) is not bytes) 1320 self.assertTrue(type(val) is not bytes) 1321 if key in tarfile.PAX_NUMBER_FIELDS: 1322 try: 1323 tarfile.PAX_NUMBER_FIELDS[key](val) 1324 except (TypeError, ValueError): 1325 self.fail("unable to convert pax header field") 1326 finally: 1327 tar.close() 1328 1329 def test_pax_extended_header(self): 1330 # The fields from the pax header have priority over the 1331 # TarInfo. 1332 pax_headers = {"path": "foo", "uid": "123"} 1333 1334 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") 1335 try: 1336 t = tarfile.TarInfo() 1337 t.name = "\xe4\xf6\xfc" # non-ASCII 1338 t.uid = 8**8 # too large 1339 t.pax_headers = pax_headers 1340 tar.addfile(t) 1341 finally: 1342 tar.close() 1343 1344 tar = tarfile.open(tmpname, encoding="iso8859-1") 1345 try: 1346 t = tar.getmembers()[0] 1347 self.assertEqual(t.pax_headers, pax_headers) 1348 self.assertEqual(t.name, "foo") 1349 self.assertEqual(t.uid, 123) 1350 finally: 1351 tar.close() 1352 1353 1354class UstarUnicodeTest(unittest.TestCase): 1355 1356 format = tarfile.USTAR_FORMAT 1357 1358 def test_iso8859_1_filename(self): 1359 self._test_unicode_filename("iso8859-1") 1360 1361 def test_utf7_filename(self): 1362 self._test_unicode_filename("utf7") 1363 1364 def test_utf8_filename(self): 1365 self._test_unicode_filename("utf-8") 1366 1367 def _test_unicode_filename(self, encoding): 1368 tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") 1369 try: 1370 name = "\xe4\xf6\xfc" 1371 tar.addfile(tarfile.TarInfo(name)) 1372 finally: 1373 tar.close() 1374 1375 tar = tarfile.open(tmpname, encoding=encoding) 1376 try: 1377 self.assertEqual(tar.getmembers()[0].name, name) 1378 finally: 1379 tar.close() 1380 1381 def test_unicode_filename_error(self): 1382 if self.format == tarfile.PAX_FORMAT: 1383 # PAX_FORMAT ignores encoding in write mode. 1384 return 1385 1386 tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") 1387 try: 1388 tarinfo = tarfile.TarInfo() 1389 1390 tarinfo.name = "\xe4\xf6\xfc" 1391 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1392 1393 tarinfo.name = "foo" 1394 tarinfo.uname = "\xe4\xf6\xfc" 1395 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1396 finally: 1397 tar.close() 1398 1399 def test_unicode_argument(self): 1400 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") 1401 try: 1402 for t in tar: 1403 self.assertTrue(type(t.name) is str) 1404 self.assertTrue(type(t.linkname) is str) 1405 self.assertTrue(type(t.uname) is str) 1406 self.assertTrue(type(t.gname) is str) 1407 finally: 1408 tar.close() 1409 1410 def test_uname_unicode(self): 1411 t = tarfile.TarInfo("foo") 1412 t.uname = "\xe4\xf6\xfc" 1413 t.gname = "\xe4\xf6\xfc" 1414 1415 tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1") 1416 try: 1417 tar.addfile(t) 1418 finally: 1419 tar.close() 1420 1421 tar = tarfile.open(tmpname, encoding="iso8859-1") 1422 try: 1423 t = tar.getmember("foo") 1424 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1425 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1426 1427 if self.format != tarfile.PAX_FORMAT: 1428 tar.close() 1429 tar = tarfile.open(tmpname, encoding="ascii") 1430 t = tar.getmember("foo") 1431 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1432 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1433 finally: 1434 tar.close() 1435 1436 1437class GNUUnicodeTest(UstarUnicodeTest): 1438 1439 format = tarfile.GNU_FORMAT 1440 1441 def test_bad_pax_header(self): 1442 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1443 # without a hdrcharset=BINARY header. 1444 for encoding, name in (("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1445 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1446 with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: 1447 try: 1448 t = tar.getmember(name) 1449 except KeyError: 1450 self.fail("unable to read bad GNU tar pax header") 1451 1452 1453class PAXUnicodeTest(UstarUnicodeTest): 1454 1455 format = tarfile.PAX_FORMAT 1456 1457 def test_binary_header(self): 1458 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 1459 for encoding, name in (("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1460 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1461 with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: 1462 try: 1463 t = tar.getmember(name) 1464 except KeyError: 1465 self.fail("unable to read POSIX.1-2008 binary header") 1466 1467 1468class AppendTest(unittest.TestCase): 1469 # Test append mode (cp. patch #1652681). 1470 1471 def setUp(self): 1472 self.tarname = tmpname 1473 if os.path.exists(self.tarname): 1474 os.remove(self.tarname) 1475 1476 def _add_testfile(self, fileobj=None): 1477 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1478 tar.addfile(tarfile.TarInfo("bar")) 1479 1480 def _create_testtar(self, mode="w:"): 1481 with tarfile.open(tarname, encoding="iso8859-1") as src: 1482 t = src.getmember("ustar/regtype") 1483 t.name = "foo" 1484 f = src.extractfile(t) 1485 try: 1486 with tarfile.open(self.tarname, mode) as tar: 1487 tar.addfile(t, f) 1488 finally: 1489 f.close() 1490 1491 def _test(self, names=["bar"], fileobj=None): 1492 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1493 self.assertEqual(tar.getnames(), names) 1494 1495 def test_non_existing(self): 1496 self._add_testfile() 1497 self._test() 1498 1499 def test_empty(self): 1500 tarfile.open(self.tarname, "w:").close() 1501 self._add_testfile() 1502 self._test() 1503 1504 def test_empty_fileobj(self): 1505 fobj = io.BytesIO(b"\0" * 1024) 1506 self._add_testfile(fobj) 1507 fobj.seek(0) 1508 self._test(fileobj=fobj) 1509 1510 def test_fileobj(self): 1511 self._create_testtar() 1512 with open(self.tarname, "rb") as fobj: 1513 data = fobj.read() 1514 fobj = io.BytesIO(data) 1515 self._add_testfile(fobj) 1516 fobj.seek(0) 1517 self._test(names=["foo", "bar"], fileobj=fobj) 1518 1519 def test_existing(self): 1520 self._create_testtar() 1521 self._add_testfile() 1522 self._test(names=["foo", "bar"]) 1523 1524 def test_append_gz(self): 1525 if gzip is None: 1526 return 1527 self._create_testtar("w:gz") 1528 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1529 1530 def test_append_bz2(self): 1531 if bz2 is None: 1532 return 1533 self._create_testtar("w:bz2") 1534 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1535 1536 def test_append_lzma(self): 1537 if lzma is None: 1538 self.skipTest("lzma module not available") 1539 self._create_testtar("w:xz") 1540 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1541 1542 # Append mode is supposed to fail if the tarfile to append to 1543 # does not end with a zero block. 1544 def _test_error(self, data): 1545 with open(self.tarname, "wb") as fobj: 1546 fobj.write(data) 1547 self.assertRaises(tarfile.ReadError, self._add_testfile) 1548 1549 def test_null(self): 1550 self._test_error(b"") 1551 1552 def test_incomplete(self): 1553 self._test_error(b"\0" * 13) 1554 1555 def test_premature_eof(self): 1556 data = tarfile.TarInfo("foo").tobuf() 1557 self._test_error(data) 1558 1559 def test_trailing_garbage(self): 1560 data = tarfile.TarInfo("foo").tobuf() 1561 self._test_error(data + b"\0" * 13) 1562 1563 def test_invalid(self): 1564 self._test_error(b"a" * 512) 1565 1566 1567class LimitsTest(unittest.TestCase): 1568 1569 def test_ustar_limits(self): 1570 # 100 char name 1571 tarinfo = tarfile.TarInfo("0123456789" * 10) 1572 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1573 1574 # 101 char name that cannot be stored 1575 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1576 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1577 1578 # 256 char name with a slash at pos 156 1579 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1580 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1581 1582 # 256 char name that cannot be stored 1583 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1584 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1585 1586 # 512 char name 1587 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1588 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1589 1590 # 512 char linkname 1591 tarinfo = tarfile.TarInfo("longlink") 1592 tarinfo.linkname = "123/" * 126 + "longname" 1593 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1594 1595 # uid > 8 digits 1596 tarinfo = tarfile.TarInfo("name") 1597 tarinfo.uid = 0o10000000 1598 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1599 1600 def test_gnu_limits(self): 1601 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1602 tarinfo.tobuf(tarfile.GNU_FORMAT) 1603 1604 tarinfo = tarfile.TarInfo("longlink") 1605 tarinfo.linkname = "123/" * 126 + "longname" 1606 tarinfo.tobuf(tarfile.GNU_FORMAT) 1607 1608 # uid >= 256 ** 7 1609 tarinfo = tarfile.TarInfo("name") 1610 tarinfo.uid = 0o4000000000000000000 1611 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1612 1613 def test_pax_limits(self): 1614 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1615 tarinfo.tobuf(tarfile.PAX_FORMAT) 1616 1617 tarinfo = tarfile.TarInfo("longlink") 1618 tarinfo.linkname = "123/" * 126 + "longname" 1619 tarinfo.tobuf(tarfile.PAX_FORMAT) 1620 1621 tarinfo = tarfile.TarInfo("name") 1622 tarinfo.uid = 0o4000000000000000000 1623 tarinfo.tobuf(tarfile.PAX_FORMAT) 1624 1625 1626class MiscTest(unittest.TestCase): 1627 1628 def test_char_fields(self): 1629 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0") 1630 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo") 1631 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo") 1632 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo") 1633 1634 def test_read_number_fields(self): 1635 # Issue 13158: Test if GNU tar specific base-256 number fields 1636 # are decoded correctly. 1637 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 1638 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 1639 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 0o10000000) 1640 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 0xffffffff) 1641 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), -1) 1642 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), -100) 1643 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), -0x100000000000000) 1644 1645 def test_write_number_fields(self): 1646 self.assertEqual(tarfile.itn(1), b"0000001\x00") 1647 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 1648 self.assertEqual(tarfile.itn(0o10000000), b"\x80\x00\x00\x00\x00\x20\x00\x00") 1649 self.assertEqual(tarfile.itn(0xffffffff), b"\x80\x00\x00\x00\xff\xff\xff\xff") 1650 self.assertEqual(tarfile.itn(-1), b"\xff\xff\xff\xff\xff\xff\xff\xff") 1651 self.assertEqual(tarfile.itn(-100), b"\xff\xff\xff\xff\xff\xff\xff\x9c") 1652 self.assertEqual(tarfile.itn(-0x100000000000000), b"\xff\x00\x00\x00\x00\x00\x00\x00") 1653 1654 def test_number_field_limits(self): 1655 self.assertRaises(ValueError, tarfile.itn, -1, 8, tarfile.USTAR_FORMAT) 1656 self.assertRaises(ValueError, tarfile.itn, 0o10000000, 8, tarfile.USTAR_FORMAT) 1657 self.assertRaises(ValueError, tarfile.itn, -0x10000000001, 6, tarfile.GNU_FORMAT) 1658 self.assertRaises(ValueError, tarfile.itn, 0x10000000000, 6, tarfile.GNU_FORMAT) 1659 1660 1661class ContextManagerTest(unittest.TestCase): 1662 1663 def test_basic(self): 1664 with tarfile.open(tarname) as tar: 1665 self.assertFalse(tar.closed, "closed inside runtime context") 1666 self.assertTrue(tar.closed, "context manager failed") 1667 1668 def test_closed(self): 1669 # The __enter__() method is supposed to raise IOError 1670 # if the TarFile object is already closed. 1671 tar = tarfile.open(tarname) 1672 tar.close() 1673 with self.assertRaises(IOError): 1674 with tar: 1675 pass 1676 1677 def test_exception(self): 1678 # Test if the IOError exception is passed through properly. 1679 with self.assertRaises(Exception) as exc: 1680 with tarfile.open(tarname) as tar: 1681 raise IOError 1682 self.assertIsInstance(exc.exception, IOError, 1683 "wrong exception raised in context manager") 1684 self.assertTrue(tar.closed, "context manager failed") 1685 1686 def test_no_eof(self): 1687 # __exit__() must not write end-of-archive blocks if an 1688 # exception was raised. 1689 try: 1690 with tarfile.open(tmpname, "w") as tar: 1691 raise Exception 1692 except: 1693 pass 1694 self.assertEqual(os.path.getsize(tmpname), 0, 1695 "context manager wrote an end-of-archive block") 1696 self.assertTrue(tar.closed, "context manager failed") 1697 1698 def test_eof(self): 1699 # __exit__() must write end-of-archive blocks, i.e. call 1700 # TarFile.close() if there was no error. 1701 with tarfile.open(tmpname, "w"): 1702 pass 1703 self.assertNotEqual(os.path.getsize(tmpname), 0, 1704 "context manager wrote no end-of-archive block") 1705 1706 def test_fileobj(self): 1707 # Test that __exit__() did not close the external file 1708 # object. 1709 with open(tmpname, "wb") as fobj: 1710 try: 1711 with tarfile.open(fileobj=fobj, mode="w") as tar: 1712 raise Exception 1713 except: 1714 pass 1715 self.assertFalse(fobj.closed, "external file object was closed") 1716 self.assertTrue(tar.closed, "context manager failed") 1717 1718 1719class LinkEmulationTest(ReadTest): 1720 1721 # Test for issue #8741 regression. On platforms that do not support 1722 # symbolic or hard links tarfile tries to extract these types of members as 1723 # the regular files they point to. 1724 def _test_link_extraction(self, name): 1725 self.tar.extract(name, TEMPDIR) 1726 data = open(os.path.join(TEMPDIR, name), "rb").read() 1727 self.assertEqual(md5sum(data), md5_regtype) 1728 1729 # When 8879 gets fixed, this will need to change. Currently on Windows 1730 # we have os.path.islink but no os.link, so these tests fail without the 1731 # following skip until link is completed. 1732 @unittest.skipIf(hasattr(os.path, "islink"), 1733 "Skip emulation - has os.path.islink but not os.link") 1734 def test_hardlink_extraction1(self): 1735 self._test_link_extraction("ustar/lnktype") 1736 1737 @unittest.skipIf(hasattr(os.path, "islink"), 1738 "Skip emulation - has os.path.islink but not os.link") 1739 def test_hardlink_extraction2(self): 1740 self._test_link_extraction("./ustar/linktest2/lnktype") 1741 1742 @unittest.skipIf(hasattr(os, "symlink"), 1743 "Skip emulation if symlink exists") 1744 def test_symlink_extraction1(self): 1745 self._test_link_extraction("ustar/symtype") 1746 1747 @unittest.skipIf(hasattr(os, "symlink"), 1748 "Skip emulation if symlink exists") 1749 def test_symlink_extraction2(self): 1750 self._test_link_extraction("./ustar/linktest2/symtype") 1751 1752 1753class GzipMiscReadTest(MiscReadTest): 1754 tarname = gzipname 1755 mode = "r:gz" 1756 1757 def test_non_existent_targz_file(self): 1758 # Test for issue11513: prevent non-existent gzipped tarfiles raising 1759 # multiple exceptions. 1760 with self.assertRaisesRegex(IOError, "xxx") as ex: 1761 tarfile.open("xxx", self.mode) 1762 self.assertEqual(ex.exception.errno, errno.ENOENT) 1763 1764class GzipUstarReadTest(UstarReadTest): 1765 tarname = gzipname 1766 mode = "r:gz" 1767class GzipStreamReadTest(StreamReadTest): 1768 tarname = gzipname 1769 mode = "r|gz" 1770class GzipWriteTest(WriteTest): 1771 mode = "w:gz" 1772class GzipStreamWriteTest(StreamWriteTest): 1773 mode = "w|gz" 1774 1775 1776class Bz2MiscReadTest(MiscReadTest): 1777 tarname = bz2name 1778 mode = "r:bz2" 1779class Bz2UstarReadTest(UstarReadTest): 1780 tarname = bz2name 1781 mode = "r:bz2" 1782class Bz2StreamReadTest(StreamReadTest): 1783 tarname = bz2name 1784 mode = "r|bz2" 1785class Bz2WriteTest(WriteTest): 1786 mode = "w:bz2" 1787class Bz2StreamWriteTest(StreamWriteTest): 1788 mode = "w|bz2" 1789 1790class Bz2PartialReadTest(unittest.TestCase): 1791 # Issue5068: The _BZ2Proxy.read() method loops forever 1792 # on an empty or partial bzipped file. 1793 1794 def _test_partial_input(self, mode): 1795 class MyBytesIO(io.BytesIO): 1796 hit_eof = False 1797 def read(self, n): 1798 if self.hit_eof: 1799 raise AssertionError("infinite loop detected in tarfile.open()") 1800 self.hit_eof = self.tell() == len(self.getvalue()) 1801 return super(MyBytesIO, self).read(n) 1802 def seek(self, *args): 1803 self.hit_eof = False 1804 return super(MyBytesIO, self).seek(*args) 1805 1806 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1807 for x in range(len(data) + 1): 1808 try: 1809 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 1810 except tarfile.ReadError: 1811 pass # we have no interest in ReadErrors 1812 1813 def test_partial_input(self): 1814 self._test_partial_input("r") 1815 1816 def test_partial_input_bz2(self): 1817 self._test_partial_input("r:bz2") 1818 1819 1820class LzmaMiscReadTest(MiscReadTest): 1821 tarname = xzname 1822 mode = "r:xz" 1823class LzmaUstarReadTest(UstarReadTest): 1824 tarname = xzname 1825 mode = "r:xz" 1826class LzmaStreamReadTest(StreamReadTest): 1827 tarname = xzname 1828 mode = "r|xz" 1829class LzmaWriteTest(WriteTest): 1830 mode = "w:xz" 1831class LzmaStreamWriteTest(StreamWriteTest): 1832 mode = "w|xz" 1833 1834 1835def test_main(): 1836 support.unlink(TEMPDIR) 1837 os.makedirs(TEMPDIR) 1838 1839 tests = [ 1840 UstarReadTest, 1841 MiscReadTest, 1842 StreamReadTest, 1843 DetectReadTest, 1844 MemberReadTest, 1845 GNUReadTest, 1846 PaxReadTest, 1847 WriteTest, 1848 StreamWriteTest, 1849 GNUWriteTest, 1850 PaxWriteTest, 1851 UstarUnicodeTest, 1852 GNUUnicodeTest, 1853 PAXUnicodeTest, 1854 AppendTest, 1855 LimitsTest, 1856 MiscTest, 1857 ContextManagerTest, 1858 ] 1859 1860 if hasattr(os, "link"): 1861 tests.append(HardlinkTest) 1862 else: 1863 tests.append(LinkEmulationTest) 1864 1865 with open(tarname, "rb") as fobj: 1866 data = fobj.read() 1867 1868 if gzip: 1869 # Create testtar.tar.gz and add gzip-specific tests. 1870 support.unlink(gzipname) 1871 with gzip.open(gzipname, "wb") as tar: 1872 tar.write(data) 1873 1874 tests += [ 1875 GzipMiscReadTest, 1876 GzipUstarReadTest, 1877 GzipStreamReadTest, 1878 GzipWriteTest, 1879 GzipStreamWriteTest, 1880 ] 1881 1882 if bz2: 1883 # Create testtar.tar.bz2 and add bz2-specific tests. 1884 support.unlink(bz2name) 1885 with bz2.BZ2File(bz2name, "wb") as tar: 1886 tar.write(data) 1887 1888 tests += [ 1889 Bz2MiscReadTest, 1890 Bz2UstarReadTest, 1891 Bz2StreamReadTest, 1892 Bz2WriteTest, 1893 Bz2StreamWriteTest, 1894 Bz2PartialReadTest, 1895 ] 1896 1897 if lzma: 1898 # Create testtar.tar.xz and add lzma-specific tests. 1899 support.unlink(xzname) 1900 with lzma.LZMAFile(xzname, "w") as tar: 1901 tar.write(data) 1902 1903 tests += [ 1904 LzmaMiscReadTest, 1905 LzmaUstarReadTest, 1906 LzmaStreamReadTest, 1907 LzmaWriteTest, 1908 LzmaStreamWriteTest, 1909 ] 1910 1911 try: 1912 support.run_unittest(*tests) 1913 finally: 1914 if os.path.exists(TEMPDIR): 1915 shutil.rmtree(TEMPDIR) 1916 1917if __name__ == "__main__": 1918 test_main() 1919