# test_tarfile.py revision 57f75799bf4b24b09cbee9a2de4b01b2c73757d4
1import sys 2import os 3import io 4import shutil 5import io 6from hashlib import md5 7import errno 8 9import unittest 10import tarfile 11 12from test import support 13 14# Check for our compression modules. 15try: 16 import gzip 17 gzip.GzipFile 18except (ImportError, AttributeError): 19 gzip = None 20try: 21 import bz2 22except ImportError: 23 bz2 = None 24 25def md5sum(data): 26 return md5(data).hexdigest() 27 28TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 29tarname = support.findfile("testtar.tar") 30gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 31bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 32tmpname = os.path.join(TEMPDIR, "tmp.tar") 33 34md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 35md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 36 37 38class ReadTest(unittest.TestCase): 39 40 tarname = tarname 41 mode = "r:" 42 43 def setUp(self): 44 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 45 46 def tearDown(self): 47 self.tar.close() 48 49 50class UstarReadTest(ReadTest): 51 52 def test_fileobj_regular_file(self): 53 tarinfo = self.tar.getmember("ustar/regtype") 54 fobj = self.tar.extractfile(tarinfo) 55 data = fobj.read() 56 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 57 "regular file extraction failed") 58 59 def test_fileobj_readlines(self): 60 self.tar.extract("ustar/regtype", TEMPDIR) 61 tarinfo = self.tar.getmember("ustar/regtype") 62 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 63 lines1 = fobj1.readlines() 64 fobj2 = io.TextIOWrapper(self.tar.extractfile(tarinfo)) 65 66 lines2 = fobj2.readlines() 67 self.assertTrue(lines1 == lines2, 68 "fileobj.readlines() failed") 69 self.assertTrue(len(lines2) == 114, 70 "fileobj.readlines() failed") 71 self.assertTrue(lines2[83] == 72 "I will gladly admit that Python is not the fastest running scripting language.\n", 73 "fileobj.readlines() failed") 74 75 def test_fileobj_iter(self): 76 self.tar.extract("ustar/regtype", 
TEMPDIR) 77 tarinfo = self.tar.getmember("ustar/regtype") 78 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1: 79 lines1 = fobj1.readlines() 80 fobj2 = self.tar.extractfile(tarinfo) 81 lines2 = list(io.TextIOWrapper(fobj2)) 82 self.assertTrue(lines1 == lines2, 83 "fileobj.__iter__() failed") 84 85 def test_fileobj_seek(self): 86 self.tar.extract("ustar/regtype", TEMPDIR) 87 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 88 data = fobj.read() 89 90 tarinfo = self.tar.getmember("ustar/regtype") 91 fobj = self.tar.extractfile(tarinfo) 92 93 text = fobj.read() 94 fobj.seek(0) 95 self.assertEqual(0, fobj.tell(), 96 "seek() to file's start failed") 97 fobj.seek(2048, 0) 98 self.assertEqual(2048, fobj.tell(), 99 "seek() to absolute position failed") 100 fobj.seek(-1024, 1) 101 self.assertEqual(1024, fobj.tell(), 102 "seek() to negative relative position failed") 103 fobj.seek(1024, 1) 104 self.assertEqual(2048, fobj.tell(), 105 "seek() to positive relative position failed") 106 s = fobj.read(10) 107 self.assertTrue(s == data[2048:2058], 108 "read() after seek failed") 109 fobj.seek(0, 2) 110 self.assertEqual(tarinfo.size, fobj.tell(), 111 "seek() to file's end failed") 112 self.assertTrue(fobj.read() == b"", 113 "read() at file's end did not return empty string") 114 fobj.seek(-tarinfo.size, 2) 115 self.assertEqual(0, fobj.tell(), 116 "relative seek() to file's end failed") 117 fobj.seek(512) 118 s1 = fobj.readlines() 119 fobj.seek(512) 120 s2 = fobj.readlines() 121 self.assertTrue(s1 == s2, 122 "readlines() after seek failed") 123 fobj.seek(0) 124 self.assertEqual(len(fobj.readline()), fobj.tell(), 125 "tell() after readline() failed") 126 fobj.seek(512) 127 self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), 128 "tell() after seek() and readline() failed") 129 fobj.seek(0) 130 line = fobj.readline() 131 self.assertEqual(fobj.read(), data[len(line):], 132 "read() after readline() failed") 133 fobj.close() 134 135 # Test if 
symbolic and hard links are resolved by extractfile(). The 136 # test link members each point to a regular member whose data is 137 # supposed to be exported. 138 def _test_fileobj_link(self, lnktype, regtype): 139 a = self.tar.extractfile(lnktype) 140 b = self.tar.extractfile(regtype) 141 self.assertEqual(a.name, b.name) 142 143 def test_fileobj_link1(self): 144 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 145 146 def test_fileobj_link2(self): 147 self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype") 148 149 def test_fileobj_symlink1(self): 150 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 151 152 def test_fileobj_symlink2(self): 153 self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype") 154 155 156class CommonReadTest(ReadTest): 157 158 def test_empty_tarfile(self): 159 # Test for issue6123: Allow opening empty archives. 160 # This test checks if tarfile.open() is able to open an empty tar 161 # archive successfully. Note that an empty tar archive is not the 162 # same as an empty file! 163 with tarfile.open(tmpname, self.mode.replace("r", "w")): 164 pass 165 try: 166 tar = tarfile.open(tmpname, self.mode) 167 tar.getnames() 168 except tarfile.ReadError: 169 self.fail("tarfile.open() failed on empty archive") 170 else: 171 self.assertListEqual(tar.getmembers(), []) 172 finally: 173 tar.close() 174 175 def test_null_tarfile(self): 176 # Test for issue6123: Allow opening empty archives. 177 # This test guarantees that tarfile.open() does not treat an empty 178 # file as an empty tar archive. 179 with open(tmpname, "wb"): 180 pass 181 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 182 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 183 184 def test_ignore_zeros(self): 185 # Test TarFile's ignore_zeros option. 
186 if self.mode.endswith(":gz"): 187 _open = gzip.GzipFile 188 elif self.mode.endswith(":bz2"): 189 _open = bz2.BZ2File 190 else: 191 _open = open 192 193 for char in (b'\0', b'a'): 194 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 195 # are ignored correctly. 196 with _open(tmpname, "wb") as fobj: 197 fobj.write(char * 1024) 198 fobj.write(tarfile.TarInfo("foo").tobuf()) 199 200 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 201 try: 202 self.assertListEqual(tar.getnames(), ["foo"], 203 "ignore_zeros=True should have skipped the %r-blocks" % char) 204 finally: 205 tar.close() 206 207 208class MiscReadTest(CommonReadTest): 209 210 def test_no_name_argument(self): 211 with open(self.tarname, "rb") as fobj: 212 tar = tarfile.open(fileobj=fobj, mode=self.mode) 213 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 214 215 def test_no_name_attribute(self): 216 with open(self.tarname, "rb") as fobj: 217 data = fobj.read() 218 fobj = io.BytesIO(data) 219 self.assertRaises(AttributeError, getattr, fobj, "name") 220 tar = tarfile.open(fileobj=fobj, mode=self.mode) 221 self.assertEqual(tar.name, None) 222 223 def test_empty_name_attribute(self): 224 with open(self.tarname, "rb") as fobj: 225 data = fobj.read() 226 fobj = io.BytesIO(data) 227 fobj.name = "" 228 tar = tarfile.open(fileobj=fobj, mode=self.mode) 229 self.assertEqual(tar.name, None) 230 231 def test_fileobj_with_offset(self): 232 # Skip the first member and store values from the second member 233 # of the testtar. 234 tar = tarfile.open(self.tarname, mode=self.mode) 235 try: 236 tar.next() 237 t = tar.next() 238 name = t.name 239 offset = t.offset 240 data = tar.extractfile(t).read() 241 finally: 242 tar.close() 243 244 # Open the testtar and seek to the offset of the second member. 
245 if self.mode.endswith(":gz"): 246 _open = gzip.GzipFile 247 elif self.mode.endswith(":bz2"): 248 _open = bz2.BZ2File 249 else: 250 _open = open 251 fobj = _open(self.tarname, "rb") 252 try: 253 fobj.seek(offset) 254 255 # Test if the tarfile starts with the second member. 256 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 257 t = tar.next() 258 self.assertEqual(t.name, name) 259 # Read to the end of fileobj and test if seeking back to the 260 # beginning works. 261 tar.getmembers() 262 self.assertEqual(tar.extractfile(t).read(), data, 263 "seek back did not work") 264 tar.close() 265 finally: 266 fobj.close() 267 268 def test_fail_comp(self): 269 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 270 if self.mode == "r:": 271 return 272 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 273 with open(tarname, "rb") as fobj: 274 self.assertRaises(tarfile.ReadError, tarfile.open, 275 fileobj=fobj, mode=self.mode) 276 277 def test_v7_dirtype(self): 278 # Test old style dirtype member (bug #1336623): 279 # Old V7 tars create directory members using an AREGTYPE 280 # header with a "/" appended to the filename field. 281 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 282 self.assertTrue(tarinfo.type == tarfile.DIRTYPE, 283 "v7 dirtype failed") 284 285 def test_xstar_type(self): 286 # The xstar format stores extra atime and ctime fields inside the 287 # space reserved for the prefix field. The prefix field must be 288 # ignored in this case, otherwise it will mess up the name. 
289 try: 290 self.tar.getmember("misc/regtype-xstar") 291 except KeyError: 292 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 293 294 def test_check_members(self): 295 for tarinfo in self.tar: 296 self.assertTrue(int(tarinfo.mtime) == 0o7606136617, 297 "wrong mtime for %s" % tarinfo.name) 298 if not tarinfo.name.startswith("ustar/"): 299 continue 300 self.assertTrue(tarinfo.uname == "tarfile", 301 "wrong uname for %s" % tarinfo.name) 302 303 def test_find_members(self): 304 self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", 305 "could not find all members") 306 307 @unittest.skipUnless(hasattr(os, "link"), 308 "Missing hardlink implementation") 309 @support.skip_unless_symlink 310 def test_extract_hardlink(self): 311 # Test hardlink extraction (e.g. bug #857297). 312 tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") 313 314 try: 315 tar.extract("ustar/regtype", TEMPDIR) 316 try: 317 tar.extract("ustar/lnktype", TEMPDIR) 318 except EnvironmentError as e: 319 if e.errno == errno.ENOENT: 320 self.fail("hardlink not extracted properly") 321 322 data = open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb").read() 323 self.assertEqual(md5sum(data), md5_regtype) 324 325 try: 326 tar.extract("ustar/symtype", TEMPDIR) 327 except EnvironmentError as e: 328 if e.errno == errno.ENOENT: 329 self.fail("symlink not extracted properly") 330 331 data = open(os.path.join(TEMPDIR, "ustar/symtype"), "rb").read() 332 self.assertEqual(md5sum(data), md5_regtype) 333 finally: 334 tar.close() 335 336 def test_extractall(self): 337 # Test if extractall() correctly restores directory permissions 338 # and times (see issue1735). 339 tar = tarfile.open(tarname, encoding="iso8859-1") 340 try: 341 directories = [t for t in tar if t.isdir()] 342 tar.extractall(TEMPDIR, directories) 343 for tarinfo in directories: 344 path = os.path.join(TEMPDIR, tarinfo.name) 345 if sys.platform != "win32": 346 # Win32 has no support for fine grained permissions. 
347 self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777) 348 self.assertEqual(tarinfo.mtime, os.path.getmtime(path)) 349 finally: 350 tar.close() 351 352 def test_init_close_fobj(self): 353 # Issue #7341: Close the internal file object in the TarFile 354 # constructor in case of an error. For the test we rely on 355 # the fact that opening an empty file raises a ReadError. 356 empty = os.path.join(TEMPDIR, "empty") 357 with open(empty, "wb") as fobj: 358 fobj.write(b"") 359 360 try: 361 tar = object.__new__(tarfile.TarFile) 362 try: 363 tar.__init__(empty) 364 except tarfile.ReadError: 365 self.assertTrue(tar.fileobj.closed) 366 else: 367 self.fail("ReadError not raised") 368 finally: 369 support.unlink(empty) 370 371 372class StreamReadTest(CommonReadTest): 373 374 mode="r|" 375 376 def test_fileobj_regular_file(self): 377 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 378 fobj = self.tar.extractfile(tarinfo) 379 data = fobj.read() 380 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 381 "regular file extraction failed") 382 383 def test_provoke_stream_error(self): 384 tarinfos = self.tar.getmembers() 385 f = self.tar.extractfile(tarinfos[0]) # read the first member 386 self.assertRaises(tarfile.StreamError, f.read) 387 388 def test_compare_members(self): 389 tar1 = tarfile.open(tarname, encoding="iso8859-1") 390 try: 391 tar2 = self.tar 392 393 while True: 394 t1 = tar1.next() 395 t2 = tar2.next() 396 if t1 is None: 397 break 398 self.assertTrue(t2 is not None, "stream.next() failed.") 399 400 if t2.islnk() or t2.issym(): 401 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) 402 continue 403 404 v1 = tar1.extractfile(t1) 405 v2 = tar2.extractfile(t2) 406 if v1 is None: 407 continue 408 self.assertTrue(v2 is not None, "stream.extractfile() failed") 409 self.assertEqual(v1.read(), v2.read(), "stream extraction failed") 410 finally: 411 tar1.close() 412 413 414class 
DetectReadTest(unittest.TestCase): 415 416 def _testfunc_file(self, name, mode): 417 try: 418 tar = tarfile.open(name, mode) 419 except tarfile.ReadError as e: 420 self.fail() 421 else: 422 tar.close() 423 424 def _testfunc_fileobj(self, name, mode): 425 try: 426 with open(name, "rb") as f: 427 tar = tarfile.open(name, mode, fileobj=f) 428 except tarfile.ReadError as e: 429 self.fail() 430 else: 431 tar.close() 432 433 def _test_modes(self, testfunc): 434 testfunc(tarname, "r") 435 testfunc(tarname, "r:") 436 testfunc(tarname, "r:*") 437 testfunc(tarname, "r|") 438 testfunc(tarname, "r|*") 439 440 if gzip: 441 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") 442 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") 443 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") 444 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") 445 446 testfunc(gzipname, "r") 447 testfunc(gzipname, "r:*") 448 testfunc(gzipname, "r:gz") 449 testfunc(gzipname, "r|*") 450 testfunc(gzipname, "r|gz") 451 452 if bz2: 453 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") 454 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") 455 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") 456 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") 457 458 testfunc(bz2name, "r") 459 testfunc(bz2name, "r:*") 460 testfunc(bz2name, "r:bz2") 461 testfunc(bz2name, "r|*") 462 testfunc(bz2name, "r|bz2") 463 464 def test_detect_file(self): 465 self._test_modes(self._testfunc_file) 466 467 def test_detect_fileobj(self): 468 self._test_modes(self._testfunc_fileobj) 469 470 471class MemberReadTest(ReadTest): 472 473 def _test_member(self, tarinfo, chksum=None, **kwargs): 474 if chksum is not None: 475 self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, 476 "wrong md5sum for %s" % tarinfo.name) 477 478 kwargs["mtime"] = 0o7606136617 
479 kwargs["uid"] = 1000 480 kwargs["gid"] = 100 481 if "old-v7" not in tarinfo.name: 482 # V7 tar can't handle alphabetic owners. 483 kwargs["uname"] = "tarfile" 484 kwargs["gname"] = "tarfile" 485 for k, v in kwargs.items(): 486 self.assertTrue(getattr(tarinfo, k) == v, 487 "wrong value in %s field of %s" % (k, tarinfo.name)) 488 489 def test_find_regtype(self): 490 tarinfo = self.tar.getmember("ustar/regtype") 491 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 492 493 def test_find_conttype(self): 494 tarinfo = self.tar.getmember("ustar/conttype") 495 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 496 497 def test_find_dirtype(self): 498 tarinfo = self.tar.getmember("ustar/dirtype") 499 self._test_member(tarinfo, size=0) 500 501 def test_find_dirtype_with_size(self): 502 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 503 self._test_member(tarinfo, size=255) 504 505 def test_find_lnktype(self): 506 tarinfo = self.tar.getmember("ustar/lnktype") 507 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 508 509 def test_find_symtype(self): 510 tarinfo = self.tar.getmember("ustar/symtype") 511 self._test_member(tarinfo, size=0, linkname="regtype") 512 513 def test_find_blktype(self): 514 tarinfo = self.tar.getmember("ustar/blktype") 515 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 516 517 def test_find_chrtype(self): 518 tarinfo = self.tar.getmember("ustar/chrtype") 519 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 520 521 def test_find_fifotype(self): 522 tarinfo = self.tar.getmember("ustar/fifotype") 523 self._test_member(tarinfo, size=0) 524 525 def test_find_sparse(self): 526 tarinfo = self.tar.getmember("ustar/sparse") 527 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 528 529 def test_find_umlauts(self): 530 tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 531 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 532 533 def test_find_ustar_longname(self): 
534 name = "ustar/" + "12345/" * 39 + "1234567/longname" 535 self.assertIn(name, self.tar.getnames()) 536 537 def test_find_regtype_oldv7(self): 538 tarinfo = self.tar.getmember("misc/regtype-old-v7") 539 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 540 541 def test_find_pax_umlauts(self): 542 self.tar.close() 543 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 544 tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 545 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 546 547 548class LongnameTest(ReadTest): 549 550 def test_read_longname(self): 551 # Test reading of longname (bug #1471427). 552 longname = self.subdir + "/" + "123/" * 125 + "longname" 553 try: 554 tarinfo = self.tar.getmember(longname) 555 except KeyError: 556 self.fail("longname not found") 557 self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") 558 559 def test_read_longlink(self): 560 longname = self.subdir + "/" + "123/" * 125 + "longname" 561 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 562 try: 563 tarinfo = self.tar.getmember(longlink) 564 except KeyError: 565 self.fail("longlink not found") 566 self.assertTrue(tarinfo.linkname == longname, "linkname wrong") 567 568 def test_truncated_longname(self): 569 longname = self.subdir + "/" + "123/" * 125 + "longname" 570 tarinfo = self.tar.getmember(longname) 571 offset = tarinfo.offset 572 self.tar.fileobj.seek(offset) 573 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 574 self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) 575 576 def test_header_offset(self): 577 # Test if the start offset of the TarInfo object includes 578 # the preceding extended header. 
579 longname = self.subdir + "/" + "123/" * 125 + "longname" 580 offset = self.tar.getmember(longname).offset 581 fobj = open(tarname, "rb") 582 fobj.seek(offset) 583 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), "iso8859-1", "strict") 584 self.assertEqual(tarinfo.type, self.longnametype) 585 586 587class GNUReadTest(LongnameTest): 588 589 subdir = "gnu" 590 longnametype = tarfile.GNUTYPE_LONGNAME 591 592 def test_sparse_file(self): 593 tarinfo1 = self.tar.getmember("ustar/sparse") 594 fobj1 = self.tar.extractfile(tarinfo1) 595 tarinfo2 = self.tar.getmember("gnu/sparse") 596 fobj2 = self.tar.extractfile(tarinfo2) 597 self.assertEqual(fobj1.read(), fobj2.read(), 598 "sparse file extraction failed") 599 600 601class PaxReadTest(LongnameTest): 602 603 subdir = "pax" 604 longnametype = tarfile.XHDTYPE 605 606 def test_pax_global_headers(self): 607 tar = tarfile.open(tarname, encoding="iso8859-1") 608 try: 609 tarinfo = tar.getmember("pax/regtype1") 610 self.assertEqual(tarinfo.uname, "foo") 611 self.assertEqual(tarinfo.gname, "bar") 612 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 613 614 tarinfo = tar.getmember("pax/regtype2") 615 self.assertEqual(tarinfo.uname, "") 616 self.assertEqual(tarinfo.gname, "bar") 617 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 618 619 tarinfo = tar.getmember("pax/regtype3") 620 self.assertEqual(tarinfo.uname, "tarfile") 621 self.assertEqual(tarinfo.gname, "tarfile") 622 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 623 finally: 624 tar.close() 625 626 def test_pax_number_fields(self): 627 # All following number fields are read from the pax header. 
628 tar = tarfile.open(tarname, encoding="iso8859-1") 629 try: 630 tarinfo = tar.getmember("pax/regtype4") 631 self.assertEqual(tarinfo.size, 7011) 632 self.assertEqual(tarinfo.uid, 123) 633 self.assertEqual(tarinfo.gid, 123) 634 self.assertEqual(tarinfo.mtime, 1041808783.0) 635 self.assertEqual(type(tarinfo.mtime), float) 636 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 637 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 638 finally: 639 tar.close() 640 641 642class WriteTestBase(unittest.TestCase): 643 # Put all write tests in here that are supposed to be tested 644 # in all possible mode combinations. 645 646 def test_fileobj_no_close(self): 647 fobj = io.BytesIO() 648 tar = tarfile.open(fileobj=fobj, mode=self.mode) 649 tar.addfile(tarfile.TarInfo("foo")) 650 tar.close() 651 self.assertTrue(fobj.closed is False, "external fileobjs must never closed") 652 653 654class WriteTest(WriteTestBase): 655 656 mode = "w:" 657 658 def test_100_char_name(self): 659 # The name field in a tar header stores strings of at most 100 chars. 660 # If a string is shorter than 100 chars it has to be padded with '\0', 661 # which implies that a string of exactly 100 chars is stored without 662 # a trailing '\0'. 663 name = "0123456789" * 10 664 tar = tarfile.open(tmpname, self.mode) 665 try: 666 t = tarfile.TarInfo(name) 667 tar.addfile(t) 668 finally: 669 tar.close() 670 671 tar = tarfile.open(tmpname) 672 try: 673 self.assertTrue(tar.getnames()[0] == name, 674 "failed to store 100 char filename") 675 finally: 676 tar.close() 677 678 def test_tar_size(self): 679 # Test for bug #1013882. 680 tar = tarfile.open(tmpname, self.mode) 681 try: 682 path = os.path.join(TEMPDIR, "file") 683 with open(path, "wb") as fobj: 684 fobj.write(b"aaa") 685 tar.add(path) 686 finally: 687 tar.close() 688 self.assertTrue(os.path.getsize(tmpname) > 0, 689 "tarfile is empty") 690 691 # The test_*_size tests test for bug #1167128. 
692 def test_file_size(self): 693 tar = tarfile.open(tmpname, self.mode) 694 try: 695 path = os.path.join(TEMPDIR, "file") 696 with open(path, "wb"): 697 pass 698 tarinfo = tar.gettarinfo(path) 699 self.assertEqual(tarinfo.size, 0) 700 701 with open(path, "wb") as fobj: 702 fobj.write(b"aaa") 703 tarinfo = tar.gettarinfo(path) 704 self.assertEqual(tarinfo.size, 3) 705 finally: 706 tar.close() 707 708 def test_directory_size(self): 709 path = os.path.join(TEMPDIR, "directory") 710 os.mkdir(path) 711 try: 712 tar = tarfile.open(tmpname, self.mode) 713 try: 714 tarinfo = tar.gettarinfo(path) 715 self.assertEqual(tarinfo.size, 0) 716 finally: 717 tar.close() 718 finally: 719 os.rmdir(path) 720 721 def test_link_size(self): 722 if hasattr(os, "link"): 723 link = os.path.join(TEMPDIR, "link") 724 target = os.path.join(TEMPDIR, "link_target") 725 with open(target, "wb") as fobj: 726 fobj.write(b"aaa") 727 os.link(target, link) 728 try: 729 tar = tarfile.open(tmpname, self.mode) 730 try: 731 # Record the link target in the inodes list. 732 tar.gettarinfo(target) 733 tarinfo = tar.gettarinfo(link) 734 self.assertEqual(tarinfo.size, 0) 735 finally: 736 tar.close() 737 finally: 738 os.remove(target) 739 os.remove(link) 740 741 @support.skip_unless_symlink 742 def test_symlink_size(self): 743 path = os.path.join(TEMPDIR, "symlink") 744 os.symlink("link_target", path) 745 try: 746 tar = tarfile.open(tmpname, self.mode) 747 try: 748 tarinfo = tar.gettarinfo(path) 749 self.assertEqual(tarinfo.size, 0) 750 finally: 751 tar.close() 752 finally: 753 os.remove(path) 754 755 def test_add_self(self): 756 # Test for #1257255. 
757 dstname = os.path.abspath(tmpname) 758 tar = tarfile.open(tmpname, self.mode) 759 try: 760 self.assertTrue(tar.name == dstname, "archive name must be absolute") 761 tar.add(dstname) 762 self.assertTrue(tar.getnames() == [], "added the archive to itself") 763 764 cwd = os.getcwd() 765 os.chdir(TEMPDIR) 766 tar.add(dstname) 767 os.chdir(cwd) 768 self.assertTrue(tar.getnames() == [], "added the archive to itself") 769 finally: 770 tar.close() 771 772 def test_exclude(self): 773 tempdir = os.path.join(TEMPDIR, "exclude") 774 os.mkdir(tempdir) 775 try: 776 for name in ("foo", "bar", "baz"): 777 name = os.path.join(tempdir, name) 778 open(name, "wb").close() 779 780 exclude = os.path.isfile 781 782 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 783 try: 784 with support.check_warnings(("use the filter argument", 785 DeprecationWarning)): 786 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 787 finally: 788 tar.close() 789 790 tar = tarfile.open(tmpname, "r") 791 try: 792 self.assertEqual(len(tar.getmembers()), 1) 793 self.assertEqual(tar.getnames()[0], "empty_dir") 794 finally: 795 tar.close() 796 finally: 797 shutil.rmtree(tempdir) 798 799 def test_filter(self): 800 tempdir = os.path.join(TEMPDIR, "filter") 801 os.mkdir(tempdir) 802 try: 803 for name in ("foo", "bar", "baz"): 804 name = os.path.join(tempdir, name) 805 open(name, "wb").close() 806 807 def filter(tarinfo): 808 if os.path.basename(tarinfo.name) == "bar": 809 return 810 tarinfo.uid = 123 811 tarinfo.uname = "foo" 812 return tarinfo 813 814 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 815 try: 816 tar.add(tempdir, arcname="empty_dir", filter=filter) 817 finally: 818 tar.close() 819 820 tar = tarfile.open(tmpname, "r") 821 try: 822 for tarinfo in tar: 823 self.assertEqual(tarinfo.uid, 123) 824 self.assertEqual(tarinfo.uname, "foo") 825 self.assertEqual(len(tar.getmembers()), 3) 826 finally: 827 tar.close() 828 finally: 829 shutil.rmtree(tempdir) 830 831 # Guarantee that 
stored pathnames are not modified. Don't 832 # remove ./ or ../ or double slashes. Still make absolute 833 # pathnames relative. 834 # For details see bug #6054. 835 def _test_pathname(self, path, cmp_path=None, dir=False): 836 # Create a tarfile with an empty member named path 837 # and compare the stored name with the original. 838 foo = os.path.join(TEMPDIR, "foo") 839 if not dir: 840 open(foo, "w").close() 841 else: 842 os.mkdir(foo) 843 844 tar = tarfile.open(tmpname, self.mode) 845 try: 846 tar.add(foo, arcname=path) 847 finally: 848 tar.close() 849 850 tar = tarfile.open(tmpname, "r") 851 try: 852 t = tar.next() 853 finally: 854 tar.close() 855 856 if not dir: 857 os.remove(foo) 858 else: 859 os.rmdir(foo) 860 861 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 862 863 def test_pathnames(self): 864 self._test_pathname("foo") 865 self._test_pathname(os.path.join("foo", ".", "bar")) 866 self._test_pathname(os.path.join("foo", "..", "bar")) 867 self._test_pathname(os.path.join(".", "foo")) 868 self._test_pathname(os.path.join(".", "foo", ".")) 869 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 870 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 871 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 872 self._test_pathname(os.path.join("..", "foo")) 873 self._test_pathname(os.path.join("..", "foo", "..")) 874 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 875 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 876 877 self._test_pathname("foo" + os.sep + os.sep + "bar") 878 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 879 880 def test_abs_pathnames(self): 881 if sys.platform == "win32": 882 self._test_pathname("C:\\foo", "foo") 883 else: 884 self._test_pathname("/foo", "foo") 885 self._test_pathname("///foo", "foo") 886 887 def test_cwd(self): 888 # Test adding the current working directory. 
889 cwd = os.getcwd() 890 os.chdir(TEMPDIR) 891 try: 892 tar = tarfile.open(tmpname, self.mode) 893 try: 894 tar.add(".") 895 finally: 896 tar.close() 897 898 tar = tarfile.open(tmpname, "r") 899 try: 900 for t in tar: 901 self.assert_(t.name == "." or t.name.startswith("./")) 902 finally: 903 tar.close() 904 finally: 905 os.chdir(cwd) 906 907 908class StreamWriteTest(WriteTestBase): 909 910 mode = "w|" 911 912 def test_stream_padding(self): 913 # Test for bug #1543303. 914 tar = tarfile.open(tmpname, self.mode) 915 tar.close() 916 917 if self.mode.endswith("gz"): 918 with gzip.GzipFile(tmpname) as fobj: 919 data = fobj.read() 920 elif self.mode.endswith("bz2"): 921 dec = bz2.BZ2Decompressor() 922 with open(tmpname, "rb") as fobj: 923 data = fobj.read() 924 data = dec.decompress(data) 925 self.assertTrue(len(dec.unused_data) == 0, 926 "found trailing data") 927 else: 928 with open(tmpname, "rb") as fobj: 929 data = fobj.read() 930 931 self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE, 932 "incorrect zero padding") 933 934 def test_file_mode(self): 935 # Test for issue #8464: Create files with correct 936 # permissions. 937 if sys.platform == "win32" or not hasattr(os, "umask"): 938 return 939 940 if os.path.exists(tmpname): 941 os.remove(tmpname) 942 943 original_umask = os.umask(0o022) 944 try: 945 tar = tarfile.open(tmpname, self.mode) 946 tar.close() 947 mode = os.stat(tmpname).st_mode & 0o777 948 self.assertEqual(mode, 0o644, "wrong file permissions") 949 finally: 950 os.umask(original_umask) 951 952 953class GNUWriteTest(unittest.TestCase): 954 # This testcase checks for correct creation of GNU Longname 955 # and Longlink extended headers (cp. bug #812325). 
956 957 def _length(self, s): 958 blocks, remainder = divmod(len(s) + 1, 512) 959 if remainder: 960 blocks += 1 961 return blocks * 512 962 963 def _calc_size(self, name, link=None): 964 # Initial tar header 965 count = 512 966 967 if len(name) > tarfile.LENGTH_NAME: 968 # GNU longname extended header + longname 969 count += 512 970 count += self._length(name) 971 if link is not None and len(link) > tarfile.LENGTH_LINK: 972 # GNU longlink extended header + longlink 973 count += 512 974 count += self._length(link) 975 return count 976 977 def _test(self, name, link=None): 978 tarinfo = tarfile.TarInfo(name) 979 if link: 980 tarinfo.linkname = link 981 tarinfo.type = tarfile.LNKTYPE 982 983 tar = tarfile.open(tmpname, "w") 984 try: 985 tar.format = tarfile.GNU_FORMAT 986 tar.addfile(tarinfo) 987 988 v1 = self._calc_size(name, link) 989 v2 = tar.offset 990 self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") 991 finally: 992 tar.close() 993 994 tar = tarfile.open(tmpname) 995 try: 996 member = tar.next() 997 self.assertIsNotNone(member, 998 "unable to read longname member") 999 self.assertEqual(tarinfo.name, member.name, 1000 "unable to read longname member") 1001 self.assertEqual(tarinfo.linkname, member.linkname, 1002 "unable to read longname member") 1003 finally: 1004 tar.close() 1005 1006 def test_longname_1023(self): 1007 self._test(("longnam/" * 127) + "longnam") 1008 1009 def test_longname_1024(self): 1010 self._test(("longnam/" * 127) + "longname") 1011 1012 def test_longname_1025(self): 1013 self._test(("longnam/" * 127) + "longname_") 1014 1015 def test_longlink_1023(self): 1016 self._test("name", ("longlnk/" * 127) + "longlnk") 1017 1018 def test_longlink_1024(self): 1019 self._test("name", ("longlnk/" * 127) + "longlink") 1020 1021 def test_longlink_1025(self): 1022 self._test("name", ("longlnk/" * 127) + "longlink_") 1023 1024 def test_longnamelink_1023(self): 1025 self._test(("longnam/" * 127) + "longnam", 1026 ("longlnk/" * 127) + 
"longlnk") 1027 1028 def test_longnamelink_1024(self): 1029 self._test(("longnam/" * 127) + "longname", 1030 ("longlnk/" * 127) + "longlink") 1031 1032 def test_longnamelink_1025(self): 1033 self._test(("longnam/" * 127) + "longname_", 1034 ("longlnk/" * 127) + "longlink_") 1035 1036 1037class HardlinkTest(unittest.TestCase): 1038 # Test the creation of LNKTYPE (hardlink) members in an archive. 1039 1040 def setUp(self): 1041 self.foo = os.path.join(TEMPDIR, "foo") 1042 self.bar = os.path.join(TEMPDIR, "bar") 1043 1044 with open(self.foo, "wb") as fobj: 1045 fobj.write(b"foo") 1046 1047 os.link(self.foo, self.bar) 1048 1049 self.tar = tarfile.open(tmpname, "w") 1050 self.tar.add(self.foo) 1051 1052 def tearDown(self): 1053 self.tar.close() 1054 support.unlink(self.foo) 1055 support.unlink(self.bar) 1056 1057 def test_add_twice(self): 1058 # The same name will be added as a REGTYPE every 1059 # time regardless of st_nlink. 1060 tarinfo = self.tar.gettarinfo(self.foo) 1061 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1062 "add file as regular failed") 1063 1064 def test_add_hardlink(self): 1065 tarinfo = self.tar.gettarinfo(self.bar) 1066 self.assertTrue(tarinfo.type == tarfile.LNKTYPE, 1067 "add file as hardlink failed") 1068 1069 def test_dereference_hardlink(self): 1070 self.tar.dereference = True 1071 tarinfo = self.tar.gettarinfo(self.bar) 1072 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1073 "dereferencing hardlink failed") 1074 1075 1076class PaxWriteTest(GNUWriteTest): 1077 1078 def _test(self, name, link=None): 1079 # See GNUWriteTest. 
1080 tarinfo = tarfile.TarInfo(name) 1081 if link: 1082 tarinfo.linkname = link 1083 tarinfo.type = tarfile.LNKTYPE 1084 1085 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1086 try: 1087 tar.addfile(tarinfo) 1088 finally: 1089 tar.close() 1090 1091 tar = tarfile.open(tmpname) 1092 try: 1093 if link: 1094 l = tar.getmembers()[0].linkname 1095 self.assertTrue(link == l, "PAX longlink creation failed") 1096 else: 1097 n = tar.getmembers()[0].name 1098 self.assertTrue(name == n, "PAX longname creation failed") 1099 finally: 1100 tar.close() 1101 1102 def test_pax_global_header(self): 1103 pax_headers = { 1104 "foo": "bar", 1105 "uid": "0", 1106 "mtime": "1.23", 1107 "test": "\xe4\xf6\xfc", 1108 "\xe4\xf6\xfc": "test"} 1109 1110 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1111 pax_headers=pax_headers) 1112 try: 1113 tar.addfile(tarfile.TarInfo("test")) 1114 finally: 1115 tar.close() 1116 1117 # Test if the global header was written correctly. 1118 tar = tarfile.open(tmpname, encoding="iso8859-1") 1119 try: 1120 self.assertEqual(tar.pax_headers, pax_headers) 1121 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1122 # Test if all the fields are strings. 1123 for key, val in tar.pax_headers.items(): 1124 self.assertTrue(type(key) is not bytes) 1125 self.assertTrue(type(val) is not bytes) 1126 if key in tarfile.PAX_NUMBER_FIELDS: 1127 try: 1128 tarfile.PAX_NUMBER_FIELDS[key](val) 1129 except (TypeError, ValueError): 1130 self.fail("unable to convert pax header field") 1131 finally: 1132 tar.close() 1133 1134 def test_pax_extended_header(self): 1135 # The fields from the pax header have priority over the 1136 # TarInfo. 
1137 pax_headers = {"path": "foo", "uid": "123"} 1138 1139 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") 1140 try: 1141 t = tarfile.TarInfo() 1142 t.name = "\xe4\xf6\xfc" # non-ASCII 1143 t.uid = 8**8 # too large 1144 t.pax_headers = pax_headers 1145 tar.addfile(t) 1146 finally: 1147 tar.close() 1148 1149 tar = tarfile.open(tmpname, encoding="iso8859-1") 1150 try: 1151 t = tar.getmembers()[0] 1152 self.assertEqual(t.pax_headers, pax_headers) 1153 self.assertEqual(t.name, "foo") 1154 self.assertEqual(t.uid, 123) 1155 finally: 1156 tar.close() 1157 1158 1159class UstarUnicodeTest(unittest.TestCase): 1160 1161 format = tarfile.USTAR_FORMAT 1162 1163 def test_iso8859_1_filename(self): 1164 self._test_unicode_filename("iso8859-1") 1165 1166 def test_utf7_filename(self): 1167 self._test_unicode_filename("utf7") 1168 1169 def test_utf8_filename(self): 1170 self._test_unicode_filename("utf8") 1171 1172 def _test_unicode_filename(self, encoding): 1173 tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") 1174 try: 1175 name = "\xe4\xf6\xfc" 1176 tar.addfile(tarfile.TarInfo(name)) 1177 finally: 1178 tar.close() 1179 1180 tar = tarfile.open(tmpname, encoding=encoding) 1181 try: 1182 self.assertEqual(tar.getmembers()[0].name, name) 1183 finally: 1184 tar.close() 1185 1186 def test_unicode_filename_error(self): 1187 if self.format == tarfile.PAX_FORMAT: 1188 # PAX_FORMAT ignores encoding in write mode. 
1189 return 1190 1191 tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") 1192 try: 1193 tarinfo = tarfile.TarInfo() 1194 1195 tarinfo.name = "\xe4\xf6\xfc" 1196 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1197 1198 tarinfo.name = "foo" 1199 tarinfo.uname = "\xe4\xf6\xfc" 1200 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1201 finally: 1202 tar.close() 1203 1204 def test_unicode_argument(self): 1205 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") 1206 try: 1207 for t in tar: 1208 self.assertTrue(type(t.name) is str) 1209 self.assertTrue(type(t.linkname) is str) 1210 self.assertTrue(type(t.uname) is str) 1211 self.assertTrue(type(t.gname) is str) 1212 finally: 1213 tar.close() 1214 1215 def test_uname_unicode(self): 1216 t = tarfile.TarInfo("foo") 1217 t.uname = "\xe4\xf6\xfc" 1218 t.gname = "\xe4\xf6\xfc" 1219 1220 tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1") 1221 try: 1222 tar.addfile(t) 1223 finally: 1224 tar.close() 1225 1226 tar = tarfile.open(tmpname, encoding="iso8859-1") 1227 try: 1228 t = tar.getmember("foo") 1229 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1230 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1231 1232 if self.format != tarfile.PAX_FORMAT: 1233 tar.close() 1234 tar = tarfile.open(tmpname, encoding="ascii") 1235 t = tar.getmember("foo") 1236 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1237 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1238 finally: 1239 tar.close() 1240 1241 1242class GNUUnicodeTest(UstarUnicodeTest): 1243 1244 format = tarfile.GNU_FORMAT 1245 1246 def test_bad_pax_header(self): 1247 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1248 # without a hdrcharset=BINARY header. 
1249 for encoding, name in (("utf8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1250 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1251 with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: 1252 try: 1253 t = tar.getmember(name) 1254 except KeyError: 1255 self.fail("unable to read bad GNU tar pax header") 1256 1257 1258class PAXUnicodeTest(UstarUnicodeTest): 1259 1260 format = tarfile.PAX_FORMAT 1261 1262 def test_binary_header(self): 1263 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 1264 for encoding, name in (("utf8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1265 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1266 with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: 1267 try: 1268 t = tar.getmember(name) 1269 except KeyError: 1270 self.fail("unable to read POSIX.1-2008 binary header") 1271 1272 1273class AppendTest(unittest.TestCase): 1274 # Test append mode (cp. patch #1652681). 1275 1276 def setUp(self): 1277 self.tarname = tmpname 1278 if os.path.exists(self.tarname): 1279 os.remove(self.tarname) 1280 1281 def _add_testfile(self, fileobj=None): 1282 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1283 tar.addfile(tarfile.TarInfo("bar")) 1284 1285 def _create_testtar(self, mode="w:"): 1286 with tarfile.open(tarname, encoding="iso8859-1") as src: 1287 t = src.getmember("ustar/regtype") 1288 t.name = "foo" 1289 f = src.extractfile(t) 1290 with tarfile.open(self.tarname, mode) as tar: 1291 tar.addfile(t, f) 1292 1293 def _test(self, names=["bar"], fileobj=None): 1294 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1295 self.assertEqual(tar.getnames(), names) 1296 1297 def test_non_existing(self): 1298 self._add_testfile() 1299 self._test() 1300 1301 def test_empty(self): 1302 tarfile.open(self.tarname, "w:").close() 1303 self._add_testfile() 1304 self._test() 1305 1306 def test_empty_fileobj(self): 1307 fobj = io.BytesIO(b"\0" * 1024) 1308 self._add_testfile(fobj) 1309 
fobj.seek(0) 1310 self._test(fileobj=fobj) 1311 1312 def test_fileobj(self): 1313 self._create_testtar() 1314 with open(self.tarname, "rb") as fobj: 1315 data = fobj.read() 1316 fobj = io.BytesIO(data) 1317 self._add_testfile(fobj) 1318 fobj.seek(0) 1319 self._test(names=["foo", "bar"], fileobj=fobj) 1320 1321 def test_existing(self): 1322 self._create_testtar() 1323 self._add_testfile() 1324 self._test(names=["foo", "bar"]) 1325 1326 def test_append_gz(self): 1327 if gzip is None: 1328 return 1329 self._create_testtar("w:gz") 1330 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1331 1332 def test_append_bz2(self): 1333 if bz2 is None: 1334 return 1335 self._create_testtar("w:bz2") 1336 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1337 1338 # Append mode is supposed to fail if the tarfile to append to 1339 # does not end with a zero block. 1340 def _test_error(self, data): 1341 with open(self.tarname, "wb") as fobj: 1342 fobj.write(data) 1343 self.assertRaises(tarfile.ReadError, self._add_testfile) 1344 1345 def test_null(self): 1346 self._test_error(b"") 1347 1348 def test_incomplete(self): 1349 self._test_error(b"\0" * 13) 1350 1351 def test_premature_eof(self): 1352 data = tarfile.TarInfo("foo").tobuf() 1353 self._test_error(data) 1354 1355 def test_trailing_garbage(self): 1356 data = tarfile.TarInfo("foo").tobuf() 1357 self._test_error(data + b"\0" * 13) 1358 1359 def test_invalid(self): 1360 self._test_error(b"a" * 512) 1361 1362 1363class LimitsTest(unittest.TestCase): 1364 1365 def test_ustar_limits(self): 1366 # 100 char name 1367 tarinfo = tarfile.TarInfo("0123456789" * 10) 1368 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1369 1370 # 101 char name that cannot be stored 1371 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1372 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1373 1374 # 256 char name with a slash at pos 156 1375 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1376 
tarinfo.tobuf(tarfile.USTAR_FORMAT) 1377 1378 # 256 char name that cannot be stored 1379 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1380 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1381 1382 # 512 char name 1383 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1384 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1385 1386 # 512 char linkname 1387 tarinfo = tarfile.TarInfo("longlink") 1388 tarinfo.linkname = "123/" * 126 + "longname" 1389 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1390 1391 # uid > 8 digits 1392 tarinfo = tarfile.TarInfo("name") 1393 tarinfo.uid = 0o10000000 1394 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1395 1396 def test_gnu_limits(self): 1397 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1398 tarinfo.tobuf(tarfile.GNU_FORMAT) 1399 1400 tarinfo = tarfile.TarInfo("longlink") 1401 tarinfo.linkname = "123/" * 126 + "longname" 1402 tarinfo.tobuf(tarfile.GNU_FORMAT) 1403 1404 # uid >= 256 ** 7 1405 tarinfo = tarfile.TarInfo("name") 1406 tarinfo.uid = 0o4000000000000000000 1407 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1408 1409 def test_pax_limits(self): 1410 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1411 tarinfo.tobuf(tarfile.PAX_FORMAT) 1412 1413 tarinfo = tarfile.TarInfo("longlink") 1414 tarinfo.linkname = "123/" * 126 + "longname" 1415 tarinfo.tobuf(tarfile.PAX_FORMAT) 1416 1417 tarinfo = tarfile.TarInfo("name") 1418 tarinfo.uid = 0o4000000000000000000 1419 tarinfo.tobuf(tarfile.PAX_FORMAT) 1420 1421 1422class MiscTest(unittest.TestCase): 1423 1424 def test_char_fields(self): 1425 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0") 1426 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo") 1427 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo") 1428 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo") 1429 1430 def 
test_number_fields(self): 1431 self.assertEqual(tarfile.itn(1), b"0000001\x00") 1432 self.assertEqual(tarfile.itn(0xffffffff), b"\x80\x00\x00\x00\xff\xff\xff\xff") 1433 1434 1435class ContextManagerTest(unittest.TestCase): 1436 1437 def test_basic(self): 1438 with tarfile.open(tarname) as tar: 1439 self.assertFalse(tar.closed, "closed inside runtime context") 1440 self.assertTrue(tar.closed, "context manager failed") 1441 1442 def test_closed(self): 1443 # The __enter__() method is supposed to raise IOError 1444 # if the TarFile object is already closed. 1445 tar = tarfile.open(tarname) 1446 tar.close() 1447 with self.assertRaises(IOError): 1448 with tar: 1449 pass 1450 1451 def test_exception(self): 1452 # Test if the IOError exception is passed through properly. 1453 with self.assertRaises(Exception) as exc: 1454 with tarfile.open(tarname) as tar: 1455 raise IOError 1456 self.assertIsInstance(exc.exception, IOError, 1457 "wrong exception raised in context manager") 1458 self.assertTrue(tar.closed, "context manager failed") 1459 1460 def test_no_eof(self): 1461 # __exit__() must not write end-of-archive blocks if an 1462 # exception was raised. 1463 try: 1464 with tarfile.open(tmpname, "w") as tar: 1465 raise Exception 1466 except: 1467 pass 1468 self.assertEqual(os.path.getsize(tmpname), 0, 1469 "context manager wrote an end-of-archive block") 1470 self.assertTrue(tar.closed, "context manager failed") 1471 1472 def test_eof(self): 1473 # __exit__() must write end-of-archive blocks, i.e. call 1474 # TarFile.close() if there was no error. 1475 with tarfile.open(tmpname, "w"): 1476 pass 1477 self.assertNotEqual(os.path.getsize(tmpname), 0, 1478 "context manager wrote no end-of-archive block") 1479 1480 def test_fileobj(self): 1481 # Test that __exit__() did not close the external file 1482 # object. 
1483 with open(tmpname, "wb") as fobj: 1484 try: 1485 with tarfile.open(fileobj=fobj, mode="w") as tar: 1486 raise Exception 1487 except: 1488 pass 1489 self.assertFalse(fobj.closed, "external file object was closed") 1490 self.assertTrue(tar.closed, "context manager failed") 1491 1492 1493class LinkEmulationTest(ReadTest): 1494 1495 # Test for issue #8741 regression. On platforms that do not support 1496 # symbolic or hard links tarfile tries to extract these types of members as 1497 # the regular files they point to. 1498 def _test_link_extraction(self, name): 1499 self.tar.extract(name, TEMPDIR) 1500 data = open(os.path.join(TEMPDIR, name), "rb").read() 1501 self.assertEqual(md5sum(data), md5_regtype) 1502 1503 # When 8879 gets fixed, this will need to change. Currently on Windows 1504 # we have os.path.islink but no os.link, so these tests fail without the 1505 # following skip until link is completed. 1506 @unittest.skipIf(hasattr(os.path, "islink"), 1507 "Skip emulation - has os.path.islink but not os.link") 1508 def test_hardlink_extraction1(self): 1509 self._test_link_extraction("ustar/lnktype") 1510 1511 @unittest.skipIf(hasattr(os.path, "islink"), 1512 "Skip emulation - has os.path.islink but not os.link") 1513 def test_hardlink_extraction2(self): 1514 self._test_link_extraction("./ustar/linktest2/lnktype") 1515 1516 @unittest.skipIf(hasattr(os, "symlink"), 1517 "Skip emulation if symlink exists") 1518 def test_symlink_extraction1(self): 1519 self._test_link_extraction("ustar/symtype") 1520 1521 @unittest.skipIf(hasattr(os, "symlink"), 1522 "Skip emulation if symlink exists") 1523 def test_symlink_extraction2(self): 1524 self._test_link_extraction("./ustar/linktest2/symtype") 1525 1526 1527class GzipMiscReadTest(MiscReadTest): 1528 tarname = gzipname 1529 mode = "r:gz" 1530class GzipUstarReadTest(UstarReadTest): 1531 tarname = gzipname 1532 mode = "r:gz" 1533class GzipStreamReadTest(StreamReadTest): 1534 tarname = gzipname 1535 mode = "r|gz" 1536class 
GzipWriteTest(WriteTest): 1537 mode = "w:gz" 1538class GzipStreamWriteTest(StreamWriteTest): 1539 mode = "w|gz" 1540 1541 1542class Bz2MiscReadTest(MiscReadTest): 1543 tarname = bz2name 1544 mode = "r:bz2" 1545class Bz2UstarReadTest(UstarReadTest): 1546 tarname = bz2name 1547 mode = "r:bz2" 1548class Bz2StreamReadTest(StreamReadTest): 1549 tarname = bz2name 1550 mode = "r|bz2" 1551class Bz2WriteTest(WriteTest): 1552 mode = "w:bz2" 1553class Bz2StreamWriteTest(StreamWriteTest): 1554 mode = "w|bz2" 1555 1556class Bz2PartialReadTest(unittest.TestCase): 1557 # Issue5068: The _BZ2Proxy.read() method loops forever 1558 # on an empty or partial bzipped file. 1559 1560 def _test_partial_input(self, mode): 1561 class MyBytesIO(io.BytesIO): 1562 hit_eof = False 1563 def read(self, n): 1564 if self.hit_eof: 1565 raise AssertionError("infinite loop detected in tarfile.open()") 1566 self.hit_eof = self.tell() == len(self.getvalue()) 1567 return super(MyBytesIO, self).read(n) 1568 def seek(self, *args): 1569 self.hit_eof = False 1570 return super(MyBytesIO, self).seek(*args) 1571 1572 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1573 for x in range(len(data) + 1): 1574 try: 1575 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 1576 except tarfile.ReadError: 1577 pass # we have no interest in ReadErrors 1578 1579 def test_partial_input(self): 1580 self._test_partial_input("r") 1581 1582 def test_partial_input_bz2(self): 1583 self._test_partial_input("r:bz2") 1584 1585 1586def test_main(): 1587 support.unlink(TEMPDIR) 1588 os.makedirs(TEMPDIR) 1589 1590 tests = [ 1591 UstarReadTest, 1592 MiscReadTest, 1593 StreamReadTest, 1594 DetectReadTest, 1595 MemberReadTest, 1596 GNUReadTest, 1597 PaxReadTest, 1598 WriteTest, 1599 StreamWriteTest, 1600 GNUWriteTest, 1601 PaxWriteTest, 1602 UstarUnicodeTest, 1603 GNUUnicodeTest, 1604 PAXUnicodeTest, 1605 AppendTest, 1606 LimitsTest, 1607 MiscTest, 1608 ContextManagerTest, 1609 ] 1610 1611 if hasattr(os, "link"): 1612 
tests.append(HardlinkTest) 1613 else: 1614 tests.append(LinkEmulationTest) 1615 1616 with open(tarname, "rb") as fobj: 1617 data = fobj.read() 1618 1619 if gzip: 1620 # Create testtar.tar.gz and add gzip-specific tests. 1621 support.unlink(gzipname) 1622 with gzip.open(gzipname, "wb") as tar: 1623 tar.write(data) 1624 1625 tests += [ 1626 GzipMiscReadTest, 1627 GzipUstarReadTest, 1628 GzipStreamReadTest, 1629 GzipWriteTest, 1630 GzipStreamWriteTest, 1631 ] 1632 1633 if bz2: 1634 # Create testtar.tar.bz2 and add bz2-specific tests. 1635 support.unlink(bz2name) 1636 tar = bz2.BZ2File(bz2name, "wb") 1637 try: 1638 tar.write(data) 1639 finally: 1640 tar.close() 1641 1642 tests += [ 1643 Bz2MiscReadTest, 1644 Bz2UstarReadTest, 1645 Bz2StreamReadTest, 1646 Bz2WriteTest, 1647 Bz2StreamWriteTest, 1648 Bz2PartialReadTest, 1649 ] 1650 1651 try: 1652 support.run_unittest(*tests) 1653 finally: 1654 if os.path.exists(TEMPDIR): 1655 shutil.rmtree(TEMPDIR) 1656 1657if __name__ == "__main__": 1658 test_main() 1659