test_tarfile.py revision 577473fe687b38c8f01b0c372d6d2563680045b3
1import sys 2import os 3import io 4import shutil 5import tempfile 6import io 7from hashlib import md5 8import errno 9 10import unittest 11import tarfile 12 13from test import support 14 15# Check for our compression modules. 16try: 17 import gzip 18 gzip.GzipFile 19except (ImportError, AttributeError): 20 gzip = None 21try: 22 import bz2 23except ImportError: 24 bz2 = None 25 26def md5sum(data): 27 return md5(data).hexdigest() 28 29TEMPDIR = os.path.abspath(support.TESTFN) 30tarname = support.findfile("testtar.tar") 31gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 32bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 33tmpname = os.path.join(TEMPDIR, "tmp.tar") 34 35md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 36md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 37 38 39class ReadTest(unittest.TestCase): 40 41 tarname = tarname 42 mode = "r:" 43 44 def setUp(self): 45 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 46 47 def tearDown(self): 48 self.tar.close() 49 50 51class UstarReadTest(ReadTest): 52 53 def test_fileobj_regular_file(self): 54 tarinfo = self.tar.getmember("ustar/regtype") 55 fobj = self.tar.extractfile(tarinfo) 56 data = fobj.read() 57 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 58 "regular file extraction failed") 59 60 def test_fileobj_readlines(self): 61 self.tar.extract("ustar/regtype", TEMPDIR) 62 tarinfo = self.tar.getmember("ustar/regtype") 63 fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "r") 64 fobj2 = io.TextIOWrapper(self.tar.extractfile(tarinfo)) 65 66 lines1 = fobj1.readlines() 67 lines2 = fobj2.readlines() 68 self.assertTrue(lines1 == lines2, 69 "fileobj.readlines() failed") 70 self.assertTrue(len(lines2) == 114, 71 "fileobj.readlines() failed") 72 self.assertTrue(lines2[83] == \ 73 "I will gladly admit that Python is not the fastest running scripting language.\n", 74 "fileobj.readlines() failed") 75 76 def test_fileobj_iter(self): 77 self.tar.extract("ustar/regtype", TEMPDIR) 78 tarinfo = self.tar.getmember("ustar/regtype") 79 fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") 80 fobj2 = self.tar.extractfile(tarinfo) 81 lines1 = fobj1.readlines() 82 lines2 = list(io.TextIOWrapper(fobj2)) 83 self.assertTrue(lines1 == lines2, 84 "fileobj.__iter__() failed") 85 86 def test_fileobj_seek(self): 87 self.tar.extract("ustar/regtype", TEMPDIR) 88 fobj = open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") 89 data = fobj.read() 90 fobj.close() 91 92 tarinfo = self.tar.getmember("ustar/regtype") 93 fobj = self.tar.extractfile(tarinfo) 94 95 text = fobj.read() 96 fobj.seek(0) 97 self.assertEqual(0, fobj.tell(), 98 "seek() to file's start failed") 99 fobj.seek(2048, 0) 100 self.assertEqual(2048, fobj.tell(), 101 "seek() to absolute position failed") 102 fobj.seek(-1024, 1) 103 self.assertEqual(1024, fobj.tell(), 104 "seek() to negative relative position failed") 105 fobj.seek(1024, 1) 106 self.assertEqual(2048, fobj.tell(), 107 "seek() to positive relative position failed") 108 s = fobj.read(10) 109 self.assertTrue(s == data[2048:2058], 110 "read() after seek failed") 111 fobj.seek(0, 2) 112 self.assertEqual(tarinfo.size, fobj.tell(), 113 "seek() to file's end failed") 114 self.assertTrue(fobj.read() == b"", 115 "read() at file's end did not return empty string") 116 fobj.seek(-tarinfo.size, 2) 117 self.assertEqual(0, fobj.tell(), 118 "relative seek() to file's end failed") 119 fobj.seek(512) 120 s1 = fobj.readlines() 121 fobj.seek(512) 122 s2 = fobj.readlines() 123 self.assertTrue(s1 == s2, 124 "readlines() after seek failed") 125 fobj.seek(0) 126 self.assertEqual(len(fobj.readline()), fobj.tell(), 127 "tell() after readline() failed") 128 fobj.seek(512) 129 self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), 130 "tell() after seek() and readline() failed") 131 fobj.seek(0) 132 line = fobj.readline() 133 self.assertEqual(fobj.read(), data[len(line):], 134 "read() after readline() failed") 135 fobj.close() 136 137 138class CommonReadTest(ReadTest): 139 140 def test_empty_tarfile(self): 141 # Test for issue6123: Allow opening empty archives. 142 # This test checks if tarfile.open() is able to open an empty tar 143 # archive successfully. Note that an empty tar archive is not the 144 # same as an empty file! 145 tarfile.open(tmpname, self.mode.replace("r", "w")).close() 146 try: 147 tar = tarfile.open(tmpname, self.mode) 148 tar.getnames() 149 except tarfile.ReadError: 150 self.fail("tarfile.open() failed on empty archive") 151 self.assertListEqual(tar.getmembers(), []) 152 153 def test_null_tarfile(self): 154 # Test for issue6123: Allow opening empty archives. 155 # This test guarantees that tarfile.open() does not treat an empty 156 # file as an empty tar archive. 157 open(tmpname, "wb").close() 158 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 159 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 160 161 def test_ignore_zeros(self): 162 # Test TarFile's ignore_zeros option. 163 if self.mode.endswith(":gz"): 164 _open = gzip.GzipFile 165 elif self.mode.endswith(":bz2"): 166 _open = bz2.BZ2File 167 else: 168 _open = open 169 170 for char in (b'\0', b'a'): 171 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 172 # are ignored correctly. 173 fobj = _open(tmpname, "wb") 174 fobj.write(char * 1024) 175 fobj.write(tarfile.TarInfo("foo").tobuf()) 176 fobj.close() 177 178 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 179 self.assertListEqual(tar.getnames(), ["foo"], 180 "ignore_zeros=True should have skipped the %r-blocks" % char) 181 tar.close() 182 183 184class MiscReadTest(CommonReadTest): 185 186 def test_no_name_argument(self): 187 fobj = open(self.tarname, "rb") 188 tar = tarfile.open(fileobj=fobj, mode=self.mode) 189 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 190 191 def test_no_name_attribute(self): 192 data = open(self.tarname, "rb").read() 193 fobj = io.BytesIO(data) 194 self.assertRaises(AttributeError, getattr, fobj, "name") 195 tar = tarfile.open(fileobj=fobj, mode=self.mode) 196 self.assertEqual(tar.name, None) 197 198 def test_empty_name_attribute(self): 199 data = open(self.tarname, "rb").read() 200 fobj = io.BytesIO(data) 201 fobj.name = "" 202 tar = tarfile.open(fileobj=fobj, mode=self.mode) 203 self.assertEqual(tar.name, None) 204 205 def test_fileobj_with_offset(self): 206 # Skip the first member and store values from the second member 207 # of the testtar. 208 tar = tarfile.open(self.tarname, mode=self.mode) 209 tar.next() 210 t = tar.next() 211 name = t.name 212 offset = t.offset 213 data = tar.extractfile(t).read() 214 tar.close() 215 216 # Open the testtar and seek to the offset of the second member. 217 if self.mode.endswith(":gz"): 218 _open = gzip.GzipFile 219 elif self.mode.endswith(":bz2"): 220 _open = bz2.BZ2File 221 else: 222 _open = open 223 fobj = _open(self.tarname, "rb") 224 fobj.seek(offset) 225 226 # Test if the tarfile starts with the second member. 227 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 228 t = tar.next() 229 self.assertEqual(t.name, name) 230 # Read to the end of fileobj and test if seeking back to the 231 # beginning works. 232 tar.getmembers() 233 self.assertEqual(tar.extractfile(t).read(), data, 234 "seek back did not work") 235 tar.close() 236 237 def test_fail_comp(self): 238 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 239 if self.mode == "r:": 240 return 241 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 242 fobj = open(tarname, "rb") 243 self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode) 244 245 def test_v7_dirtype(self): 246 # Test old style dirtype member (bug #1336623): 247 # Old V7 tars create directory members using an AREGTYPE 248 # header with a "/" appended to the filename field. 249 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 250 self.assertTrue(tarinfo.type == tarfile.DIRTYPE, 251 "v7 dirtype failed") 252 253 def test_xstar_type(self): 254 # The xstar format stores extra atime and ctime fields inside the 255 # space reserved for the prefix field. The prefix field must be 256 # ignored in this case, otherwise it will mess up the name. 257 try: 258 self.tar.getmember("misc/regtype-xstar") 259 except KeyError: 260 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 261 262 def test_check_members(self): 263 for tarinfo in self.tar: 264 self.assertTrue(int(tarinfo.mtime) == 0o7606136617, 265 "wrong mtime for %s" % tarinfo.name) 266 if not tarinfo.name.startswith("ustar/"): 267 continue 268 self.assertTrue(tarinfo.uname == "tarfile", 269 "wrong uname for %s" % tarinfo.name) 270 271 def test_find_members(self): 272 self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", 273 "could not find all members") 274 275 def test_extract_hardlink(self): 276 # Test hardlink extraction (e.g. bug #857297). 277 tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") 278 279 tar.extract("ustar/regtype", TEMPDIR) 280 try: 281 tar.extract("ustar/lnktype", TEMPDIR) 282 except EnvironmentError as e: 283 if e.errno == errno.ENOENT: 284 self.fail("hardlink not extracted properly") 285 286 data = open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb").read() 287 self.assertEqual(md5sum(data), md5_regtype) 288 289 try: 290 tar.extract("ustar/symtype", TEMPDIR) 291 except EnvironmentError as e: 292 if e.errno == errno.ENOENT: 293 self.fail("symlink not extracted properly") 294 295 data = open(os.path.join(TEMPDIR, "ustar/symtype"), "rb").read() 296 self.assertEqual(md5sum(data), md5_regtype) 297 298 def test_extractall(self): 299 # Test if extractall() correctly restores directory permissions 300 # and times (see issue1735). 301 tar = tarfile.open(tarname, encoding="iso8859-1") 302 directories = [t for t in tar if t.isdir()] 303 tar.extractall(TEMPDIR, directories) 304 for tarinfo in directories: 305 path = os.path.join(TEMPDIR, tarinfo.name) 306 if sys.platform != "win32": 307 # Win32 has no support for fine grained permissions. 308 self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777) 309 self.assertEqual(tarinfo.mtime, os.path.getmtime(path)) 310 tar.close() 311 312 def test_init_close_fobj(self): 313 # Issue #7341: Close the internal file object in the TarFile 314 # constructor in case of an error. For the test we rely on 315 # the fact that opening an empty file raises a ReadError. 316 empty = os.path.join(TEMPDIR, "empty") 317 open(empty, "wb").write(b"") 318 319 try: 320 tar = object.__new__(tarfile.TarFile) 321 try: 322 tar.__init__(empty) 323 except tarfile.ReadError: 324 self.assertTrue(tar.fileobj.closed) 325 else: 326 self.fail("ReadError not raised") 327 finally: 328 os.remove(empty) 329 330 331class StreamReadTest(CommonReadTest): 332 333 mode="r|" 334 335 def test_fileobj_regular_file(self): 336 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 337 fobj = self.tar.extractfile(tarinfo) 338 data = fobj.read() 339 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 340 "regular file extraction failed") 341 342 def test_provoke_stream_error(self): 343 tarinfos = self.tar.getmembers() 344 f = self.tar.extractfile(tarinfos[0]) # read the first member 345 self.assertRaises(tarfile.StreamError, f.read) 346 347 def test_compare_members(self): 348 tar1 = tarfile.open(tarname, encoding="iso8859-1") 349 tar2 = self.tar 350 351 while True: 352 t1 = tar1.next() 353 t2 = tar2.next() 354 if t1 is None: 355 break 356 self.assertTrue(t2 is not None, "stream.next() failed.") 357 358 if t2.islnk() or t2.issym(): 359 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) 360 continue 361 362 v1 = tar1.extractfile(t1) 363 v2 = tar2.extractfile(t2) 364 if v1 is None: 365 continue 366 self.assertTrue(v2 is not None, "stream.extractfile() failed") 367 self.assertEqual(v1.read(), v2.read(), "stream extraction failed") 368 369 tar1.close() 370 371 372class DetectReadTest(unittest.TestCase): 373 374 def _testfunc_file(self, name, mode): 375 try: 376 tarfile.open(name, mode) 377 except tarfile.ReadError as e: 378 self.fail() 379 380 def _testfunc_fileobj(self, name, mode): 381 try: 382 tarfile.open(name, mode, fileobj=open(name, "rb")) 383 except tarfile.ReadError as e: 384 self.fail() 385 386 def _test_modes(self, testfunc): 387 testfunc(tarname, "r") 388 testfunc(tarname, "r:") 389 testfunc(tarname, "r:*") 390 testfunc(tarname, "r|") 391 testfunc(tarname, "r|*") 392 393 if gzip: 394 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") 395 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") 396 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") 397 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") 398 399 testfunc(gzipname, "r") 400 testfunc(gzipname, "r:*") 401 testfunc(gzipname, "r:gz") 402 testfunc(gzipname, "r|*") 403 testfunc(gzipname, "r|gz") 404 405 if bz2: 406 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") 407 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") 408 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") 409 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") 410 411 testfunc(bz2name, "r") 412 testfunc(bz2name, "r:*") 413 testfunc(bz2name, "r:bz2") 414 testfunc(bz2name, "r|*") 415 testfunc(bz2name, "r|bz2") 416 417 def test_detect_file(self): 418 self._test_modes(self._testfunc_file) 419 420 def test_detect_fileobj(self): 421 self._test_modes(self._testfunc_fileobj) 422 423 424class MemberReadTest(ReadTest): 425 426 def _test_member(self, tarinfo, chksum=None, **kwargs): 427 if chksum is not None: 428 self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, 429 "wrong md5sum for %s" % tarinfo.name) 430 431 kwargs["mtime"] = 0o7606136617 432 kwargs["uid"] = 1000 433 kwargs["gid"] = 100 434 if "old-v7" not in tarinfo.name: 435 # V7 tar can't handle alphabetic owners. 436 kwargs["uname"] = "tarfile" 437 kwargs["gname"] = "tarfile" 438 for k, v in kwargs.items(): 439 self.assertTrue(getattr(tarinfo, k) == v, 440 "wrong value in %s field of %s" % (k, tarinfo.name)) 441 442 def test_find_regtype(self): 443 tarinfo = self.tar.getmember("ustar/regtype") 444 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 445 446 def test_find_conttype(self): 447 tarinfo = self.tar.getmember("ustar/conttype") 448 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 449 450 def test_find_dirtype(self): 451 tarinfo = self.tar.getmember("ustar/dirtype") 452 self._test_member(tarinfo, size=0) 453 454 def test_find_dirtype_with_size(self): 455 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 456 self._test_member(tarinfo, size=255) 457 458 def test_find_lnktype(self): 459 tarinfo = self.tar.getmember("ustar/lnktype") 460 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 461 462 def test_find_symtype(self): 463 tarinfo = self.tar.getmember("ustar/symtype") 464 self._test_member(tarinfo, size=0, linkname="regtype") 465 466 def test_find_blktype(self): 467 tarinfo = self.tar.getmember("ustar/blktype") 468 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 469 470 def test_find_chrtype(self): 471 tarinfo = self.tar.getmember("ustar/chrtype") 472 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 473 474 def test_find_fifotype(self): 475 tarinfo = self.tar.getmember("ustar/fifotype") 476 self._test_member(tarinfo, size=0) 477 478 def test_find_sparse(self): 479 tarinfo = self.tar.getmember("ustar/sparse") 480 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 481 482 def test_find_umlauts(self): 483 tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 484 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 485 486 def test_find_ustar_longname(self): 487 name = "ustar/" + "12345/" * 39 + "1234567/longname" 488 self.assertIn(name, self.tar.getnames()) 489 490 def test_find_regtype_oldv7(self): 491 tarinfo = self.tar.getmember("misc/regtype-old-v7") 492 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 493 494 def test_find_pax_umlauts(self): 495 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 496 tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 497 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 498 499 500class LongnameTest(ReadTest): 501 502 def test_read_longname(self): 503 # Test reading of longname (bug #1471427). 504 longname = self.subdir + "/" + "123/" * 125 + "longname" 505 try: 506 tarinfo = self.tar.getmember(longname) 507 except KeyError: 508 self.fail("longname not found") 509 self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") 510 511 def test_read_longlink(self): 512 longname = self.subdir + "/" + "123/" * 125 + "longname" 513 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 514 try: 515 tarinfo = self.tar.getmember(longlink) 516 except KeyError: 517 self.fail("longlink not found") 518 self.assertTrue(tarinfo.linkname == longname, "linkname wrong") 519 520 def test_truncated_longname(self): 521 longname = self.subdir + "/" + "123/" * 125 + "longname" 522 tarinfo = self.tar.getmember(longname) 523 offset = tarinfo.offset 524 self.tar.fileobj.seek(offset) 525 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 526 self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) 527 528 def test_header_offset(self): 529 # Test if the start offset of the TarInfo object includes 530 # the preceding extended header. 531 longname = self.subdir + "/" + "123/" * 125 + "longname" 532 offset = self.tar.getmember(longname).offset 533 fobj = open(tarname, "rb") 534 fobj.seek(offset) 535 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), "iso8859-1", "strict") 536 self.assertEqual(tarinfo.type, self.longnametype) 537 538 539class GNUReadTest(LongnameTest): 540 541 subdir = "gnu" 542 longnametype = tarfile.GNUTYPE_LONGNAME 543 544 def test_sparse_file(self): 545 tarinfo1 = self.tar.getmember("ustar/sparse") 546 fobj1 = self.tar.extractfile(tarinfo1) 547 tarinfo2 = self.tar.getmember("gnu/sparse") 548 fobj2 = self.tar.extractfile(tarinfo2) 549 self.assertEqual(fobj1.read(), fobj2.read(), 550 "sparse file extraction failed") 551 552 553class PaxReadTest(LongnameTest): 554 555 subdir = "pax" 556 longnametype = tarfile.XHDTYPE 557 558 def test_pax_global_headers(self): 559 tar = tarfile.open(tarname, encoding="iso8859-1") 560 561 tarinfo = tar.getmember("pax/regtype1") 562 self.assertEqual(tarinfo.uname, "foo") 563 self.assertEqual(tarinfo.gname, "bar") 564 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 565 566 tarinfo = tar.getmember("pax/regtype2") 567 self.assertEqual(tarinfo.uname, "") 568 self.assertEqual(tarinfo.gname, "bar") 569 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 570 571 tarinfo = tar.getmember("pax/regtype3") 572 self.assertEqual(tarinfo.uname, "tarfile") 573 self.assertEqual(tarinfo.gname, "tarfile") 574 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 575 576 def test_pax_number_fields(self): 577 # All following number fields are read from the pax header. 578 tar = tarfile.open(tarname, encoding="iso8859-1") 579 tarinfo = tar.getmember("pax/regtype4") 580 self.assertEqual(tarinfo.size, 7011) 581 self.assertEqual(tarinfo.uid, 123) 582 self.assertEqual(tarinfo.gid, 123) 583 self.assertEqual(tarinfo.mtime, 1041808783.0) 584 self.assertEqual(type(tarinfo.mtime), float) 585 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 586 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 587 588 589class WriteTestBase(unittest.TestCase): 590 # Put all write tests in here that are supposed to be tested 591 # in all possible mode combinations. 592 593 def test_fileobj_no_close(self): 594 fobj = io.BytesIO() 595 tar = tarfile.open(fileobj=fobj, mode=self.mode) 596 tar.addfile(tarfile.TarInfo("foo")) 597 tar.close() 598 self.assertTrue(fobj.closed is False, "external fileobjs must never closed") 599 600 601class WriteTest(WriteTestBase): 602 603 mode = "w:" 604 605 def test_100_char_name(self): 606 # The name field in a tar header stores strings of at most 100 chars. 607 # If a string is shorter than 100 chars it has to be padded with '\0', 608 # which implies that a string of exactly 100 chars is stored without 609 # a trailing '\0'. 610 name = "0123456789" * 10 611 tar = tarfile.open(tmpname, self.mode) 612 t = tarfile.TarInfo(name) 613 tar.addfile(t) 614 tar.close() 615 616 tar = tarfile.open(tmpname) 617 self.assertTrue(tar.getnames()[0] == name, 618 "failed to store 100 char filename") 619 tar.close() 620 621 def test_tar_size(self): 622 # Test for bug #1013882. 623 tar = tarfile.open(tmpname, self.mode) 624 path = os.path.join(TEMPDIR, "file") 625 fobj = open(path, "wb") 626 fobj.write(b"aaa") 627 fobj.close() 628 tar.add(path) 629 tar.close() 630 self.assertTrue(os.path.getsize(tmpname) > 0, 631 "tarfile is empty") 632 633 # The test_*_size tests test for bug #1167128. 634 def test_file_size(self): 635 tar = tarfile.open(tmpname, self.mode) 636 637 path = os.path.join(TEMPDIR, "file") 638 fobj = open(path, "wb") 639 fobj.close() 640 tarinfo = tar.gettarinfo(path) 641 self.assertEqual(tarinfo.size, 0) 642 643 fobj = open(path, "wb") 644 fobj.write(b"aaa") 645 fobj.close() 646 tarinfo = tar.gettarinfo(path) 647 self.assertEqual(tarinfo.size, 3) 648 649 tar.close() 650 651 def test_directory_size(self): 652 path = os.path.join(TEMPDIR, "directory") 653 os.mkdir(path) 654 try: 655 tar = tarfile.open(tmpname, self.mode) 656 tarinfo = tar.gettarinfo(path) 657 self.assertEqual(tarinfo.size, 0) 658 finally: 659 os.rmdir(path) 660 661 def test_link_size(self): 662 if hasattr(os, "link"): 663 link = os.path.join(TEMPDIR, "link") 664 target = os.path.join(TEMPDIR, "link_target") 665 open(target, "wb").close() 666 os.link(target, link) 667 try: 668 tar = tarfile.open(tmpname, self.mode) 669 tarinfo = tar.gettarinfo(link) 670 self.assertEqual(tarinfo.size, 0) 671 finally: 672 os.remove(target) 673 os.remove(link) 674 675 def test_symlink_size(self): 676 if hasattr(os, "symlink"): 677 path = os.path.join(TEMPDIR, "symlink") 678 os.symlink("link_target", path) 679 try: 680 tar = tarfile.open(tmpname, self.mode) 681 tarinfo = tar.gettarinfo(path) 682 self.assertEqual(tarinfo.size, 0) 683 finally: 684 os.remove(path) 685 686 def test_add_self(self): 687 # Test for #1257255. 688 dstname = os.path.abspath(tmpname) 689 690 tar = tarfile.open(tmpname, self.mode) 691 self.assertTrue(tar.name == dstname, "archive name must be absolute") 692 693 tar.add(dstname) 694 self.assertTrue(tar.getnames() == [], "added the archive to itself") 695 696 cwd = os.getcwd() 697 os.chdir(TEMPDIR) 698 tar.add(dstname) 699 os.chdir(cwd) 700 self.assertTrue(tar.getnames() == [], "added the archive to itself") 701 702 def test_exclude(self): 703 tempdir = os.path.join(TEMPDIR, "exclude") 704 os.mkdir(tempdir) 705 try: 706 for name in ("foo", "bar", "baz"): 707 name = os.path.join(tempdir, name) 708 open(name, "wb").close() 709 710 def exclude(name): 711 return os.path.isfile(name) 712 713 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 714 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 715 tar.close() 716 717 tar = tarfile.open(tmpname, "r") 718 self.assertEqual(len(tar.getmembers()), 1) 719 self.assertEqual(tar.getnames()[0], "empty_dir") 720 finally: 721 shutil.rmtree(tempdir) 722 723 def test_filter(self): 724 tempdir = os.path.join(TEMPDIR, "filter") 725 os.mkdir(tempdir) 726 try: 727 for name in ("foo", "bar", "baz"): 728 name = os.path.join(tempdir, name) 729 open(name, "wb").close() 730 731 def filter(tarinfo): 732 if os.path.basename(tarinfo.name) == "bar": 733 return 734 tarinfo.uid = 123 735 tarinfo.uname = "foo" 736 return tarinfo 737 738 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 739 tar.add(tempdir, arcname="empty_dir", filter=filter) 740 tar.close() 741 742 tar = tarfile.open(tmpname, "r") 743 for tarinfo in tar: 744 self.assertEqual(tarinfo.uid, 123) 745 self.assertEqual(tarinfo.uname, "foo") 746 self.assertEqual(len(tar.getmembers()), 3) 747 tar.close() 748 finally: 749 shutil.rmtree(tempdir) 750 751 # Guarantee that stored pathnames are not modified. Don't 752 # remove ./ or ../ or double slashes. Still make absolute 753 # pathnames relative. 754 # For details see bug #6054. 755 def _test_pathname(self, path, cmp_path=None, dir=False): 756 # Create a tarfile with an empty member named path 757 # and compare the stored name with the original. 758 foo = os.path.join(TEMPDIR, "foo") 759 if not dir: 760 open(foo, "w").close() 761 else: 762 os.mkdir(foo) 763 764 tar = tarfile.open(tmpname, self.mode) 765 tar.add(foo, arcname=path) 766 tar.close() 767 768 tar = tarfile.open(tmpname, "r") 769 t = tar.next() 770 tar.close() 771 772 if not dir: 773 os.remove(foo) 774 else: 775 os.rmdir(foo) 776 777 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 778 779 def test_pathnames(self): 780 self._test_pathname("foo") 781 self._test_pathname(os.path.join("foo", ".", "bar")) 782 self._test_pathname(os.path.join("foo", "..", "bar")) 783 self._test_pathname(os.path.join(".", "foo")) 784 self._test_pathname(os.path.join(".", "foo", ".")) 785 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 786 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 787 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 788 self._test_pathname(os.path.join("..", "foo")) 789 self._test_pathname(os.path.join("..", "foo", "..")) 790 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 791 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 792 793 self._test_pathname("foo" + os.sep + os.sep + "bar") 794 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 795 796 def test_abs_pathnames(self): 797 if sys.platform == "win32": 798 self._test_pathname("C:\\foo", "foo") 799 else: 800 self._test_pathname("/foo", "foo") 801 self._test_pathname("///foo", "foo") 802 803 def test_cwd(self): 804 # Test adding the current working directory. 805 cwd = os.getcwd() 806 os.chdir(TEMPDIR) 807 try: 808 open("foo", "w").close() 809 810 tar = tarfile.open(tmpname, self.mode) 811 tar.add(".") 812 tar.close() 813 814 tar = tarfile.open(tmpname, "r") 815 for t in tar: 816 self.assert_(t.name == "." or t.name.startswith("./")) 817 tar.close() 818 finally: 819 os.chdir(cwd) 820 821 822class StreamWriteTest(WriteTestBase): 823 824 mode = "w|" 825 826 def test_stream_padding(self): 827 # Test for bug #1543303. 828 tar = tarfile.open(tmpname, self.mode) 829 tar.close() 830 831 if self.mode.endswith("gz"): 832 fobj = gzip.GzipFile(tmpname) 833 data = fobj.read() 834 fobj.close() 835 elif self.mode.endswith("bz2"): 836 dec = bz2.BZ2Decompressor() 837 data = open(tmpname, "rb").read() 838 data = dec.decompress(data) 839 self.assertTrue(len(dec.unused_data) == 0, 840 "found trailing data") 841 else: 842 fobj = open(tmpname, "rb") 843 data = fobj.read() 844 fobj.close() 845 846 self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE, 847 "incorrect zero padding") 848 849 850class GNUWriteTest(unittest.TestCase): 851 # This testcase checks for correct creation of GNU Longname 852 # and Longlink extended headers (cp. bug #812325). 853 854 def _length(self, s): 855 blocks, remainder = divmod(len(s) + 1, 512) 856 if remainder: 857 blocks += 1 858 return blocks * 512 859 860 def _calc_size(self, name, link=None): 861 # Initial tar header 862 count = 512 863 864 if len(name) > tarfile.LENGTH_NAME: 865 # GNU longname extended header + longname 866 count += 512 867 count += self._length(name) 868 if link is not None and len(link) > tarfile.LENGTH_LINK: 869 # GNU longlink extended header + longlink 870 count += 512 871 count += self._length(link) 872 return count 873 874 def _test(self, name, link=None): 875 tarinfo = tarfile.TarInfo(name) 876 if link: 877 tarinfo.linkname = link 878 tarinfo.type = tarfile.LNKTYPE 879 880 tar = tarfile.open(tmpname, "w") 881 tar.format = tarfile.GNU_FORMAT 882 tar.addfile(tarinfo) 883 884 v1 = self._calc_size(name, link) 885 v2 = tar.offset 886 self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") 887 888 tar.close() 889 890 tar = tarfile.open(tmpname) 891 member = tar.next() 892 self.assertFalse(member is None, "unable to read longname member") 893 self.assertTrue(tarinfo.name == member.name and \ 894 tarinfo.linkname == member.linkname, \ 895 "unable to read longname member") 896 897 def test_longname_1023(self): 898 self._test(("longnam/" * 127) + "longnam") 899 900 def test_longname_1024(self): 901 self._test(("longnam/" * 127) + "longname") 902 903 def test_longname_1025(self): 904 self._test(("longnam/" * 127) + "longname_") 905 906 def test_longlink_1023(self): 907 self._test("name", ("longlnk/" * 127) + "longlnk") 908 909 def test_longlink_1024(self): 910 self._test("name", ("longlnk/" * 127) + "longlink") 911 912 def test_longlink_1025(self): 913 self._test("name", ("longlnk/" * 127) + "longlink_") 914 915 def test_longnamelink_1023(self): 916 self._test(("longnam/" * 127) + "longnam", 917 ("longlnk/" * 127) + "longlnk") 918 919 def test_longnamelink_1024(self): 920 self._test(("longnam/" * 127) + "longname", 921 ("longlnk/" * 127) + "longlink") 922 923 def test_longnamelink_1025(self): 924 self._test(("longnam/" * 127) + "longname_", 925 ("longlnk/" * 127) + "longlink_") 926 927 928class HardlinkTest(unittest.TestCase): 929 # Test the creation of LNKTYPE (hardlink) members in an archive. 930 931 def setUp(self): 932 self.foo = os.path.join(TEMPDIR, "foo") 933 self.bar = os.path.join(TEMPDIR, "bar") 934 935 fobj = open(self.foo, "wb") 936 fobj.write(b"foo") 937 fobj.close() 938 939 os.link(self.foo, self.bar) 940 941 self.tar = tarfile.open(tmpname, "w") 942 self.tar.add(self.foo) 943 944 def tearDown(self): 945 self.tar.close() 946 os.remove(self.foo) 947 os.remove(self.bar) 948 949 def test_add_twice(self): 950 # The same name will be added as a REGTYPE every 951 # time regardless of st_nlink. 952 tarinfo = self.tar.gettarinfo(self.foo) 953 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 954 "add file as regular failed") 955 956 def test_add_hardlink(self): 957 tarinfo = self.tar.gettarinfo(self.bar) 958 self.assertTrue(tarinfo.type == tarfile.LNKTYPE, 959 "add file as hardlink failed") 960 961 def test_dereference_hardlink(self): 962 self.tar.dereference = True 963 tarinfo = self.tar.gettarinfo(self.bar) 964 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 965 "dereferencing hardlink failed") 966 967 968class PaxWriteTest(GNUWriteTest): 969 970 def _test(self, name, link=None): 971 # See GNUWriteTest. 972 tarinfo = tarfile.TarInfo(name) 973 if link: 974 tarinfo.linkname = link 975 tarinfo.type = tarfile.LNKTYPE 976 977 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 978 tar.addfile(tarinfo) 979 tar.close() 980 981 tar = tarfile.open(tmpname) 982 if link: 983 l = tar.getmembers()[0].linkname 984 self.assertTrue(link == l, "PAX longlink creation failed") 985 else: 986 n = tar.getmembers()[0].name 987 self.assertTrue(name == n, "PAX longname creation failed") 988 989 def test_pax_global_header(self): 990 pax_headers = { 991 "foo": "bar", 992 "uid": "0", 993 "mtime": "1.23", 994 "test": "\xe4\xf6\xfc", 995 "\xe4\xf6\xfc": "test"} 996 997 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, \ 998 pax_headers=pax_headers) 999 tar.addfile(tarfile.TarInfo("test")) 1000 tar.close() 1001 1002 # Test if the global header was written correctly. 1003 tar = tarfile.open(tmpname, encoding="iso8859-1") 1004 self.assertEqual(tar.pax_headers, pax_headers) 1005 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1006 1007 # Test if all the fields are strings. 1008 for key, val in tar.pax_headers.items(): 1009 self.assertTrue(type(key) is not bytes) 1010 self.assertTrue(type(val) is not bytes) 1011 if key in tarfile.PAX_NUMBER_FIELDS: 1012 try: 1013 tarfile.PAX_NUMBER_FIELDS[key](val) 1014 except (TypeError, ValueError): 1015 self.fail("unable to convert pax header field") 1016 1017 def test_pax_extended_header(self): 1018 # The fields from the pax header have priority over the 1019 # TarInfo. 1020 pax_headers = {"path": "foo", "uid": "123"} 1021 1022 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") 1023 t = tarfile.TarInfo() 1024 t.name = "\xe4\xf6\xfc" # non-ASCII 1025 t.uid = 8**8 # too large 1026 t.pax_headers = pax_headers 1027 tar.addfile(t) 1028 tar.close() 1029 1030 tar = tarfile.open(tmpname, encoding="iso8859-1") 1031 t = tar.getmembers()[0] 1032 self.assertEqual(t.pax_headers, pax_headers) 1033 self.assertEqual(t.name, "foo") 1034 self.assertEqual(t.uid, 123) 1035 1036 1037class UstarUnicodeTest(unittest.TestCase): 1038 1039 format = tarfile.USTAR_FORMAT 1040 1041 def test_iso8859_1_filename(self): 1042 self._test_unicode_filename("iso8859-1") 1043 1044 def test_utf7_filename(self): 1045 self._test_unicode_filename("utf7") 1046 1047 def test_utf8_filename(self): 1048 self._test_unicode_filename("utf8") 1049 1050 def _test_unicode_filename(self, encoding): 1051 tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") 1052 name = "\xe4\xf6\xfc" 1053 tar.addfile(tarfile.TarInfo(name)) 1054 tar.close() 1055 1056 tar = tarfile.open(tmpname, encoding=encoding) 1057 self.assertEqual(tar.getmembers()[0].name, name) 1058 tar.close() 1059 1060 def test_unicode_filename_error(self): 1061 if self.format == tarfile.PAX_FORMAT: 1062 # PAX_FORMAT ignores encoding in write mode. 1063 return 1064 1065 tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") 1066 tarinfo = tarfile.TarInfo() 1067 1068 tarinfo.name = "\xe4\xf6\xfc" 1069 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1070 1071 tarinfo.name = "foo" 1072 tarinfo.uname = "\xe4\xf6\xfc" 1073 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1074 1075 def test_unicode_argument(self): 1076 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") 1077 for t in tar: 1078 self.assertTrue(type(t.name) is str) 1079 self.assertTrue(type(t.linkname) is str) 1080 self.assertTrue(type(t.uname) is str) 1081 self.assertTrue(type(t.gname) is str) 1082 tar.close() 1083 1084 def test_uname_unicode(self): 1085 t = tarfile.TarInfo("foo") 1086 t.uname = "\xe4\xf6\xfc" 1087 t.gname = "\xe4\xf6\xfc" 1088 1089 tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1") 1090 tar.addfile(t) 1091 tar.close() 1092 1093 tar = tarfile.open(tmpname, encoding="iso8859-1") 1094 t = tar.getmember("foo") 1095 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1096 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1097 1098 if self.format != tarfile.PAX_FORMAT: 1099 tar = tarfile.open(tmpname, encoding="ascii") 1100 t = tar.getmember("foo") 1101 self.assertEqual(t.uname, "\ufffd\ufffd\ufffd") 1102 self.assertEqual(t.gname, "\ufffd\ufffd\ufffd") 1103 1104 1105class GNUUnicodeTest(UstarUnicodeTest): 1106 1107 format = tarfile.GNU_FORMAT 1108 1109 1110class PAXUnicodeTest(UstarUnicodeTest): 1111 1112 format = tarfile.PAX_FORMAT 1113 1114 1115class AppendTest(unittest.TestCase): 1116 # Test append mode (cp. patch #1652681). 1117 1118 def setUp(self): 1119 self.tarname = tmpname 1120 if os.path.exists(self.tarname): 1121 os.remove(self.tarname) 1122 1123 def _add_testfile(self, fileobj=None): 1124 tar = tarfile.open(self.tarname, "a", fileobj=fileobj) 1125 tar.addfile(tarfile.TarInfo("bar")) 1126 tar.close() 1127 1128 def _create_testtar(self, mode="w:"): 1129 src = tarfile.open(tarname, encoding="iso8859-1") 1130 t = src.getmember("ustar/regtype") 1131 t.name = "foo" 1132 f = src.extractfile(t) 1133 tar = tarfile.open(self.tarname, mode) 1134 tar.addfile(t, f) 1135 tar.close() 1136 1137 def _test(self, names=["bar"], fileobj=None): 1138 tar = tarfile.open(self.tarname, fileobj=fileobj) 1139 self.assertEqual(tar.getnames(), names) 1140 1141 def test_non_existing(self): 1142 self._add_testfile() 1143 self._test() 1144 1145 def test_empty(self): 1146 tarfile.open(self.tarname, "w:").close() 1147 self._add_testfile() 1148 self._test() 1149 1150 def test_empty_fileobj(self): 1151 fobj = io.BytesIO(b"\0" * 1024) 1152 self._add_testfile(fobj) 1153 fobj.seek(0) 1154 self._test(fileobj=fobj) 1155 1156 def test_fileobj(self): 1157 self._create_testtar() 1158 data = open(self.tarname, "rb").read() 1159 fobj = io.BytesIO(data) 1160 self._add_testfile(fobj) 1161 fobj.seek(0) 1162 self._test(names=["foo", "bar"], fileobj=fobj) 1163 1164 def test_existing(self): 1165 self._create_testtar() 1166 self._add_testfile() 1167 self._test(names=["foo", "bar"]) 1168 1169 def test_append_gz(self): 1170 if gzip is None: 1171 return 1172 self._create_testtar("w:gz") 1173 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1174 1175 def test_append_bz2(self): 1176 if bz2 is None: 1177 return 1178 self._create_testtar("w:bz2") 1179 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1180 1181 # Append mode is supposed to fail if the tarfile to append to 1182 # does not end with a zero block. 1183 def _test_error(self, data): 1184 open(self.tarname, "wb").write(data) 1185 self.assertRaises(tarfile.ReadError, self._add_testfile) 1186 1187 def test_null(self): 1188 self._test_error(b"") 1189 1190 def test_incomplete(self): 1191 self._test_error(b"\0" * 13) 1192 1193 def test_premature_eof(self): 1194 data = tarfile.TarInfo("foo").tobuf() 1195 self._test_error(data) 1196 1197 def test_trailing_garbage(self): 1198 data = tarfile.TarInfo("foo").tobuf() 1199 self._test_error(data + b"\0" * 13) 1200 1201 def test_invalid(self): 1202 self._test_error(b"a" * 512) 1203 1204 1205class LimitsTest(unittest.TestCase): 1206 1207 def test_ustar_limits(self): 1208 # 100 char name 1209 tarinfo = tarfile.TarInfo("0123456789" * 10) 1210 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1211 1212 # 101 char name that cannot be stored 1213 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1214 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1215 1216 # 256 char name with a slash at pos 156 1217 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1218 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1219 1220 # 256 char name that cannot be stored 1221 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1222 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1223 1224 # 512 char name 1225 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1226 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1227 1228 # 512 char linkname 1229 tarinfo = tarfile.TarInfo("longlink") 1230 tarinfo.linkname = "123/" * 126 + "longname" 1231 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1232 1233 # uid > 8 digits 1234 tarinfo = tarfile.TarInfo("name") 1235 tarinfo.uid = 0o10000000 1236 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1237 1238 def test_gnu_limits(self): 1239 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1240 tarinfo.tobuf(tarfile.GNU_FORMAT) 1241 1242 tarinfo = tarfile.TarInfo("longlink") 1243 tarinfo.linkname = "123/" * 126 + "longname" 1244 tarinfo.tobuf(tarfile.GNU_FORMAT) 1245 1246 # uid >= 256 ** 7 1247 tarinfo = tarfile.TarInfo("name") 1248 tarinfo.uid = 0o4000000000000000000 1249 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1250 1251 def test_pax_limits(self): 1252 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1253 tarinfo.tobuf(tarfile.PAX_FORMAT) 1254 1255 tarinfo = tarfile.TarInfo("longlink") 1256 tarinfo.linkname = "123/" * 126 + "longname" 1257 tarinfo.tobuf(tarfile.PAX_FORMAT) 1258 1259 tarinfo = tarfile.TarInfo("name") 1260 tarinfo.uid = 0o4000000000000000000 1261 tarinfo.tobuf(tarfile.PAX_FORMAT) 1262 1263 1264class MiscTest(unittest.TestCase): 1265 1266 def test_char_fields(self): 1267 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0") 1268 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo") 1269 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo") 1270 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo") 1271 1272 def test_number_fields(self): 1273 self.assertEqual(tarfile.itn(1), b"0000001\x00") 1274 self.assertEqual(tarfile.itn(0xffffffff), b"\x80\x00\x00\x00\xff\xff\xff\xff") 1275 1276 1277class GzipMiscReadTest(MiscReadTest): 1278 tarname = gzipname 1279 mode = "r:gz" 1280class GzipUstarReadTest(UstarReadTest): 1281 tarname = gzipname 1282 mode = "r:gz" 1283class GzipStreamReadTest(StreamReadTest): 1284 tarname = gzipname 1285 mode = "r|gz" 1286class GzipWriteTest(WriteTest): 1287 mode = "w:gz" 1288class GzipStreamWriteTest(StreamWriteTest): 1289 mode = "w|gz" 1290 1291 1292class Bz2MiscReadTest(MiscReadTest): 1293 tarname = bz2name 1294 mode = "r:bz2" 1295class Bz2UstarReadTest(UstarReadTest): 1296 tarname = bz2name 1297 mode = "r:bz2" 1298class Bz2StreamReadTest(StreamReadTest): 1299 tarname = bz2name 1300 mode = "r|bz2" 1301class Bz2WriteTest(WriteTest): 1302 mode = "w:bz2" 1303class Bz2StreamWriteTest(StreamWriteTest): 1304 mode = "w|bz2" 1305 1306class Bz2PartialReadTest(unittest.TestCase): 1307 # Issue5068: The _BZ2Proxy.read() method loops forever 1308 # on an empty or partial bzipped file. 1309 1310 def _test_partial_input(self, mode): 1311 class MyBytesIO(io.BytesIO): 1312 hit_eof = False 1313 def read(self, n): 1314 if self.hit_eof: 1315 raise AssertionError("infinite loop detected in tarfile.open()") 1316 self.hit_eof = self.tell() == len(self.getvalue()) 1317 return super(MyBytesIO, self).read(n) 1318 def seek(self, *args): 1319 self.hit_eof = False 1320 return super(MyBytesIO, self).seek(*args) 1321 1322 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1323 for x in range(len(data) + 1): 1324 try: 1325 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 1326 except tarfile.ReadError: 1327 pass # we have no interest in ReadErrors 1328 1329 def test_partial_input(self): 1330 self._test_partial_input("r") 1331 1332 def test_partial_input_bz2(self): 1333 self._test_partial_input("r:bz2") 1334 1335 1336def test_main(): 1337 os.makedirs(TEMPDIR) 1338 1339 tests = [ 1340 UstarReadTest, 1341 MiscReadTest, 1342 StreamReadTest, 1343 DetectReadTest, 1344 MemberReadTest, 1345 GNUReadTest, 1346 PaxReadTest, 1347 WriteTest, 1348 StreamWriteTest, 1349 GNUWriteTest, 1350 PaxWriteTest, 1351 UstarUnicodeTest, 1352 GNUUnicodeTest, 1353 PAXUnicodeTest, 1354 AppendTest, 1355 LimitsTest, 1356 MiscTest, 1357 ] 1358 1359 if hasattr(os, "link"): 1360 tests.append(HardlinkTest) 1361 1362 fobj = open(tarname, "rb") 1363 data = fobj.read() 1364 fobj.close() 1365 1366 if gzip: 1367 # Create testtar.tar.gz and add gzip-specific tests. 1368 tar = gzip.open(gzipname, "wb") 1369 tar.write(data) 1370 tar.close() 1371 1372 tests += [ 1373 GzipMiscReadTest, 1374 GzipUstarReadTest, 1375 GzipStreamReadTest, 1376 GzipWriteTest, 1377 GzipStreamWriteTest, 1378 ] 1379 1380 if bz2: 1381 # Create testtar.tar.bz2 and add bz2-specific tests. 1382 tar = bz2.BZ2File(bz2name, "wb") 1383 tar.write(data) 1384 tar.close() 1385 1386 tests += [ 1387 Bz2MiscReadTest, 1388 Bz2UstarReadTest, 1389 Bz2StreamReadTest, 1390 Bz2WriteTest, 1391 Bz2StreamWriteTest, 1392 Bz2PartialReadTest, 1393 ] 1394 1395 try: 1396 support.run_unittest(*tests) 1397 finally: 1398 if os.path.exists(TEMPDIR): 1399 shutil.rmtree(TEMPDIR) 1400 1401if __name__ == "__main__": 1402 test_main() 1403