test_tarfile.py revision dd071045e776e1c3e8cf6750a2fd1d0958bf19b3
1import sys 2import os 3import io 4import shutil 5import io 6from hashlib import md5 7import errno 8 9import unittest 10import tarfile 11 12from test import support 13 14# Check for our compression modules. 15try: 16 import gzip 17 gzip.GzipFile 18except (ImportError, AttributeError): 19 gzip = None 20try: 21 import bz2 22except ImportError: 23 bz2 = None 24 25def md5sum(data): 26 return md5(data).hexdigest() 27 28TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 29tarname = support.findfile("testtar.tar") 30gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 31bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 32tmpname = os.path.join(TEMPDIR, "tmp.tar") 33 34md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 35md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 36 37 38class ReadTest(unittest.TestCase): 39 40 tarname = tarname 41 mode = "r:" 42 43 def setUp(self): 44 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 45 46 def tearDown(self): 47 self.tar.close() 48 49 50class UstarReadTest(ReadTest): 51 52 def test_fileobj_regular_file(self): 53 tarinfo = self.tar.getmember("ustar/regtype") 54 fobj = self.tar.extractfile(tarinfo) 55 try: 56 data = fobj.read() 57 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 58 "regular file extraction failed") 59 finally: 60 fobj.close() 61 62 def test_fileobj_readlines(self): 63 self.tar.extract("ustar/regtype", TEMPDIR) 64 tarinfo = self.tar.getmember("ustar/regtype") 65 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 66 lines1 = fobj1.readlines() 67 68 fobj = self.tar.extractfile(tarinfo) 69 try: 70 fobj2 = io.TextIOWrapper(fobj) 71 lines2 = fobj2.readlines() 72 self.assertTrue(lines1 == lines2, 73 "fileobj.readlines() failed") 74 self.assertTrue(len(lines2) == 114, 75 "fileobj.readlines() failed") 76 self.assertTrue(lines2[83] == 77 "I will gladly admit that Python is not the fastest running scripting language.\n", 78 "fileobj.readlines() failed") 79 finally: 80 fobj.close() 81 82 def test_fileobj_iter(self): 83 self.tar.extract("ustar/regtype", TEMPDIR) 84 tarinfo = self.tar.getmember("ustar/regtype") 85 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1: 86 lines1 = fobj1.readlines() 87 fobj2 = self.tar.extractfile(tarinfo) 88 try: 89 lines2 = list(io.TextIOWrapper(fobj2)) 90 self.assertTrue(lines1 == lines2, 91 "fileobj.__iter__() failed") 92 finally: 93 fobj2.close() 94 95 def test_fileobj_seek(self): 96 self.tar.extract("ustar/regtype", TEMPDIR) 97 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 98 data = fobj.read() 99 100 tarinfo = self.tar.getmember("ustar/regtype") 101 fobj = self.tar.extractfile(tarinfo) 102 103 text = fobj.read() 104 fobj.seek(0) 105 self.assertEqual(0, fobj.tell(), 106 "seek() to file's start failed") 107 fobj.seek(2048, 0) 108 self.assertEqual(2048, fobj.tell(), 109 "seek() to absolute position failed") 110 fobj.seek(-1024, 1) 111 self.assertEqual(1024, fobj.tell(), 112 "seek() to negative relative position failed") 113 fobj.seek(1024, 1) 114 self.assertEqual(2048, fobj.tell(), 115 "seek() to positive relative position failed") 116 s = fobj.read(10) 117 self.assertTrue(s == data[2048:2058], 118 "read() after seek failed") 119 fobj.seek(0, 2) 120 self.assertEqual(tarinfo.size, fobj.tell(), 121 "seek() to file's end failed") 122 self.assertTrue(fobj.read() == b"", 123 "read() at file's end did not return empty string") 124 fobj.seek(-tarinfo.size, 2) 125 self.assertEqual(0, fobj.tell(), 126 "relative seek() to file's end failed") 127 fobj.seek(512) 128 s1 = fobj.readlines() 129 fobj.seek(512) 130 s2 = fobj.readlines() 131 self.assertTrue(s1 == s2, 132 "readlines() after seek failed") 133 fobj.seek(0) 134 self.assertEqual(len(fobj.readline()), fobj.tell(), 135 "tell() after readline() failed") 136 fobj.seek(512) 137 self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), 138 "tell() after seek() and readline() failed") 139 fobj.seek(0) 140 line = fobj.readline() 141 self.assertEqual(fobj.read(), data[len(line):], 142 "read() after readline() failed") 143 fobj.close() 144 145 # Test if symbolic and hard links are resolved by extractfile(). The 146 # test link members each point to a regular member whose data is 147 # supposed to be exported. 148 def _test_fileobj_link(self, lnktype, regtype): 149 a = self.tar.extractfile(lnktype) 150 b = self.tar.extractfile(regtype) 151 try: 152 self.assertEqual(a.name, b.name) 153 finally: 154 a.close() 155 b.close() 156 157 def test_fileobj_link1(self): 158 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 159 160 def test_fileobj_link2(self): 161 self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype") 162 163 def test_fileobj_symlink1(self): 164 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 165 166 def test_fileobj_symlink2(self): 167 self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype") 168 169 170class CommonReadTest(ReadTest): 171 172 def test_empty_tarfile(self): 173 # Test for issue6123: Allow opening empty archives. 174 # This test checks if tarfile.open() is able to open an empty tar 175 # archive successfully. Note that an empty tar archive is not the 176 # same as an empty file! 177 with tarfile.open(tmpname, self.mode.replace("r", "w")): 178 pass 179 try: 180 tar = tarfile.open(tmpname, self.mode) 181 tar.getnames() 182 except tarfile.ReadError: 183 self.fail("tarfile.open() failed on empty archive") 184 else: 185 self.assertListEqual(tar.getmembers(), []) 186 finally: 187 tar.close() 188 189 def test_null_tarfile(self): 190 # Test for issue6123: Allow opening empty archives. 191 # This test guarantees that tarfile.open() does not treat an empty 192 # file as an empty tar archive. 193 with open(tmpname, "wb"): 194 pass 195 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 196 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 197 198 def test_ignore_zeros(self): 199 # Test TarFile's ignore_zeros option. 200 if self.mode.endswith(":gz"): 201 _open = gzip.GzipFile 202 elif self.mode.endswith(":bz2"): 203 _open = bz2.BZ2File 204 else: 205 _open = open 206 207 for char in (b'\0', b'a'): 208 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 209 # are ignored correctly. 210 with _open(tmpname, "wb") as fobj: 211 fobj.write(char * 1024) 212 fobj.write(tarfile.TarInfo("foo").tobuf()) 213 214 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 215 try: 216 self.assertListEqual(tar.getnames(), ["foo"], 217 "ignore_zeros=True should have skipped the %r-blocks" % char) 218 finally: 219 tar.close() 220 221 222class MiscReadTest(CommonReadTest): 223 224 def test_no_name_argument(self): 225 with open(self.tarname, "rb") as fobj: 226 tar = tarfile.open(fileobj=fobj, mode=self.mode) 227 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 228 229 def test_no_name_attribute(self): 230 with open(self.tarname, "rb") as fobj: 231 data = fobj.read() 232 fobj = io.BytesIO(data) 233 self.assertRaises(AttributeError, getattr, fobj, "name") 234 tar = tarfile.open(fileobj=fobj, mode=self.mode) 235 self.assertEqual(tar.name, None) 236 237 def test_empty_name_attribute(self): 238 with open(self.tarname, "rb") as fobj: 239 data = fobj.read() 240 fobj = io.BytesIO(data) 241 fobj.name = "" 242 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 243 self.assertEqual(tar.name, None) 244 245 def test_fileobj_with_offset(self): 246 # Skip the first member and store values from the second member 247 # of the testtar. 248 tar = tarfile.open(self.tarname, mode=self.mode) 249 try: 250 tar.next() 251 t = tar.next() 252 name = t.name 253 offset = t.offset 254 f = tar.extractfile(t) 255 data = f.read() 256 f.close() 257 finally: 258 tar.close() 259 260 # Open the testtar and seek to the offset of the second member. 261 if self.mode.endswith(":gz"): 262 _open = gzip.GzipFile 263 elif self.mode.endswith(":bz2"): 264 _open = bz2.BZ2File 265 else: 266 _open = open 267 fobj = _open(self.tarname, "rb") 268 try: 269 fobj.seek(offset) 270 271 # Test if the tarfile starts with the second member. 272 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 273 t = tar.next() 274 self.assertEqual(t.name, name) 275 # Read to the end of fileobj and test if seeking back to the 276 # beginning works. 277 tar.getmembers() 278 self.assertEqual(tar.extractfile(t).read(), data, 279 "seek back did not work") 280 tar.close() 281 finally: 282 fobj.close() 283 284 def test_fail_comp(self): 285 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 286 if self.mode == "r:": 287 return 288 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 289 with open(tarname, "rb") as fobj: 290 self.assertRaises(tarfile.ReadError, tarfile.open, 291 fileobj=fobj, mode=self.mode) 292 293 def test_v7_dirtype(self): 294 # Test old style dirtype member (bug #1336623): 295 # Old V7 tars create directory members using an AREGTYPE 296 # header with a "/" appended to the filename field. 297 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 298 self.assertTrue(tarinfo.type == tarfile.DIRTYPE, 299 "v7 dirtype failed") 300 301 def test_xstar_type(self): 302 # The xstar format stores extra atime and ctime fields inside the 303 # space reserved for the prefix field. The prefix field must be 304 # ignored in this case, otherwise it will mess up the name. 305 try: 306 self.tar.getmember("misc/regtype-xstar") 307 except KeyError: 308 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 309 310 def test_check_members(self): 311 for tarinfo in self.tar: 312 self.assertTrue(int(tarinfo.mtime) == 0o7606136617, 313 "wrong mtime for %s" % tarinfo.name) 314 if not tarinfo.name.startswith("ustar/"): 315 continue 316 self.assertTrue(tarinfo.uname == "tarfile", 317 "wrong uname for %s" % tarinfo.name) 318 319 def test_find_members(self): 320 self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", 321 "could not find all members") 322 323 @unittest.skipUnless(hasattr(os, "link"), 324 "Missing hardlink implementation") 325 @support.skip_unless_symlink 326 def test_extract_hardlink(self): 327 # Test hardlink extraction (e.g. bug #857297). 328 tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") 329 330 try: 331 tar.extract("ustar/regtype", TEMPDIR) 332 try: 333 tar.extract("ustar/lnktype", TEMPDIR) 334 except EnvironmentError as e: 335 if e.errno == errno.ENOENT: 336 self.fail("hardlink not extracted properly") 337 338 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 339 data = f.read() 340 self.assertEqual(md5sum(data), md5_regtype) 341 342 try: 343 tar.extract("ustar/symtype", TEMPDIR) 344 except EnvironmentError as e: 345 if e.errno == errno.ENOENT: 346 self.fail("symlink not extracted properly") 347 348 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 349 data = f.read() 350 self.assertEqual(md5sum(data), md5_regtype) 351 finally: 352 tar.close() 353 354 def test_extractall(self): 355 # Test if extractall() correctly restores directory permissions 356 # and times (see issue1735). 357 tar = tarfile.open(tarname, encoding="iso8859-1") 358 DIR = os.path.join(TEMPDIR, "extractall") 359 os.mkdir(DIR) 360 try: 361 directories = [t for t in tar if t.isdir()] 362 tar.extractall(DIR, directories) 363 for tarinfo in directories: 364 path = os.path.join(DIR, tarinfo.name) 365 if sys.platform != "win32": 366 # Win32 has no support for fine grained permissions. 367 self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777) 368 def format_mtime(mtime): 369 if isinstance(mtime, float): 370 return "{} ({})".format(mtime, mtime.hex()) 371 else: 372 return "{!r} (int)".format(mtime) 373 file_mtime = os.path.getmtime(path) 374 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 375 format_mtime(tarinfo.mtime), 376 format_mtime(file_mtime), 377 path) 378 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 379 finally: 380 tar.close() 381 shutil.rmtree(DIR) 382 383 def test_extract_directory(self): 384 dirtype = "ustar/dirtype" 385 DIR = os.path.join(TEMPDIR, "extractdir") 386 os.mkdir(DIR) 387 try: 388 with tarfile.open(tarname, encoding="iso8859-1") as tar: 389 tarinfo = tar.getmember(dirtype) 390 tar.extract(tarinfo, path=DIR) 391 extracted = os.path.join(DIR, dirtype) 392 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 393 if sys.platform != "win32": 394 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 395 finally: 396 shutil.rmtree(DIR) 397 398 def test_init_close_fobj(self): 399 # Issue #7341: Close the internal file object in the TarFile 400 # constructor in case of an error. For the test we rely on 401 # the fact that opening an empty file raises a ReadError. 402 empty = os.path.join(TEMPDIR, "empty") 403 with open(empty, "wb") as fobj: 404 fobj.write(b"") 405 406 try: 407 tar = object.__new__(tarfile.TarFile) 408 try: 409 tar.__init__(empty) 410 except tarfile.ReadError: 411 self.assertTrue(tar.fileobj.closed) 412 else: 413 self.fail("ReadError not raised") 414 finally: 415 support.unlink(empty) 416 417 418class StreamReadTest(CommonReadTest): 419 420 mode="r|" 421 422 def test_read_through(self): 423 # Issue #11224: A poorly designed _FileInFile.read() method 424 # caused seeking errors with stream tar files. 425 for tarinfo in self.tar: 426 if not tarinfo.isreg(): 427 continue 428 fobj = self.tar.extractfile(tarinfo) 429 while True: 430 try: 431 buf = fobj.read(512) 432 except tarfile.StreamError: 433 self.fail("simple read-through using TarFile.extractfile() failed") 434 if not buf: 435 break 436 fobj.close() 437 438 def test_fileobj_regular_file(self): 439 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 440 fobj = self.tar.extractfile(tarinfo) 441 data = fobj.read() 442 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 443 "regular file extraction failed") 444 445 def test_provoke_stream_error(self): 446 tarinfos = self.tar.getmembers() 447 f = self.tar.extractfile(tarinfos[0]) # read the first member 448 self.assertRaises(tarfile.StreamError, f.read) 449 450 def test_compare_members(self): 451 tar1 = tarfile.open(tarname, encoding="iso8859-1") 452 try: 453 tar2 = self.tar 454 455 while True: 456 t1 = tar1.next() 457 t2 = tar2.next() 458 if t1 is None: 459 break 460 self.assertTrue(t2 is not None, "stream.next() failed.") 461 462 if t2.islnk() or t2.issym(): 463 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) 464 continue 465 466 v1 = tar1.extractfile(t1) 467 v2 = tar2.extractfile(t2) 468 if v1 is None: 469 continue 470 self.assertTrue(v2 is not None, "stream.extractfile() failed") 471 self.assertEqual(v1.read(), v2.read(), "stream extraction failed") 472 finally: 473 tar1.close() 474 475 476class DetectReadTest(unittest.TestCase): 477 478 def _testfunc_file(self, name, mode): 479 try: 480 tar = tarfile.open(name, mode) 481 except tarfile.ReadError as e: 482 self.fail() 483 else: 484 tar.close() 485 486 def _testfunc_fileobj(self, name, mode): 487 try: 488 with open(name, "rb") as f: 489 tar = tarfile.open(name, mode, fileobj=f) 490 except tarfile.ReadError as e: 491 self.fail() 492 else: 493 tar.close() 494 495 def _test_modes(self, testfunc): 496 testfunc(tarname, "r") 497 testfunc(tarname, "r:") 498 testfunc(tarname, "r:*") 499 testfunc(tarname, "r|") 500 testfunc(tarname, "r|*") 501 502 if gzip: 503 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") 504 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") 505 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") 506 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") 507 508 testfunc(gzipname, "r") 509 testfunc(gzipname, "r:*") 510 testfunc(gzipname, "r:gz") 511 testfunc(gzipname, "r|*") 512 testfunc(gzipname, "r|gz") 513 514 if bz2: 515 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") 516 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") 517 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") 518 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") 519 520 testfunc(bz2name, "r") 521 testfunc(bz2name, "r:*") 522 testfunc(bz2name, "r:bz2") 523 testfunc(bz2name, "r|*") 524 testfunc(bz2name, "r|bz2") 525 526 def test_detect_file(self): 527 self._test_modes(self._testfunc_file) 528 529 def test_detect_fileobj(self): 530 self._test_modes(self._testfunc_fileobj) 531 532 533class MemberReadTest(ReadTest): 534 535 def _test_member(self, tarinfo, chksum=None, **kwargs): 536 if chksum is not None: 537 self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, 538 "wrong md5sum for %s" % tarinfo.name) 539 540 kwargs["mtime"] = 0o7606136617 541 kwargs["uid"] = 1000 542 kwargs["gid"] = 100 543 if "old-v7" not in tarinfo.name: 544 # V7 tar can't handle alphabetic owners. 545 kwargs["uname"] = "tarfile" 546 kwargs["gname"] = "tarfile" 547 for k, v in kwargs.items(): 548 self.assertTrue(getattr(tarinfo, k) == v, 549 "wrong value in %s field of %s" % (k, tarinfo.name)) 550 551 def test_find_regtype(self): 552 tarinfo = self.tar.getmember("ustar/regtype") 553 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 554 555 def test_find_conttype(self): 556 tarinfo = self.tar.getmember("ustar/conttype") 557 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 558 559 def test_find_dirtype(self): 560 tarinfo = self.tar.getmember("ustar/dirtype") 561 self._test_member(tarinfo, size=0) 562 563 def test_find_dirtype_with_size(self): 564 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 565 self._test_member(tarinfo, size=255) 566 567 def test_find_lnktype(self): 568 tarinfo = self.tar.getmember("ustar/lnktype") 569 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 570 571 def test_find_symtype(self): 572 tarinfo = self.tar.getmember("ustar/symtype") 573 self._test_member(tarinfo, size=0, linkname="regtype") 574 575 def test_find_blktype(self): 576 tarinfo = self.tar.getmember("ustar/blktype") 577 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 578 579 def test_find_chrtype(self): 580 tarinfo = self.tar.getmember("ustar/chrtype") 581 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 582 583 def test_find_fifotype(self): 584 tarinfo = self.tar.getmember("ustar/fifotype") 585 self._test_member(tarinfo, size=0) 586 587 def test_find_sparse(self): 588 tarinfo = self.tar.getmember("ustar/sparse") 589 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 590 591 def test_find_gnusparse(self): 592 tarinfo = self.tar.getmember("gnu/sparse") 593 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 594 595 def test_find_gnusparse_00(self): 596 tarinfo = self.tar.getmember("gnu/sparse-0.0") 597 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 598 599 def test_find_gnusparse_01(self): 600 tarinfo = self.tar.getmember("gnu/sparse-0.1") 601 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 602 603 def test_find_gnusparse_10(self): 604 tarinfo = self.tar.getmember("gnu/sparse-1.0") 605 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 606 607 def test_find_umlauts(self): 608 tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 609 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 610 611 def test_find_ustar_longname(self): 612 name = "ustar/" + "12345/" * 39 + "1234567/longname" 613 self.assertIn(name, self.tar.getnames()) 614 615 def test_find_regtype_oldv7(self): 616 tarinfo = self.tar.getmember("misc/regtype-old-v7") 617 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 618 619 def test_find_pax_umlauts(self): 620 self.tar.close() 621 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 622 tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 623 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 624 625 626class LongnameTest(ReadTest): 627 628 def test_read_longname(self): 629 # Test reading of longname (bug #1471427). 630 longname = self.subdir + "/" + "123/" * 125 + "longname" 631 try: 632 tarinfo = self.tar.getmember(longname) 633 except KeyError: 634 self.fail("longname not found") 635 self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") 636 637 def test_read_longlink(self): 638 longname = self.subdir + "/" + "123/" * 125 + "longname" 639 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 640 try: 641 tarinfo = self.tar.getmember(longlink) 642 except KeyError: 643 self.fail("longlink not found") 644 self.assertTrue(tarinfo.linkname == longname, "linkname wrong") 645 646 def test_truncated_longname(self): 647 longname = self.subdir + "/" + "123/" * 125 + "longname" 648 tarinfo = self.tar.getmember(longname) 649 offset = tarinfo.offset 650 self.tar.fileobj.seek(offset) 651 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 652 self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) 653 654 def test_header_offset(self): 655 # Test if the start offset of the TarInfo object includes 656 # the preceding extended header. 657 longname = self.subdir + "/" + "123/" * 125 + "longname" 658 offset = self.tar.getmember(longname).offset 659 with open(tarname, "rb") as fobj: 660 fobj.seek(offset) 661 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), "iso8859-1", "strict") 662 self.assertEqual(tarinfo.type, self.longnametype) 663 664 665class GNUReadTest(LongnameTest): 666 667 subdir = "gnu" 668 longnametype = tarfile.GNUTYPE_LONGNAME 669 670 # Since 3.2 tarfile is supposed to accurately restore sparse members and 671 # produce files with holes. This is what we actually want to test here. 672 # Unfortunately, not all platforms/filesystems support sparse files, and 673 # even on platforms that do it is non-trivial to make reliable assertions 674 # about holes in files. Therefore, we first do one basic test which works 675 # an all platforms, and after that a test that will work only on 676 # platforms/filesystems that prove to support sparse files. 677 def _test_sparse_file(self, name): 678 self.tar.extract(name, TEMPDIR) 679 filename = os.path.join(TEMPDIR, name) 680 with open(filename, "rb") as fobj: 681 data = fobj.read() 682 self.assertEqual(md5sum(data), md5_sparse, 683 "wrong md5sum for %s" % name) 684 685 if self._fs_supports_holes(): 686 s = os.stat(filename) 687 self.assertTrue(s.st_blocks * 512 < s.st_size) 688 689 def test_sparse_file_old(self): 690 self._test_sparse_file("gnu/sparse") 691 692 def test_sparse_file_00(self): 693 self._test_sparse_file("gnu/sparse-0.0") 694 695 def test_sparse_file_01(self): 696 self._test_sparse_file("gnu/sparse-0.1") 697 698 def test_sparse_file_10(self): 699 self._test_sparse_file("gnu/sparse-1.0") 700 701 @staticmethod 702 def _fs_supports_holes(): 703 # Return True if the platform knows the st_blocks stat attribute and 704 # uses st_blocks units of 512 bytes, and if the filesystem is able to 705 # store holes in files. 706 if sys.platform == "linux2": 707 # Linux evidentially has 512 byte st_blocks units. 708 name = os.path.join(TEMPDIR, "sparse-test") 709 with open(name, "wb") as fobj: 710 fobj.seek(4096) 711 fobj.truncate() 712 s = os.stat(name) 713 os.remove(name) 714 return s.st_blocks == 0 715 else: 716 return False 717 718 719class PaxReadTest(LongnameTest): 720 721 subdir = "pax" 722 longnametype = tarfile.XHDTYPE 723 724 def test_pax_global_headers(self): 725 tar = tarfile.open(tarname, encoding="iso8859-1") 726 try: 727 tarinfo = tar.getmember("pax/regtype1") 728 self.assertEqual(tarinfo.uname, "foo") 729 self.assertEqual(tarinfo.gname, "bar") 730 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 731 732 tarinfo = tar.getmember("pax/regtype2") 733 self.assertEqual(tarinfo.uname, "") 734 self.assertEqual(tarinfo.gname, "bar") 735 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 736 737 tarinfo = tar.getmember("pax/regtype3") 738 self.assertEqual(tarinfo.uname, "tarfile") 739 self.assertEqual(tarinfo.gname, "tarfile") 740 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 741 finally: 742 tar.close() 743 744 def test_pax_number_fields(self): 745 # All following number fields are read from the pax header. 746 tar = tarfile.open(tarname, encoding="iso8859-1") 747 try: 748 tarinfo = tar.getmember("pax/regtype4") 749 self.assertEqual(tarinfo.size, 7011) 750 self.assertEqual(tarinfo.uid, 123) 751 self.assertEqual(tarinfo.gid, 123) 752 self.assertEqual(tarinfo.mtime, 1041808783.0) 753 self.assertEqual(type(tarinfo.mtime), float) 754 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 755 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 756 finally: 757 tar.close() 758 759 760class WriteTestBase(unittest.TestCase): 761 # Put all write tests in here that are supposed to be tested 762 # in all possible mode combinations. 763 764 def test_fileobj_no_close(self): 765 fobj = io.BytesIO() 766 tar = tarfile.open(fileobj=fobj, mode=self.mode) 767 tar.addfile(tarfile.TarInfo("foo")) 768 tar.close() 769 self.assertTrue(fobj.closed is False, "external fileobjs must never closed") 770 771 772class WriteTest(WriteTestBase): 773 774 mode = "w:" 775 776 def test_100_char_name(self): 777 # The name field in a tar header stores strings of at most 100 chars. 778 # If a string is shorter than 100 chars it has to be padded with '\0', 779 # which implies that a string of exactly 100 chars is stored without 780 # a trailing '\0'. 781 name = "0123456789" * 10 782 tar = tarfile.open(tmpname, self.mode) 783 try: 784 t = tarfile.TarInfo(name) 785 tar.addfile(t) 786 finally: 787 tar.close() 788 789 tar = tarfile.open(tmpname) 790 try: 791 self.assertTrue(tar.getnames()[0] == name, 792 "failed to store 100 char filename") 793 finally: 794 tar.close() 795 796 def test_tar_size(self): 797 # Test for bug #1013882. 798 tar = tarfile.open(tmpname, self.mode) 799 try: 800 path = os.path.join(TEMPDIR, "file") 801 with open(path, "wb") as fobj: 802 fobj.write(b"aaa") 803 tar.add(path) 804 finally: 805 tar.close() 806 self.assertTrue(os.path.getsize(tmpname) > 0, 807 "tarfile is empty") 808 809 # The test_*_size tests test for bug #1167128. 810 def test_file_size(self): 811 tar = tarfile.open(tmpname, self.mode) 812 try: 813 path = os.path.join(TEMPDIR, "file") 814 with open(path, "wb"): 815 pass 816 tarinfo = tar.gettarinfo(path) 817 self.assertEqual(tarinfo.size, 0) 818 819 with open(path, "wb") as fobj: 820 fobj.write(b"aaa") 821 tarinfo = tar.gettarinfo(path) 822 self.assertEqual(tarinfo.size, 3) 823 finally: 824 tar.close() 825 826 def test_directory_size(self): 827 path = os.path.join(TEMPDIR, "directory") 828 os.mkdir(path) 829 try: 830 tar = tarfile.open(tmpname, self.mode) 831 try: 832 tarinfo = tar.gettarinfo(path) 833 self.assertEqual(tarinfo.size, 0) 834 finally: 835 tar.close() 836 finally: 837 os.rmdir(path) 838 839 def test_link_size(self): 840 if hasattr(os, "link"): 841 link = os.path.join(TEMPDIR, "link") 842 target = os.path.join(TEMPDIR, "link_target") 843 with open(target, "wb") as fobj: 844 fobj.write(b"aaa") 845 os.link(target, link) 846 try: 847 tar = tarfile.open(tmpname, self.mode) 848 try: 849 # Record the link target in the inodes list. 850 tar.gettarinfo(target) 851 tarinfo = tar.gettarinfo(link) 852 self.assertEqual(tarinfo.size, 0) 853 finally: 854 tar.close() 855 finally: 856 os.remove(target) 857 os.remove(link) 858 859 @support.skip_unless_symlink 860 def test_symlink_size(self): 861 path = os.path.join(TEMPDIR, "symlink") 862 os.symlink("link_target", path) 863 try: 864 tar = tarfile.open(tmpname, self.mode) 865 try: 866 tarinfo = tar.gettarinfo(path) 867 self.assertEqual(tarinfo.size, 0) 868 finally: 869 tar.close() 870 finally: 871 os.remove(path) 872 873 def test_add_self(self): 874 # Test for #1257255. 875 dstname = os.path.abspath(tmpname) 876 tar = tarfile.open(tmpname, self.mode) 877 try: 878 self.assertTrue(tar.name == dstname, "archive name must be absolute") 879 tar.add(dstname) 880 self.assertTrue(tar.getnames() == [], "added the archive to itself") 881 882 cwd = os.getcwd() 883 os.chdir(TEMPDIR) 884 tar.add(dstname) 885 os.chdir(cwd) 886 self.assertTrue(tar.getnames() == [], "added the archive to itself") 887 finally: 888 tar.close() 889 890 def test_exclude(self): 891 tempdir = os.path.join(TEMPDIR, "exclude") 892 os.mkdir(tempdir) 893 try: 894 for name in ("foo", "bar", "baz"): 895 name = os.path.join(tempdir, name) 896 open(name, "wb").close() 897 898 exclude = os.path.isfile 899 900 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 901 try: 902 with support.check_warnings(("use the filter argument", 903 DeprecationWarning)): 904 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 905 finally: 906 tar.close() 907 908 tar = tarfile.open(tmpname, "r") 909 try: 910 self.assertEqual(len(tar.getmembers()), 1) 911 self.assertEqual(tar.getnames()[0], "empty_dir") 912 finally: 913 tar.close() 914 finally: 915 shutil.rmtree(tempdir) 916 917 def test_filter(self): 918 tempdir = os.path.join(TEMPDIR, "filter") 919 os.mkdir(tempdir) 920 try: 921 for name in ("foo", "bar", "baz"): 922 name = os.path.join(tempdir, name) 923 open(name, "wb").close() 924 925 def filter(tarinfo): 926 if os.path.basename(tarinfo.name) == "bar": 927 return 928 tarinfo.uid = 123 929 tarinfo.uname = "foo" 930 return tarinfo 931 932 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 933 try: 934 tar.add(tempdir, arcname="empty_dir", filter=filter) 935 finally: 936 tar.close() 937 938 # Verify that filter is a keyword-only argument 939 with self.assertRaises(TypeError): 940 tar.add(tempdir, "empty_dir", True, None, filter) 941 942 tar = tarfile.open(tmpname, "r") 943 try: 944 for tarinfo in tar: 945 self.assertEqual(tarinfo.uid, 123) 946 self.assertEqual(tarinfo.uname, "foo") 947 self.assertEqual(len(tar.getmembers()), 3) 948 finally: 949 tar.close() 950 finally: 951 shutil.rmtree(tempdir) 952 953 # Guarantee that stored pathnames are not modified. Don't 954 # remove ./ or ../ or double slashes. Still make absolute 955 # pathnames relative. 956 # For details see bug #6054. 957 def _test_pathname(self, path, cmp_path=None, dir=False): 958 # Create a tarfile with an empty member named path 959 # and compare the stored name with the original. 960 foo = os.path.join(TEMPDIR, "foo") 961 if not dir: 962 open(foo, "w").close() 963 else: 964 os.mkdir(foo) 965 966 tar = tarfile.open(tmpname, self.mode) 967 try: 968 tar.add(foo, arcname=path) 969 finally: 970 tar.close() 971 972 tar = tarfile.open(tmpname, "r") 973 try: 974 t = tar.next() 975 finally: 976 tar.close() 977 978 if not dir: 979 os.remove(foo) 980 else: 981 os.rmdir(foo) 982 983 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 984 985 def test_pathnames(self): 986 self._test_pathname("foo") 987 self._test_pathname(os.path.join("foo", ".", "bar")) 988 self._test_pathname(os.path.join("foo", "..", "bar")) 989 self._test_pathname(os.path.join(".", "foo")) 990 self._test_pathname(os.path.join(".", "foo", ".")) 991 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 992 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 993 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 994 self._test_pathname(os.path.join("..", "foo")) 995 self._test_pathname(os.path.join("..", "foo", "..")) 996 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 997 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 998 999 self._test_pathname("foo" + os.sep + os.sep + "bar") 1000 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1001 1002 def test_abs_pathnames(self): 1003 if sys.platform == "win32": 1004 self._test_pathname("C:\\foo", "foo") 1005 else: 1006 self._test_pathname("/foo", "foo") 1007 self._test_pathname("///foo", "foo") 1008 1009 def test_cwd(self): 1010 # Test adding the current working directory. 1011 cwd = os.getcwd() 1012 os.chdir(TEMPDIR) 1013 try: 1014 tar = tarfile.open(tmpname, self.mode) 1015 try: 1016 tar.add(".") 1017 finally: 1018 tar.close() 1019 1020 tar = tarfile.open(tmpname, "r") 1021 try: 1022 for t in tar: 1023 self.assertTrue(t.name == "." or t.name.startswith("./")) 1024 finally: 1025 tar.close() 1026 finally: 1027 os.chdir(cwd) 1028 1029 1030class StreamWriteTest(WriteTestBase): 1031 1032 mode = "w|" 1033 1034 def test_stream_padding(self): 1035 # Test for bug #1543303. 1036 tar = tarfile.open(tmpname, self.mode) 1037 tar.close() 1038 1039 if self.mode.endswith("gz"): 1040 with gzip.GzipFile(tmpname) as fobj: 1041 data = fobj.read() 1042 elif self.mode.endswith("bz2"): 1043 dec = bz2.BZ2Decompressor() 1044 with open(tmpname, "rb") as fobj: 1045 data = fobj.read() 1046 data = dec.decompress(data) 1047 self.assertTrue(len(dec.unused_data) == 0, 1048 "found trailing data") 1049 else: 1050 with open(tmpname, "rb") as fobj: 1051 data = fobj.read() 1052 1053 self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE, 1054 "incorrect zero padding") 1055 1056 def test_file_mode(self): 1057 # Test for issue #8464: Create files with correct 1058 # permissions. 1059 if sys.platform == "win32" or not hasattr(os, "umask"): 1060 return 1061 1062 if os.path.exists(tmpname): 1063 os.remove(tmpname) 1064 1065 original_umask = os.umask(0o022) 1066 try: 1067 tar = tarfile.open(tmpname, self.mode) 1068 tar.close() 1069 mode = os.stat(tmpname).st_mode & 0o777 1070 self.assertEqual(mode, 0o644, "wrong file permissions") 1071 finally: 1072 os.umask(original_umask) 1073 1074 1075class GNUWriteTest(unittest.TestCase): 1076 # This testcase checks for correct creation of GNU Longname 1077 # and Longlink extended headers (cp. bug #812325). 1078 1079 def _length(self, s): 1080 blocks, remainder = divmod(len(s) + 1, 512) 1081 if remainder: 1082 blocks += 1 1083 return blocks * 512 1084 1085 def _calc_size(self, name, link=None): 1086 # Initial tar header 1087 count = 512 1088 1089 if len(name) > tarfile.LENGTH_NAME: 1090 # GNU longname extended header + longname 1091 count += 512 1092 count += self._length(name) 1093 if link is not None and len(link) > tarfile.LENGTH_LINK: 1094 # GNU longlink extended header + longlink 1095 count += 512 1096 count += self._length(link) 1097 return count 1098 1099 def _test(self, name, link=None): 1100 tarinfo = tarfile.TarInfo(name) 1101 if link: 1102 tarinfo.linkname = link 1103 tarinfo.type = tarfile.LNKTYPE 1104 1105 tar = tarfile.open(tmpname, "w") 1106 try: 1107 tar.format = tarfile.GNU_FORMAT 1108 tar.addfile(tarinfo) 1109 1110 v1 = self._calc_size(name, link) 1111 v2 = tar.offset 1112 self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") 1113 finally: 1114 tar.close() 1115 1116 tar = tarfile.open(tmpname) 1117 try: 1118 member = tar.next() 1119 self.assertIsNotNone(member, 1120 "unable to read longname member") 1121 self.assertEqual(tarinfo.name, member.name, 1122 "unable to read longname member") 1123 self.assertEqual(tarinfo.linkname, member.linkname, 1124 "unable to read longname member") 1125 finally: 1126 tar.close() 1127 1128 def test_longname_1023(self): 1129 self._test(("longnam/" * 127) + "longnam") 1130 1131 def test_longname_1024(self): 1132 self._test(("longnam/" * 127) + "longname") 1133 1134 def test_longname_1025(self): 1135 self._test(("longnam/" * 127) + "longname_") 1136 1137 def test_longlink_1023(self): 1138 self._test("name", ("longlnk/" * 127) + "longlnk") 1139 1140 def test_longlink_1024(self): 1141 self._test("name", ("longlnk/" * 127) + "longlink") 1142 1143 def test_longlink_1025(self): 1144 self._test("name", ("longlnk/" * 127) + "longlink_") 1145 1146 def test_longnamelink_1023(self): 1147 self._test(("longnam/" * 127) + "longnam", 1148 ("longlnk/" * 127) + "longlnk") 1149 1150 def test_longnamelink_1024(self): 1151 self._test(("longnam/" * 127) + "longname", 1152 ("longlnk/" * 127) + "longlink") 1153 1154 def test_longnamelink_1025(self): 1155 self._test(("longnam/" * 127) + "longname_", 1156 ("longlnk/" * 127) + "longlink_") 1157 1158 1159class HardlinkTest(unittest.TestCase): 1160 # Test the creation of LNKTYPE (hardlink) members in an archive. 1161 1162 def setUp(self): 1163 self.foo = os.path.join(TEMPDIR, "foo") 1164 self.bar = os.path.join(TEMPDIR, "bar") 1165 1166 with open(self.foo, "wb") as fobj: 1167 fobj.write(b"foo") 1168 1169 os.link(self.foo, self.bar) 1170 1171 self.tar = tarfile.open(tmpname, "w") 1172 self.tar.add(self.foo) 1173 1174 def tearDown(self): 1175 self.tar.close() 1176 support.unlink(self.foo) 1177 support.unlink(self.bar) 1178 1179 def test_add_twice(self): 1180 # The same name will be added as a REGTYPE every 1181 # time regardless of st_nlink. 1182 tarinfo = self.tar.gettarinfo(self.foo) 1183 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1184 "add file as regular failed") 1185 1186 def test_add_hardlink(self): 1187 tarinfo = self.tar.gettarinfo(self.bar) 1188 self.assertTrue(tarinfo.type == tarfile.LNKTYPE, 1189 "add file as hardlink failed") 1190 1191 def test_dereference_hardlink(self): 1192 self.tar.dereference = True 1193 tarinfo = self.tar.gettarinfo(self.bar) 1194 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1195 "dereferencing hardlink failed") 1196 1197 1198class PaxWriteTest(GNUWriteTest): 1199 1200 def _test(self, name, link=None): 1201 # See GNUWriteTest. 1202 tarinfo = tarfile.TarInfo(name) 1203 if link: 1204 tarinfo.linkname = link 1205 tarinfo.type = tarfile.LNKTYPE 1206 1207 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1208 try: 1209 tar.addfile(tarinfo) 1210 finally: 1211 tar.close() 1212 1213 tar = tarfile.open(tmpname) 1214 try: 1215 if link: 1216 l = tar.getmembers()[0].linkname 1217 self.assertTrue(link == l, "PAX longlink creation failed") 1218 else: 1219 n = tar.getmembers()[0].name 1220 self.assertTrue(name == n, "PAX longname creation failed") 1221 finally: 1222 tar.close() 1223 1224 def test_pax_global_header(self): 1225 pax_headers = { 1226 "foo": "bar", 1227 "uid": "0", 1228 "mtime": "1.23", 1229 "test": "\xe4\xf6\xfc", 1230 "\xe4\xf6\xfc": "test"} 1231 1232 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1233 pax_headers=pax_headers) 1234 try: 1235 tar.addfile(tarfile.TarInfo("test")) 1236 finally: 1237 tar.close() 1238 1239 # Test if the global header was written correctly. 1240 tar = tarfile.open(tmpname, encoding="iso8859-1") 1241 try: 1242 self.assertEqual(tar.pax_headers, pax_headers) 1243 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1244 # Test if all the fields are strings. 1245 for key, val in tar.pax_headers.items(): 1246 self.assertTrue(type(key) is not bytes) 1247 self.assertTrue(type(val) is not bytes) 1248 if key in tarfile.PAX_NUMBER_FIELDS: 1249 try: 1250 tarfile.PAX_NUMBER_FIELDS[key](val) 1251 except (TypeError, ValueError): 1252 self.fail("unable to convert pax header field") 1253 finally: 1254 tar.close() 1255 1256 def test_pax_extended_header(self): 1257 # The fields from the pax header have priority over the 1258 # TarInfo. 1259 pax_headers = {"path": "foo", "uid": "123"} 1260 1261 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") 1262 try: 1263 t = tarfile.TarInfo() 1264 t.name = "\xe4\xf6\xfc" # non-ASCII 1265 t.uid = 8**8 # too large 1266 t.pax_headers = pax_headers 1267 tar.addfile(t) 1268 finally: 1269 tar.close() 1270 1271 tar = tarfile.open(tmpname, encoding="iso8859-1") 1272 try: 1273 t = tar.getmembers()[0] 1274 self.assertEqual(t.pax_headers, pax_headers) 1275 self.assertEqual(t.name, "foo") 1276 self.assertEqual(t.uid, 123) 1277 finally: 1278 tar.close() 1279 1280 1281class UstarUnicodeTest(unittest.TestCase): 1282 1283 format = tarfile.USTAR_FORMAT 1284 1285 def test_iso8859_1_filename(self): 1286 self._test_unicode_filename("iso8859-1") 1287 1288 def test_utf7_filename(self): 1289 self._test_unicode_filename("utf7") 1290 1291 def test_utf8_filename(self): 1292 self._test_unicode_filename("utf8") 1293 1294 def _test_unicode_filename(self, encoding): 1295 tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") 1296 try: 1297 name = "\xe4\xf6\xfc" 1298 tar.addfile(tarfile.TarInfo(name)) 1299 finally: 1300 tar.close() 1301 1302 tar = tarfile.open(tmpname, encoding=encoding) 1303 try: 1304 self.assertEqual(tar.getmembers()[0].name, name) 1305 finally: 1306 tar.close() 1307 1308 def test_unicode_filename_error(self): 1309 if self.format == tarfile.PAX_FORMAT: 1310 # PAX_FORMAT ignores encoding in write mode. 1311 return 1312 1313 tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") 1314 try: 1315 tarinfo = tarfile.TarInfo() 1316 1317 tarinfo.name = "\xe4\xf6\xfc" 1318 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1319 1320 tarinfo.name = "foo" 1321 tarinfo.uname = "\xe4\xf6\xfc" 1322 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1323 finally: 1324 tar.close() 1325 1326 def test_unicode_argument(self): 1327 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") 1328 try: 1329 for t in tar: 1330 self.assertTrue(type(t.name) is str) 1331 self.assertTrue(type(t.linkname) is str) 1332 self.assertTrue(type(t.uname) is str) 1333 self.assertTrue(type(t.gname) is str) 1334 finally: 1335 tar.close() 1336 1337 def test_uname_unicode(self): 1338 t = tarfile.TarInfo("foo") 1339 t.uname = "\xe4\xf6\xfc" 1340 t.gname = "\xe4\xf6\xfc" 1341 1342 tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1") 1343 try: 1344 tar.addfile(t) 1345 finally: 1346 tar.close() 1347 1348 tar = tarfile.open(tmpname, encoding="iso8859-1") 1349 try: 1350 t = tar.getmember("foo") 1351 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1352 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1353 1354 if self.format != tarfile.PAX_FORMAT: 1355 tar.close() 1356 tar = tarfile.open(tmpname, encoding="ascii") 1357 t = tar.getmember("foo") 1358 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1359 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1360 finally: 1361 tar.close() 1362 1363 1364class GNUUnicodeTest(UstarUnicodeTest): 1365 1366 format = tarfile.GNU_FORMAT 1367 1368 def test_bad_pax_header(self): 1369 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1370 # without a hdrcharset=BINARY header. 1371 for encoding, name in (("utf8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1372 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1373 with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: 1374 try: 1375 t = tar.getmember(name) 1376 except KeyError: 1377 self.fail("unable to read bad GNU tar pax header") 1378 1379 1380class PAXUnicodeTest(UstarUnicodeTest): 1381 1382 format = tarfile.PAX_FORMAT 1383 1384 def test_binary_header(self): 1385 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 1386 for encoding, name in (("utf8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1387 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1388 with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: 1389 try: 1390 t = tar.getmember(name) 1391 except KeyError: 1392 self.fail("unable to read POSIX.1-2008 binary header") 1393 1394 1395class AppendTest(unittest.TestCase): 1396 # Test append mode (cp. patch #1652681). 1397 1398 def setUp(self): 1399 self.tarname = tmpname 1400 if os.path.exists(self.tarname): 1401 os.remove(self.tarname) 1402 1403 def _add_testfile(self, fileobj=None): 1404 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1405 tar.addfile(tarfile.TarInfo("bar")) 1406 1407 def _create_testtar(self, mode="w:"): 1408 with tarfile.open(tarname, encoding="iso8859-1") as src: 1409 t = src.getmember("ustar/regtype") 1410 t.name = "foo" 1411 f = src.extractfile(t) 1412 try: 1413 with tarfile.open(self.tarname, mode) as tar: 1414 tar.addfile(t, f) 1415 finally: 1416 f.close() 1417 1418 def _test(self, names=["bar"], fileobj=None): 1419 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1420 self.assertEqual(tar.getnames(), names) 1421 1422 def test_non_existing(self): 1423 self._add_testfile() 1424 self._test() 1425 1426 def test_empty(self): 1427 tarfile.open(self.tarname, "w:").close() 1428 self._add_testfile() 1429 self._test() 1430 1431 def test_empty_fileobj(self): 1432 fobj = io.BytesIO(b"\0" * 1024) 1433 self._add_testfile(fobj) 1434 fobj.seek(0) 1435 self._test(fileobj=fobj) 1436 1437 def test_fileobj(self): 1438 self._create_testtar() 1439 with open(self.tarname, "rb") as fobj: 1440 data = fobj.read() 1441 fobj = io.BytesIO(data) 1442 self._add_testfile(fobj) 1443 fobj.seek(0) 1444 self._test(names=["foo", "bar"], fileobj=fobj) 1445 1446 def test_existing(self): 1447 self._create_testtar() 1448 self._add_testfile() 1449 self._test(names=["foo", "bar"]) 1450 1451 def test_append_gz(self): 1452 if gzip is None: 1453 return 1454 self._create_testtar("w:gz") 1455 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1456 1457 def test_append_bz2(self): 1458 if bz2 is None: 1459 return 1460 self._create_testtar("w:bz2") 1461 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1462 1463 # Append mode is supposed to fail if the tarfile to append to 1464 # does not end with a zero block. 1465 def _test_error(self, data): 1466 with open(self.tarname, "wb") as fobj: 1467 fobj.write(data) 1468 self.assertRaises(tarfile.ReadError, self._add_testfile) 1469 1470 def test_null(self): 1471 self._test_error(b"") 1472 1473 def test_incomplete(self): 1474 self._test_error(b"\0" * 13) 1475 1476 def test_premature_eof(self): 1477 data = tarfile.TarInfo("foo").tobuf() 1478 self._test_error(data) 1479 1480 def test_trailing_garbage(self): 1481 data = tarfile.TarInfo("foo").tobuf() 1482 self._test_error(data + b"\0" * 13) 1483 1484 def test_invalid(self): 1485 self._test_error(b"a" * 512) 1486 1487 1488class LimitsTest(unittest.TestCase): 1489 1490 def test_ustar_limits(self): 1491 # 100 char name 1492 tarinfo = tarfile.TarInfo("0123456789" * 10) 1493 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1494 1495 # 101 char name that cannot be stored 1496 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1497 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1498 1499 # 256 char name with a slash at pos 156 1500 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1501 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1502 1503 # 256 char name that cannot be stored 1504 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1505 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1506 1507 # 512 char name 1508 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1509 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1510 1511 # 512 char linkname 1512 tarinfo = tarfile.TarInfo("longlink") 1513 tarinfo.linkname = "123/" * 126 + "longname" 1514 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1515 1516 # uid > 8 digits 1517 tarinfo = tarfile.TarInfo("name") 1518 tarinfo.uid = 0o10000000 1519 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1520 1521 def test_gnu_limits(self): 1522 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1523 tarinfo.tobuf(tarfile.GNU_FORMAT) 1524 1525 tarinfo = tarfile.TarInfo("longlink") 1526 tarinfo.linkname = "123/" * 126 + "longname" 1527 tarinfo.tobuf(tarfile.GNU_FORMAT) 1528 1529 # uid >= 256 ** 7 1530 tarinfo = tarfile.TarInfo("name") 1531 tarinfo.uid = 0o4000000000000000000 1532 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1533 1534 def test_pax_limits(self): 1535 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1536 tarinfo.tobuf(tarfile.PAX_FORMAT) 1537 1538 tarinfo = tarfile.TarInfo("longlink") 1539 tarinfo.linkname = "123/" * 126 + "longname" 1540 tarinfo.tobuf(tarfile.PAX_FORMAT) 1541 1542 tarinfo = tarfile.TarInfo("name") 1543 tarinfo.uid = 0o4000000000000000000 1544 tarinfo.tobuf(tarfile.PAX_FORMAT) 1545 1546 1547class MiscTest(unittest.TestCase): 1548 1549 def test_char_fields(self): 1550 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0") 1551 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo") 1552 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo") 1553 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo") 1554 1555 def test_number_fields(self): 1556 self.assertEqual(tarfile.itn(1), b"0000001\x00") 1557 self.assertEqual(tarfile.itn(0xffffffff), b"\x80\x00\x00\x00\xff\xff\xff\xff") 1558 1559 1560class ContextManagerTest(unittest.TestCase): 1561 1562 def test_basic(self): 1563 with tarfile.open(tarname) as tar: 1564 self.assertFalse(tar.closed, "closed inside runtime context") 1565 self.assertTrue(tar.closed, "context manager failed") 1566 1567 def test_closed(self): 1568 # The __enter__() method is supposed to raise IOError 1569 # if the TarFile object is already closed. 1570 tar = tarfile.open(tarname) 1571 tar.close() 1572 with self.assertRaises(IOError): 1573 with tar: 1574 pass 1575 1576 def test_exception(self): 1577 # Test if the IOError exception is passed through properly. 1578 with self.assertRaises(Exception) as exc: 1579 with tarfile.open(tarname) as tar: 1580 raise IOError 1581 self.assertIsInstance(exc.exception, IOError, 1582 "wrong exception raised in context manager") 1583 self.assertTrue(tar.closed, "context manager failed") 1584 1585 def test_no_eof(self): 1586 # __exit__() must not write end-of-archive blocks if an 1587 # exception was raised. 1588 try: 1589 with tarfile.open(tmpname, "w") as tar: 1590 raise Exception 1591 except: 1592 pass 1593 self.assertEqual(os.path.getsize(tmpname), 0, 1594 "context manager wrote an end-of-archive block") 1595 self.assertTrue(tar.closed, "context manager failed") 1596 1597 def test_eof(self): 1598 # __exit__() must write end-of-archive blocks, i.e. call 1599 # TarFile.close() if there was no error. 1600 with tarfile.open(tmpname, "w"): 1601 pass 1602 self.assertNotEqual(os.path.getsize(tmpname), 0, 1603 "context manager wrote no end-of-archive block") 1604 1605 def test_fileobj(self): 1606 # Test that __exit__() did not close the external file 1607 # object. 1608 with open(tmpname, "wb") as fobj: 1609 try: 1610 with tarfile.open(fileobj=fobj, mode="w") as tar: 1611 raise Exception 1612 except: 1613 pass 1614 self.assertFalse(fobj.closed, "external file object was closed") 1615 self.assertTrue(tar.closed, "context manager failed") 1616 1617 1618class LinkEmulationTest(ReadTest): 1619 1620 # Test for issue #8741 regression. On platforms that do not support 1621 # symbolic or hard links tarfile tries to extract these types of members as 1622 # the regular files they point to. 1623 def _test_link_extraction(self, name): 1624 self.tar.extract(name, TEMPDIR) 1625 data = open(os.path.join(TEMPDIR, name), "rb").read() 1626 self.assertEqual(md5sum(data), md5_regtype) 1627 1628 # When 8879 gets fixed, this will need to change. Currently on Windows 1629 # we have os.path.islink but no os.link, so these tests fail without the 1630 # following skip until link is completed. 1631 @unittest.skipIf(hasattr(os.path, "islink"), 1632 "Skip emulation - has os.path.islink but not os.link") 1633 def test_hardlink_extraction1(self): 1634 self._test_link_extraction("ustar/lnktype") 1635 1636 @unittest.skipIf(hasattr(os.path, "islink"), 1637 "Skip emulation - has os.path.islink but not os.link") 1638 def test_hardlink_extraction2(self): 1639 self._test_link_extraction("./ustar/linktest2/lnktype") 1640 1641 @unittest.skipIf(hasattr(os, "symlink"), 1642 "Skip emulation if symlink exists") 1643 def test_symlink_extraction1(self): 1644 self._test_link_extraction("ustar/symtype") 1645 1646 @unittest.skipIf(hasattr(os, "symlink"), 1647 "Skip emulation if symlink exists") 1648 def test_symlink_extraction2(self): 1649 self._test_link_extraction("./ustar/linktest2/symtype") 1650 1651 1652class GzipMiscReadTest(MiscReadTest): 1653 tarname = gzipname 1654 mode = "r:gz" 1655class GzipUstarReadTest(UstarReadTest): 1656 tarname = gzipname 1657 mode = "r:gz" 1658class GzipStreamReadTest(StreamReadTest): 1659 tarname = gzipname 1660 mode = "r|gz" 1661class GzipWriteTest(WriteTest): 1662 mode = "w:gz" 1663class GzipStreamWriteTest(StreamWriteTest): 1664 mode = "w|gz" 1665 1666 1667class Bz2MiscReadTest(MiscReadTest): 1668 tarname = bz2name 1669 mode = "r:bz2" 1670class Bz2UstarReadTest(UstarReadTest): 1671 tarname = bz2name 1672 mode = "r:bz2" 1673class Bz2StreamReadTest(StreamReadTest): 1674 tarname = bz2name 1675 mode = "r|bz2" 1676class Bz2WriteTest(WriteTest): 1677 mode = "w:bz2" 1678class Bz2StreamWriteTest(StreamWriteTest): 1679 mode = "w|bz2" 1680 1681class Bz2PartialReadTest(unittest.TestCase): 1682 # Issue5068: The _BZ2Proxy.read() method loops forever 1683 # on an empty or partial bzipped file. 1684 1685 def _test_partial_input(self, mode): 1686 class MyBytesIO(io.BytesIO): 1687 hit_eof = False 1688 def read(self, n): 1689 if self.hit_eof: 1690 raise AssertionError("infinite loop detected in tarfile.open()") 1691 self.hit_eof = self.tell() == len(self.getvalue()) 1692 return super(MyBytesIO, self).read(n) 1693 def seek(self, *args): 1694 self.hit_eof = False 1695 return super(MyBytesIO, self).seek(*args) 1696 1697 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1698 for x in range(len(data) + 1): 1699 try: 1700 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 1701 except tarfile.ReadError: 1702 pass # we have no interest in ReadErrors 1703 1704 def test_partial_input(self): 1705 self._test_partial_input("r") 1706 1707 def test_partial_input_bz2(self): 1708 self._test_partial_input("r:bz2") 1709 1710 1711def test_main(): 1712 support.unlink(TEMPDIR) 1713 os.makedirs(TEMPDIR) 1714 1715 tests = [ 1716 UstarReadTest, 1717 MiscReadTest, 1718 StreamReadTest, 1719 DetectReadTest, 1720 MemberReadTest, 1721 GNUReadTest, 1722 PaxReadTest, 1723 WriteTest, 1724 StreamWriteTest, 1725 GNUWriteTest, 1726 PaxWriteTest, 1727 UstarUnicodeTest, 1728 GNUUnicodeTest, 1729 PAXUnicodeTest, 1730 AppendTest, 1731 LimitsTest, 1732 MiscTest, 1733 ContextManagerTest, 1734 ] 1735 1736 if hasattr(os, "link"): 1737 tests.append(HardlinkTest) 1738 else: 1739 tests.append(LinkEmulationTest) 1740 1741 with open(tarname, "rb") as fobj: 1742 data = fobj.read() 1743 1744 if gzip: 1745 # Create testtar.tar.gz and add gzip-specific tests. 1746 support.unlink(gzipname) 1747 with gzip.open(gzipname, "wb") as tar: 1748 tar.write(data) 1749 1750 tests += [ 1751 GzipMiscReadTest, 1752 GzipUstarReadTest, 1753 GzipStreamReadTest, 1754 GzipWriteTest, 1755 GzipStreamWriteTest, 1756 ] 1757 1758 if bz2: 1759 # Create testtar.tar.bz2 and add bz2-specific tests. 1760 support.unlink(bz2name) 1761 tar = bz2.BZ2File(bz2name, "wb") 1762 try: 1763 tar.write(data) 1764 finally: 1765 tar.close() 1766 1767 tests += [ 1768 Bz2MiscReadTest, 1769 Bz2UstarReadTest, 1770 Bz2StreamReadTest, 1771 Bz2WriteTest, 1772 Bz2StreamWriteTest, 1773 Bz2PartialReadTest, 1774 ] 1775 1776 try: 1777 support.run_unittest(*tests) 1778 finally: 1779 if os.path.exists(TEMPDIR): 1780 shutil.rmtree(TEMPDIR) 1781 1782if __name__ == "__main__": 1783 test_main() 1784