1# -*- coding: iso-8859-15 -*- 2 3import sys 4import os 5import shutil 6import StringIO 7from hashlib import md5 8import errno 9 10import unittest 11import tarfile 12 13from test import test_support 14 15# Check for our compression modules. 16try: 17 import gzip 18 gzip.GzipFile 19except (ImportError, AttributeError): 20 gzip = None 21try: 22 import bz2 23except ImportError: 24 bz2 = None 25 26def md5sum(data): 27 return md5(data).hexdigest() 28 29TEMPDIR = os.path.abspath(test_support.TESTFN) 30tarname = test_support.findfile("testtar.tar") 31gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 32bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 33tmpname = os.path.join(TEMPDIR, "tmp.tar") 34 35md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 36md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 37 38 39class ReadTest(unittest.TestCase): 40 41 tarname = tarname 42 mode = "r:" 43 44 def setUp(self): 45 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 46 47 def tearDown(self): 48 self.tar.close() 49 50 51class UstarReadTest(ReadTest): 52 53 def test_fileobj_regular_file(self): 54 tarinfo = self.tar.getmember("ustar/regtype") 55 fobj = self.tar.extractfile(tarinfo) 56 data = fobj.read() 57 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 58 "regular file extraction failed") 59 60 def test_fileobj_readlines(self): 61 self.tar.extract("ustar/regtype", TEMPDIR) 62 tarinfo = self.tar.getmember("ustar/regtype") 63 fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") 64 fobj2 = self.tar.extractfile(tarinfo) 65 66 lines1 = fobj1.readlines() 67 lines2 = fobj2.readlines() 68 self.assertTrue(lines1 == lines2, 69 "fileobj.readlines() failed") 70 self.assertTrue(len(lines2) == 114, 71 "fileobj.readlines() failed") 72 self.assertTrue(lines2[83] == 73 "I will gladly admit that Python is not the fastest running scripting language.\n", 74 "fileobj.readlines() failed") 75 76 def test_fileobj_iter(self): 77 self.tar.extract("ustar/regtype", TEMPDIR) 78 tarinfo = self.tar.getmember("ustar/regtype") 79 fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") 80 fobj2 = self.tar.extractfile(tarinfo) 81 lines1 = fobj1.readlines() 82 lines2 = [line for line in fobj2] 83 self.assertTrue(lines1 == lines2, 84 "fileobj.__iter__() failed") 85 86 def test_fileobj_seek(self): 87 self.tar.extract("ustar/regtype", TEMPDIR) 88 fobj = open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") 89 data = fobj.read() 90 fobj.close() 91 92 tarinfo = self.tar.getmember("ustar/regtype") 93 fobj = self.tar.extractfile(tarinfo) 94 95 text = fobj.read() 96 fobj.seek(0) 97 self.assertTrue(0 == fobj.tell(), 98 "seek() to file's start failed") 99 fobj.seek(2048, 0) 100 self.assertTrue(2048 == fobj.tell(), 101 "seek() to absolute position failed") 102 fobj.seek(-1024, 1) 103 self.assertTrue(1024 == fobj.tell(), 104 "seek() to negative relative position failed") 105 fobj.seek(1024, 1) 106 self.assertTrue(2048 == fobj.tell(), 107 "seek() to positive relative position failed") 108 s = fobj.read(10) 109 self.assertTrue(s == data[2048:2058], 110 "read() after seek failed") 111 fobj.seek(0, 2) 112 self.assertTrue(tarinfo.size == fobj.tell(), 113 "seek() to file's end failed") 114 self.assertTrue(fobj.read() == "", 115 "read() at file's end did not return empty string") 116 fobj.seek(-tarinfo.size, 2) 117 self.assertTrue(0 == fobj.tell(), 118 "relative seek() to file's start failed") 119 fobj.seek(512) 120 s1 = fobj.readlines() 121 fobj.seek(512) 122 s2 = fobj.readlines() 123 self.assertTrue(s1 == s2, 124 "readlines() after seek failed") 125 fobj.seek(0) 126 self.assertTrue(len(fobj.readline()) == fobj.tell(), 127 "tell() after readline() failed") 128 fobj.seek(512) 129 self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), 130 "tell() after seek() and readline() failed") 131 fobj.seek(0) 132 line = fobj.readline() 133 self.assertTrue(fobj.read() == data[len(line):], 134 "read() after readline() failed") 135 fobj.close() 136 137 # Test if symbolic and hard links are resolved by extractfile(). The 138 # test link members each point to a regular member whose data is 139 # supposed to be exported. 140 def _test_fileobj_link(self, lnktype, regtype): 141 a = self.tar.extractfile(lnktype) 142 b = self.tar.extractfile(regtype) 143 self.assertEqual(a.name, b.name) 144 145 def test_fileobj_link1(self): 146 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 147 148 def test_fileobj_link2(self): 149 self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype") 150 151 def test_fileobj_symlink1(self): 152 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 153 154 def test_fileobj_symlink2(self): 155 self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype") 156 157 def test_issue14160(self): 158 self._test_fileobj_link("symtype2", "ustar/regtype") 159 160 161class CommonReadTest(ReadTest): 162 163 def test_empty_tarfile(self): 164 # Test for issue6123: Allow opening empty archives. 165 # This test checks if tarfile.open() is able to open an empty tar 166 # archive successfully. Note that an empty tar archive is not the 167 # same as an empty file! 168 tarfile.open(tmpname, self.mode.replace("r", "w")).close() 169 try: 170 tar = tarfile.open(tmpname, self.mode) 171 tar.getnames() 172 except tarfile.ReadError: 173 self.fail("tarfile.open() failed on empty archive") 174 self.assertListEqual(tar.getmembers(), []) 175 176 def test_null_tarfile(self): 177 # Test for issue6123: Allow opening empty archives. 178 # This test guarantees that tarfile.open() does not treat an empty 179 # file as an empty tar archive. 180 open(tmpname, "wb").close() 181 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 182 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 183 184 def test_ignore_zeros(self): 185 # Test TarFile's ignore_zeros option. 186 if self.mode.endswith(":gz"): 187 _open = gzip.GzipFile 188 elif self.mode.endswith(":bz2"): 189 _open = bz2.BZ2File 190 else: 191 _open = open 192 193 for char in ('\0', 'a'): 194 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 195 # are ignored correctly. 196 fobj = _open(tmpname, "wb") 197 fobj.write(char * 1024) 198 fobj.write(tarfile.TarInfo("foo").tobuf()) 199 fobj.close() 200 201 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 202 self.assertListEqual(tar.getnames(), ["foo"], 203 "ignore_zeros=True should have skipped the %r-blocks" % char) 204 tar.close() 205 206 207class MiscReadTest(CommonReadTest): 208 209 def test_no_name_argument(self): 210 fobj = open(self.tarname, "rb") 211 tar = tarfile.open(fileobj=fobj, mode=self.mode) 212 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 213 214 def test_no_name_attribute(self): 215 data = open(self.tarname, "rb").read() 216 fobj = StringIO.StringIO(data) 217 self.assertRaises(AttributeError, getattr, fobj, "name") 218 tar = tarfile.open(fileobj=fobj, mode=self.mode) 219 self.assertEqual(tar.name, None) 220 221 def test_empty_name_attribute(self): 222 data = open(self.tarname, "rb").read() 223 fobj = StringIO.StringIO(data) 224 fobj.name = "" 225 tar = tarfile.open(fileobj=fobj, mode=self.mode) 226 self.assertEqual(tar.name, None) 227 228 def test_fileobj_with_offset(self): 229 # Skip the first member and store values from the second member 230 # of the testtar. 231 tar = tarfile.open(self.tarname, mode=self.mode) 232 tar.next() 233 t = tar.next() 234 name = t.name 235 offset = t.offset 236 data = tar.extractfile(t).read() 237 tar.close() 238 239 # Open the testtar and seek to the offset of the second member. 240 if self.mode.endswith(":gz"): 241 _open = gzip.GzipFile 242 elif self.mode.endswith(":bz2"): 243 _open = bz2.BZ2File 244 else: 245 _open = open 246 fobj = _open(self.tarname, "rb") 247 fobj.seek(offset) 248 249 # Test if the tarfile starts with the second member. 250 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 251 t = tar.next() 252 self.assertEqual(t.name, name) 253 # Read to the end of fileobj and test if seeking back to the 254 # beginning works. 255 tar.getmembers() 256 self.assertEqual(tar.extractfile(t).read(), data, 257 "seek back did not work") 258 tar.close() 259 260 def test_fail_comp(self): 261 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 262 if self.mode == "r:": 263 return 264 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 265 fobj = open(tarname, "rb") 266 self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode) 267 268 def test_v7_dirtype(self): 269 # Test old style dirtype member (bug #1336623): 270 # Old V7 tars create directory members using an AREGTYPE 271 # header with a "/" appended to the filename field. 272 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 273 self.assertTrue(tarinfo.type == tarfile.DIRTYPE, 274 "v7 dirtype failed") 275 276 def test_xstar_type(self): 277 # The xstar format stores extra atime and ctime fields inside the 278 # space reserved for the prefix field. The prefix field must be 279 # ignored in this case, otherwise it will mess up the name. 280 try: 281 self.tar.getmember("misc/regtype-xstar") 282 except KeyError: 283 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 284 285 def test_check_members(self): 286 for tarinfo in self.tar: 287 self.assertTrue(int(tarinfo.mtime) == 07606136617, 288 "wrong mtime for %s" % tarinfo.name) 289 if not tarinfo.name.startswith("ustar/"): 290 continue 291 self.assertTrue(tarinfo.uname == "tarfile", 292 "wrong uname for %s" % tarinfo.name) 293 294 def test_find_members(self): 295 self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", 296 "could not find all members") 297 298 def test_extract_hardlink(self): 299 # Test hardlink extraction (e.g. bug #857297). 300 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 301 tar.extract("ustar/regtype", TEMPDIR) 302 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype")) 303 304 tar.extract("ustar/lnktype", TEMPDIR) 305 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype")) 306 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 307 data = f.read() 308 self.assertEqual(md5sum(data), md5_regtype) 309 310 tar.extract("ustar/symtype", TEMPDIR) 311 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype")) 312 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 313 data = f.read() 314 self.assertEqual(md5sum(data), md5_regtype) 315 316 def test_extractall(self): 317 # Test if extractall() correctly restores directory permissions 318 # and times (see issue1735). 319 tar = tarfile.open(tarname, encoding="iso8859-1") 320 directories = [t for t in tar if t.isdir()] 321 tar.extractall(TEMPDIR, directories) 322 for tarinfo in directories: 323 path = os.path.join(TEMPDIR, tarinfo.name) 324 if sys.platform != "win32": 325 # Win32 has no support for fine grained permissions. 326 self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777) 327 self.assertEqual(tarinfo.mtime, os.path.getmtime(path)) 328 tar.close() 329 330 def test_init_close_fobj(self): 331 # Issue #7341: Close the internal file object in the TarFile 332 # constructor in case of an error. For the test we rely on 333 # the fact that opening an empty file raises a ReadError. 334 empty = os.path.join(TEMPDIR, "empty") 335 open(empty, "wb").write("") 336 337 try: 338 tar = object.__new__(tarfile.TarFile) 339 try: 340 tar.__init__(empty) 341 except tarfile.ReadError: 342 self.assertTrue(tar.fileobj.closed) 343 else: 344 self.fail("ReadError not raised") 345 finally: 346 os.remove(empty) 347 348 def test_parallel_iteration(self): 349 # Issue #16601: Restarting iteration over tarfile continued 350 # from where it left off. 351 with tarfile.open(self.tarname) as tar: 352 for m1, m2 in zip(tar, tar): 353 self.assertEqual(m1.offset, m2.offset) 354 self.assertEqual(m1.name, m2.name) 355 356 357class StreamReadTest(CommonReadTest): 358 359 mode="r|" 360 361 def test_fileobj_regular_file(self): 362 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 363 fobj = self.tar.extractfile(tarinfo) 364 data = fobj.read() 365 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 366 "regular file extraction failed") 367 368 def test_provoke_stream_error(self): 369 tarinfos = self.tar.getmembers() 370 f = self.tar.extractfile(tarinfos[0]) # read the first member 371 self.assertRaises(tarfile.StreamError, f.read) 372 373 def test_compare_members(self): 374 tar1 = tarfile.open(tarname, encoding="iso8859-1") 375 tar2 = self.tar 376 377 while True: 378 t1 = tar1.next() 379 t2 = tar2.next() 380 if t1 is None: 381 break 382 self.assertTrue(t2 is not None, "stream.next() failed.") 383 384 if t2.islnk() or t2.issym(): 385 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) 386 continue 387 388 v1 = tar1.extractfile(t1) 389 v2 = tar2.extractfile(t2) 390 if v1 is None: 391 continue 392 self.assertTrue(v2 is not None, "stream.extractfile() failed") 393 self.assertTrue(v1.read() == v2.read(), "stream extraction failed") 394 395 tar1.close() 396 397 398class DetectReadTest(unittest.TestCase): 399 400 def _testfunc_file(self, name, mode): 401 try: 402 tarfile.open(name, mode) 403 except tarfile.ReadError: 404 self.fail() 405 406 def _testfunc_fileobj(self, name, mode): 407 try: 408 tarfile.open(name, mode, fileobj=open(name, "rb")) 409 except tarfile.ReadError: 410 self.fail() 411 412 def _test_modes(self, testfunc): 413 testfunc(tarname, "r") 414 testfunc(tarname, "r:") 415 testfunc(tarname, "r:*") 416 testfunc(tarname, "r|") 417 testfunc(tarname, "r|*") 418 419 if gzip: 420 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") 421 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") 422 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") 423 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") 424 425 testfunc(gzipname, "r") 426 testfunc(gzipname, "r:*") 427 testfunc(gzipname, "r:gz") 428 testfunc(gzipname, "r|*") 429 testfunc(gzipname, "r|gz") 430 431 if bz2: 432 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") 433 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") 434 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") 435 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") 436 437 testfunc(bz2name, "r") 438 testfunc(bz2name, "r:*") 439 testfunc(bz2name, "r:bz2") 440 testfunc(bz2name, "r|*") 441 testfunc(bz2name, "r|bz2") 442 443 def test_detect_file(self): 444 self._test_modes(self._testfunc_file) 445 446 def test_detect_fileobj(self): 447 self._test_modes(self._testfunc_fileobj) 448 449 def test_detect_stream_bz2(self): 450 # Originally, tarfile's stream detection looked for the string 451 # "BZh91" at the start of the file. This is incorrect because 452 # the '9' represents the blocksize (900kB). If the file was 453 # compressed using another blocksize autodetection fails. 454 if not bz2: 455 return 456 457 with open(tarname, "rb") as fobj: 458 data = fobj.read() 459 460 # Compress with blocksize 100kB, the file starts with "BZh11". 461 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 462 fobj.write(data) 463 464 self._testfunc_file(tmpname, "r|*") 465 466 467class MemberReadTest(ReadTest): 468 469 def _test_member(self, tarinfo, chksum=None, **kwargs): 470 if chksum is not None: 471 self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, 472 "wrong md5sum for %s" % tarinfo.name) 473 474 kwargs["mtime"] = 07606136617 475 kwargs["uid"] = 1000 476 kwargs["gid"] = 100 477 if "old-v7" not in tarinfo.name: 478 # V7 tar can't handle alphabetic owners. 479 kwargs["uname"] = "tarfile" 480 kwargs["gname"] = "tarfile" 481 for k, v in kwargs.iteritems(): 482 self.assertTrue(getattr(tarinfo, k) == v, 483 "wrong value in %s field of %s" % (k, tarinfo.name)) 484 485 def test_find_regtype(self): 486 tarinfo = self.tar.getmember("ustar/regtype") 487 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 488 489 def test_find_conttype(self): 490 tarinfo = self.tar.getmember("ustar/conttype") 491 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 492 493 def test_find_dirtype(self): 494 tarinfo = self.tar.getmember("ustar/dirtype") 495 self._test_member(tarinfo, size=0) 496 497 def test_find_dirtype_with_size(self): 498 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 499 self._test_member(tarinfo, size=255) 500 501 def test_find_lnktype(self): 502 tarinfo = self.tar.getmember("ustar/lnktype") 503 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 504 505 def test_find_symtype(self): 506 tarinfo = self.tar.getmember("ustar/symtype") 507 self._test_member(tarinfo, size=0, linkname="regtype") 508 509 def test_find_blktype(self): 510 tarinfo = self.tar.getmember("ustar/blktype") 511 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 512 513 def test_find_chrtype(self): 514 tarinfo = self.tar.getmember("ustar/chrtype") 515 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 516 517 def test_find_fifotype(self): 518 tarinfo = self.tar.getmember("ustar/fifotype") 519 self._test_member(tarinfo, size=0) 520 521 def test_find_sparse(self): 522 tarinfo = self.tar.getmember("ustar/sparse") 523 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 524 525 def test_find_umlauts(self): 526 tarinfo = self.tar.getmember("ustar/umlauts-�������") 527 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 528 529 def test_find_ustar_longname(self): 530 name = "ustar/" + "12345/" * 39 + "1234567/longname" 531 self.assertIn(name, self.tar.getnames()) 532 533 def test_find_regtype_oldv7(self): 534 tarinfo = self.tar.getmember("misc/regtype-old-v7") 535 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 536 537 def test_find_pax_umlauts(self): 538 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 539 tarinfo = self.tar.getmember("pax/umlauts-�������") 540 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 541 542 543class LongnameTest(ReadTest): 544 545 def test_read_longname(self): 546 # Test reading of longname (bug #1471427). 547 longname = self.subdir + "/" + "123/" * 125 + "longname" 548 try: 549 tarinfo = self.tar.getmember(longname) 550 except KeyError: 551 self.fail("longname not found") 552 self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") 553 554 def test_read_longlink(self): 555 longname = self.subdir + "/" + "123/" * 125 + "longname" 556 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 557 try: 558 tarinfo = self.tar.getmember(longlink) 559 except KeyError: 560 self.fail("longlink not found") 561 self.assertTrue(tarinfo.linkname == longname, "linkname wrong") 562 563 def test_truncated_longname(self): 564 longname = self.subdir + "/" + "123/" * 125 + "longname" 565 tarinfo = self.tar.getmember(longname) 566 offset = tarinfo.offset 567 self.tar.fileobj.seek(offset) 568 fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512)) 569 self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) 570 571 def test_header_offset(self): 572 # Test if the start offset of the TarInfo object includes 573 # the preceding extended header. 574 longname = self.subdir + "/" + "123/" * 125 + "longname" 575 offset = self.tar.getmember(longname).offset 576 fobj = open(tarname) 577 fobj.seek(offset) 578 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512)) 579 self.assertEqual(tarinfo.type, self.longnametype) 580 581 582class GNUReadTest(LongnameTest): 583 584 subdir = "gnu" 585 longnametype = tarfile.GNUTYPE_LONGNAME 586 587 def test_sparse_file(self): 588 tarinfo1 = self.tar.getmember("ustar/sparse") 589 fobj1 = self.tar.extractfile(tarinfo1) 590 tarinfo2 = self.tar.getmember("gnu/sparse") 591 fobj2 = self.tar.extractfile(tarinfo2) 592 self.assertTrue(fobj1.read() == fobj2.read(), 593 "sparse file extraction failed") 594 595 596class PaxReadTest(LongnameTest): 597 598 subdir = "pax" 599 longnametype = tarfile.XHDTYPE 600 601 def test_pax_global_headers(self): 602 tar = tarfile.open(tarname, encoding="iso8859-1") 603 604 tarinfo = tar.getmember("pax/regtype1") 605 self.assertEqual(tarinfo.uname, "foo") 606 self.assertEqual(tarinfo.gname, "bar") 607 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"�������") 608 609 tarinfo = tar.getmember("pax/regtype2") 610 self.assertEqual(tarinfo.uname, "") 611 self.assertEqual(tarinfo.gname, "bar") 612 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"�������") 613 614 tarinfo = tar.getmember("pax/regtype3") 615 self.assertEqual(tarinfo.uname, "tarfile") 616 self.assertEqual(tarinfo.gname, "tarfile") 617 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"�������") 618 619 def test_pax_number_fields(self): 620 # All following number fields are read from the pax header. 621 tar = tarfile.open(tarname, encoding="iso8859-1") 622 tarinfo = tar.getmember("pax/regtype4") 623 self.assertEqual(tarinfo.size, 7011) 624 self.assertEqual(tarinfo.uid, 123) 625 self.assertEqual(tarinfo.gid, 123) 626 self.assertEqual(tarinfo.mtime, 1041808783.0) 627 self.assertEqual(type(tarinfo.mtime), float) 628 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 629 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 630 631 632class WriteTestBase(unittest.TestCase): 633 # Put all write tests in here that are supposed to be tested 634 # in all possible mode combinations. 635 636 def test_fileobj_no_close(self): 637 fobj = StringIO.StringIO() 638 tar = tarfile.open(fileobj=fobj, mode=self.mode) 639 tar.addfile(tarfile.TarInfo("foo")) 640 tar.close() 641 self.assertTrue(fobj.closed is False, "external fileobjs must never closed") 642 643 644class WriteTest(WriteTestBase): 645 646 mode = "w:" 647 648 def test_100_char_name(self): 649 # The name field in a tar header stores strings of at most 100 chars. 650 # If a string is shorter than 100 chars it has to be padded with '\0', 651 # which implies that a string of exactly 100 chars is stored without 652 # a trailing '\0'. 653 name = "0123456789" * 10 654 tar = tarfile.open(tmpname, self.mode) 655 t = tarfile.TarInfo(name) 656 tar.addfile(t) 657 tar.close() 658 659 tar = tarfile.open(tmpname) 660 self.assertTrue(tar.getnames()[0] == name, 661 "failed to store 100 char filename") 662 tar.close() 663 664 def test_tar_size(self): 665 # Test for bug #1013882. 666 tar = tarfile.open(tmpname, self.mode) 667 path = os.path.join(TEMPDIR, "file") 668 fobj = open(path, "wb") 669 fobj.write("aaa") 670 fobj.close() 671 tar.add(path) 672 tar.close() 673 self.assertTrue(os.path.getsize(tmpname) > 0, 674 "tarfile is empty") 675 676 # The test_*_size tests test for bug #1167128. 677 def test_file_size(self): 678 tar = tarfile.open(tmpname, self.mode) 679 680 path = os.path.join(TEMPDIR, "file") 681 fobj = open(path, "wb") 682 fobj.close() 683 tarinfo = tar.gettarinfo(path) 684 self.assertEqual(tarinfo.size, 0) 685 686 fobj = open(path, "wb") 687 fobj.write("aaa") 688 fobj.close() 689 tarinfo = tar.gettarinfo(path) 690 self.assertEqual(tarinfo.size, 3) 691 692 tar.close() 693 694 def test_directory_size(self): 695 path = os.path.join(TEMPDIR, "directory") 696 os.mkdir(path) 697 try: 698 tar = tarfile.open(tmpname, self.mode) 699 tarinfo = tar.gettarinfo(path) 700 self.assertEqual(tarinfo.size, 0) 701 finally: 702 os.rmdir(path) 703 704 def test_link_size(self): 705 if hasattr(os, "link"): 706 link = os.path.join(TEMPDIR, "link") 707 target = os.path.join(TEMPDIR, "link_target") 708 fobj = open(target, "wb") 709 fobj.write("aaa") 710 fobj.close() 711 os.link(target, link) 712 try: 713 tar = tarfile.open(tmpname, self.mode) 714 # Record the link target in the inodes list. 715 tar.gettarinfo(target) 716 tarinfo = tar.gettarinfo(link) 717 self.assertEqual(tarinfo.size, 0) 718 finally: 719 os.remove(target) 720 os.remove(link) 721 722 def test_symlink_size(self): 723 if hasattr(os, "symlink"): 724 path = os.path.join(TEMPDIR, "symlink") 725 os.symlink("link_target", path) 726 try: 727 tar = tarfile.open(tmpname, self.mode) 728 tarinfo = tar.gettarinfo(path) 729 self.assertEqual(tarinfo.size, 0) 730 finally: 731 os.remove(path) 732 733 def test_add_self(self): 734 # Test for #1257255. 735 dstname = os.path.abspath(tmpname) 736 737 tar = tarfile.open(tmpname, self.mode) 738 self.assertTrue(tar.name == dstname, "archive name must be absolute") 739 740 tar.add(dstname) 741 self.assertTrue(tar.getnames() == [], "added the archive to itself") 742 743 cwd = os.getcwd() 744 os.chdir(TEMPDIR) 745 tar.add(dstname) 746 os.chdir(cwd) 747 self.assertTrue(tar.getnames() == [], "added the archive to itself") 748 749 def test_exclude(self): 750 tempdir = os.path.join(TEMPDIR, "exclude") 751 os.mkdir(tempdir) 752 try: 753 for name in ("foo", "bar", "baz"): 754 name = os.path.join(tempdir, name) 755 open(name, "wb").close() 756 757 exclude = os.path.isfile 758 759 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 760 with test_support.check_warnings(("use the filter argument", 761 DeprecationWarning)): 762 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 763 tar.close() 764 765 tar = tarfile.open(tmpname, "r") 766 self.assertEqual(len(tar.getmembers()), 1) 767 self.assertEqual(tar.getnames()[0], "empty_dir") 768 finally: 769 shutil.rmtree(tempdir) 770 771 def test_filter(self): 772 tempdir = os.path.join(TEMPDIR, "filter") 773 os.mkdir(tempdir) 774 try: 775 for name in ("foo", "bar", "baz"): 776 name = os.path.join(tempdir, name) 777 open(name, "wb").close() 778 779 def filter(tarinfo): 780 if os.path.basename(tarinfo.name) == "bar": 781 return 782 tarinfo.uid = 123 783 tarinfo.uname = "foo" 784 return tarinfo 785 786 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 787 tar.add(tempdir, arcname="empty_dir", filter=filter) 788 tar.close() 789 790 tar = tarfile.open(tmpname, "r") 791 for tarinfo in tar: 792 self.assertEqual(tarinfo.uid, 123) 793 self.assertEqual(tarinfo.uname, "foo") 794 self.assertEqual(len(tar.getmembers()), 3) 795 tar.close() 796 finally: 797 shutil.rmtree(tempdir) 798 799 # Guarantee that stored pathnames are not modified. Don't 800 # remove ./ or ../ or double slashes. Still make absolute 801 # pathnames relative. 802 # For details see bug #6054. 803 def _test_pathname(self, path, cmp_path=None, dir=False): 804 # Create a tarfile with an empty member named path 805 # and compare the stored name with the original. 806 foo = os.path.join(TEMPDIR, "foo") 807 if not dir: 808 open(foo, "w").close() 809 else: 810 os.mkdir(foo) 811 812 tar = tarfile.open(tmpname, self.mode) 813 tar.add(foo, arcname=path) 814 tar.close() 815 816 tar = tarfile.open(tmpname, "r") 817 t = tar.next() 818 tar.close() 819 820 if not dir: 821 os.remove(foo) 822 else: 823 os.rmdir(foo) 824 825 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 826 827 def test_pathnames(self): 828 self._test_pathname("foo") 829 self._test_pathname(os.path.join("foo", ".", "bar")) 830 self._test_pathname(os.path.join("foo", "..", "bar")) 831 self._test_pathname(os.path.join(".", "foo")) 832 self._test_pathname(os.path.join(".", "foo", ".")) 833 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 834 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 835 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 836 self._test_pathname(os.path.join("..", "foo")) 837 self._test_pathname(os.path.join("..", "foo", "..")) 838 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 839 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 840 841 self._test_pathname("foo" + os.sep + os.sep + "bar") 842 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 843 844 def test_abs_pathnames(self): 845 if sys.platform == "win32": 846 self._test_pathname("C:\\foo", "foo") 847 else: 848 self._test_pathname("/foo", "foo") 849 self._test_pathname("///foo", "foo") 850 851 def test_cwd(self): 852 # Test adding the current working directory. 853 cwd = os.getcwd() 854 os.chdir(TEMPDIR) 855 try: 856 open("foo", "w").close() 857 858 tar = tarfile.open(tmpname, self.mode) 859 tar.add(".") 860 tar.close() 861 862 tar = tarfile.open(tmpname, "r") 863 for t in tar: 864 self.assertTrue(t.name == "." or t.name.startswith("./")) 865 tar.close() 866 finally: 867 os.chdir(cwd) 868 869 @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink") 870 def test_extractall_symlinks(self): 871 # Test if extractall works properly when tarfile contains symlinks 872 tempdir = os.path.join(TEMPDIR, "testsymlinks") 873 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 874 os.mkdir(tempdir) 875 try: 876 source_file = os.path.join(tempdir,'source') 877 target_file = os.path.join(tempdir,'symlink') 878 with open(source_file,'w') as f: 879 f.write('something\n') 880 os.symlink(source_file, target_file) 881 tar = tarfile.open(temparchive,'w') 882 tar.add(source_file, arcname=os.path.basename(source_file)) 883 tar.add(target_file, arcname=os.path.basename(target_file)) 884 tar.close() 885 # Let's extract it to the location which contains the symlink 886 tar = tarfile.open(temparchive,'r') 887 # this should not raise OSError: [Errno 17] File exists 888 try: 889 tar.extractall(path=tempdir) 890 except OSError: 891 self.fail("extractall failed with symlinked files") 892 finally: 893 tar.close() 894 finally: 895 os.unlink(temparchive) 896 shutil.rmtree(tempdir) 897 898 @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink") 899 def test_extractall_broken_symlinks(self): 900 # Test if extractall works properly when tarfile contains broken 901 # symlinks 902 tempdir = os.path.join(TEMPDIR, "testsymlinks") 903 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 904 os.mkdir(tempdir) 905 try: 906 source_file = os.path.join(tempdir,'source') 907 target_file = os.path.join(tempdir,'symlink') 908 with open(source_file,'w') as f: 909 f.write('something\n') 910 os.symlink(source_file, target_file) 911 tar = tarfile.open(temparchive,'w') 912 tar.add(target_file, arcname=os.path.basename(target_file)) 913 tar.close() 914 # remove the real file 915 os.unlink(source_file) 916 # Let's extract it to the location which contains the symlink 917 tar = tarfile.open(temparchive,'r') 918 # this should not raise OSError: [Errno 17] File exists 919 try: 920 tar.extractall(path=tempdir) 921 except OSError: 922 self.fail("extractall failed with broken symlinked files") 923 finally: 924 tar.close() 925 finally: 926 os.unlink(temparchive) 927 shutil.rmtree(tempdir) 928 929 @unittest.skipUnless(hasattr(os, 'link'), "needs os.link") 930 def test_extractall_hardlinks(self): 931 # Test if extractall works properly when tarfile contains symlinks 932 tempdir = os.path.join(TEMPDIR, "testsymlinks") 933 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 934 os.mkdir(tempdir) 935 try: 936 source_file = os.path.join(tempdir,'source') 937 target_file = os.path.join(tempdir,'symlink') 938 with open(source_file,'w') as f: 939 f.write('something\n') 940 os.link(source_file, target_file) 941 tar = tarfile.open(temparchive,'w') 942 tar.add(source_file, arcname=os.path.basename(source_file)) 943 tar.add(target_file, arcname=os.path.basename(target_file)) 944 tar.close() 945 # Let's extract it to the location which contains the symlink 946 tar = tarfile.open(temparchive,'r') 947 # this should not raise OSError: [Errno 17] File exists 948 try: 949 tar.extractall(path=tempdir) 950 except OSError: 951 self.fail("extractall failed with linked files") 952 finally: 953 tar.close() 954 finally: 955 os.unlink(temparchive) 956 shutil.rmtree(tempdir) 957 958class StreamWriteTest(WriteTestBase): 959 960 mode = "w|" 961 962 def test_stream_padding(self): 963 # Test for bug #1543303. 964 tar = tarfile.open(tmpname, self.mode) 965 tar.close() 966 967 if self.mode.endswith("gz"): 968 fobj = gzip.GzipFile(tmpname) 969 data = fobj.read() 970 fobj.close() 971 elif self.mode.endswith("bz2"): 972 dec = bz2.BZ2Decompressor() 973 data = open(tmpname, "rb").read() 974 data = dec.decompress(data) 975 self.assertTrue(len(dec.unused_data) == 0, 976 "found trailing data") 977 else: 978 fobj = open(tmpname, "rb") 979 data = fobj.read() 980 fobj.close() 981 982 self.assertTrue(data.count("\0") == tarfile.RECORDSIZE, 983 "incorrect zero padding") 984 985 def test_file_mode(self): 986 # Test for issue #8464: Create files with correct 987 # permissions. 988 if sys.platform == "win32" or not hasattr(os, "umask"): 989 return 990 991 if os.path.exists(tmpname): 992 os.remove(tmpname) 993 994 original_umask = os.umask(0022) 995 try: 996 tar = tarfile.open(tmpname, self.mode) 997 tar.close() 998 mode = os.stat(tmpname).st_mode & 0777 999 self.assertEqual(mode, 0644, "wrong file permissions") 1000 finally: 1001 os.umask(original_umask) 1002 1003 def test_issue13639(self): 1004 try: 1005 with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()), self.mode): 1006 pass 1007 except UnicodeDecodeError: 1008 self.fail("_Stream failed to write unicode filename") 1009 1010 1011class GNUWriteTest(unittest.TestCase): 1012 # This testcase checks for correct creation of GNU Longname 1013 # and Longlink extended headers (cp. bug #812325). 1014 1015 def _length(self, s): 1016 blocks, remainder = divmod(len(s) + 1, 512) 1017 if remainder: 1018 blocks += 1 1019 return blocks * 512 1020 1021 def _calc_size(self, name, link=None): 1022 # Initial tar header 1023 count = 512 1024 1025 if len(name) > tarfile.LENGTH_NAME: 1026 # GNU longname extended header + longname 1027 count += 512 1028 count += self._length(name) 1029 if link is not None and len(link) > tarfile.LENGTH_LINK: 1030 # GNU longlink extended header + longlink 1031 count += 512 1032 count += self._length(link) 1033 return count 1034 1035 def _test(self, name, link=None): 1036 tarinfo = tarfile.TarInfo(name) 1037 if link: 1038 tarinfo.linkname = link 1039 tarinfo.type = tarfile.LNKTYPE 1040 1041 tar = tarfile.open(tmpname, "w") 1042 tar.format = tarfile.GNU_FORMAT 1043 tar.addfile(tarinfo) 1044 1045 v1 = self._calc_size(name, link) 1046 v2 = tar.offset 1047 self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") 1048 1049 tar.close() 1050 1051 tar = tarfile.open(tmpname) 1052 member = tar.next() 1053 self.assertIsNotNone(member, 1054 "unable to read longname member") 1055 self.assertEqual(tarinfo.name, member.name, 1056 "unable to read longname member") 1057 self.assertEqual(tarinfo.linkname, member.linkname, 1058 "unable to read longname member") 1059 1060 def test_longname_1023(self): 1061 self._test(("longnam/" * 127) + "longnam") 1062 1063 def test_longname_1024(self): 1064 self._test(("longnam/" * 127) + "longname") 1065 1066 def test_longname_1025(self): 1067 self._test(("longnam/" * 127) + "longname_") 1068 1069 def test_longlink_1023(self): 1070 self._test("name", ("longlnk/" * 127) + "longlnk") 1071 1072 def test_longlink_1024(self): 1073 self._test("name", ("longlnk/" * 127) + "longlink") 1074 1075 def test_longlink_1025(self): 1076 self._test("name", ("longlnk/" * 127) + "longlink_") 1077 1078 def test_longnamelink_1023(self): 1079 self._test(("longnam/" * 127) + "longnam", 1080 ("longlnk/" * 127) + "longlnk") 1081 1082 def test_longnamelink_1024(self): 1083 self._test(("longnam/" * 127) + "longname", 1084 ("longlnk/" * 127) + "longlink") 1085 1086 def test_longnamelink_1025(self): 1087 self._test(("longnam/" * 127) + "longname_", 1088 ("longlnk/" * 127) + "longlink_") 1089 1090 1091class HardlinkTest(unittest.TestCase): 1092 # Test the creation of LNKTYPE (hardlink) members in an archive. 1093 1094 def setUp(self): 1095 self.foo = os.path.join(TEMPDIR, "foo") 1096 self.bar = os.path.join(TEMPDIR, "bar") 1097 1098 fobj = open(self.foo, "wb") 1099 fobj.write("foo") 1100 fobj.close() 1101 1102 os.link(self.foo, self.bar) 1103 1104 self.tar = tarfile.open(tmpname, "w") 1105 self.tar.add(self.foo) 1106 1107 def tearDown(self): 1108 self.tar.close() 1109 os.remove(self.foo) 1110 os.remove(self.bar) 1111 1112 def test_add_twice(self): 1113 # The same name will be added as a REGTYPE every 1114 # time regardless of st_nlink. 1115 tarinfo = self.tar.gettarinfo(self.foo) 1116 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1117 "add file as regular failed") 1118 1119 def test_add_hardlink(self): 1120 tarinfo = self.tar.gettarinfo(self.bar) 1121 self.assertTrue(tarinfo.type == tarfile.LNKTYPE, 1122 "add file as hardlink failed") 1123 1124 def test_dereference_hardlink(self): 1125 self.tar.dereference = True 1126 tarinfo = self.tar.gettarinfo(self.bar) 1127 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1128 "dereferencing hardlink failed") 1129 1130 1131class PaxWriteTest(GNUWriteTest): 1132 1133 def _test(self, name, link=None): 1134 # See GNUWriteTest. 1135 tarinfo = tarfile.TarInfo(name) 1136 if link: 1137 tarinfo.linkname = link 1138 tarinfo.type = tarfile.LNKTYPE 1139 1140 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1141 tar.addfile(tarinfo) 1142 tar.close() 1143 1144 tar = tarfile.open(tmpname) 1145 if link: 1146 l = tar.getmembers()[0].linkname 1147 self.assertTrue(link == l, "PAX longlink creation failed") 1148 else: 1149 n = tar.getmembers()[0].name 1150 self.assertTrue(name == n, "PAX longname creation failed") 1151 1152 def test_pax_global_header(self): 1153 pax_headers = { 1154 u"foo": u"bar", 1155 u"uid": u"0", 1156 u"mtime": u"1.23", 1157 u"test": u"���", 1158 u"���": u"test"} 1159 1160 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1161 pax_headers=pax_headers) 1162 tar.addfile(tarfile.TarInfo("test")) 1163 tar.close() 1164 1165 # Test if the global header was written correctly. 1166 tar = tarfile.open(tmpname, encoding="iso8859-1") 1167 self.assertEqual(tar.pax_headers, pax_headers) 1168 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1169 1170 # Test if all the fields are unicode. 1171 for key, val in tar.pax_headers.iteritems(): 1172 self.assertTrue(type(key) is unicode) 1173 self.assertTrue(type(val) is unicode) 1174 if key in tarfile.PAX_NUMBER_FIELDS: 1175 try: 1176 tarfile.PAX_NUMBER_FIELDS[key](val) 1177 except (TypeError, ValueError): 1178 self.fail("unable to convert pax header field") 1179 1180 def test_pax_extended_header(self): 1181 # The fields from the pax header have priority over the 1182 # TarInfo. 1183 pax_headers = {u"path": u"foo", u"uid": u"123"} 1184 1185 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") 1186 t = tarfile.TarInfo() 1187 t.name = u"���" # non-ASCII 1188 t.uid = 8**8 # too large 1189 t.pax_headers = pax_headers 1190 tar.addfile(t) 1191 tar.close() 1192 1193 tar = tarfile.open(tmpname, encoding="iso8859-1") 1194 t = tar.getmembers()[0] 1195 self.assertEqual(t.pax_headers, pax_headers) 1196 self.assertEqual(t.name, "foo") 1197 self.assertEqual(t.uid, 123) 1198 1199 1200class UstarUnicodeTest(unittest.TestCase): 1201 # All *UnicodeTests FIXME 1202 1203 format = tarfile.USTAR_FORMAT 1204 1205 def test_iso8859_1_filename(self): 1206 self._test_unicode_filename("iso8859-1") 1207 1208 def test_utf7_filename(self): 1209 self._test_unicode_filename("utf7") 1210 1211 def test_utf8_filename(self): 1212 self._test_unicode_filename("utf8") 1213 1214 def _test_unicode_filename(self, encoding): 1215 tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") 1216 name = u"���" 1217 tar.addfile(tarfile.TarInfo(name)) 1218 tar.close() 1219 1220 tar = tarfile.open(tmpname, encoding=encoding) 1221 self.assertTrue(type(tar.getnames()[0]) is not unicode) 1222 self.assertEqual(tar.getmembers()[0].name, name.encode(encoding)) 1223 tar.close() 1224 1225 def test_unicode_filename_error(self): 1226 tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") 1227 tarinfo = tarfile.TarInfo() 1228 1229 tarinfo.name = "���" 1230 if self.format == tarfile.PAX_FORMAT: 1231 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1232 else: 1233 tar.addfile(tarinfo) 1234 1235 tarinfo.name = u"���" 1236 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1237 1238 tarinfo.name = "foo" 1239 tarinfo.uname = u"���" 1240 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1241 1242 def test_unicode_argument(self): 1243 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") 1244 for t in tar: 1245 self.assertTrue(type(t.name) is str) 1246 self.assertTrue(type(t.linkname) is str) 1247 self.assertTrue(type(t.uname) is str) 1248 self.assertTrue(type(t.gname) is str) 1249 tar.close() 1250 1251 def test_uname_unicode(self): 1252 for name in (u"���", "���"): 1253 t = tarfile.TarInfo("foo") 1254 t.uname = name 1255 t.gname = name 1256 1257 fobj = StringIO.StringIO() 1258 tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1") 1259 tar.addfile(t) 1260 tar.close() 1261 fobj.seek(0) 1262 1263 tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1") 1264 t = tar.getmember("foo") 1265 self.assertEqual(t.uname, "���") 1266 self.assertEqual(t.gname, "���") 1267 1268 1269class GNUUnicodeTest(UstarUnicodeTest): 1270 1271 format = tarfile.GNU_FORMAT 1272 1273 1274class PaxUnicodeTest(UstarUnicodeTest): 1275 1276 format = tarfile.PAX_FORMAT 1277 1278 def _create_unicode_name(self, name): 1279 tar = tarfile.open(tmpname, "w", format=self.format) 1280 t = tarfile.TarInfo() 1281 t.pax_headers["path"] = name 1282 tar.addfile(t) 1283 tar.close() 1284 1285 def test_error_handlers(self): 1286 # Test if the unicode error handlers work correctly for characters 1287 # that cannot be expressed in a given encoding. 1288 self._create_unicode_name(u"���") 1289 1290 for handler, name in (("utf-8", u"���".encode("utf8")), 1291 ("replace", "???"), ("ignore", "")): 1292 tar = tarfile.open(tmpname, format=self.format, encoding="ascii", 1293 errors=handler) 1294 self.assertEqual(tar.getnames()[0], name) 1295 1296 self.assertRaises(UnicodeError, tarfile.open, tmpname, 1297 encoding="ascii", errors="strict") 1298 1299 def test_error_handler_utf8(self): 1300 # Create a pathname that has one component representable using 1301 # iso8859-1 and the other only in iso8859-15. 1302 self._create_unicode_name(u"���/�") 1303 1304 tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1", 1305 errors="utf-8") 1306 self.assertEqual(tar.getnames()[0], "���/" + u"�".encode("utf8")) 1307 1308 1309class AppendTest(unittest.TestCase): 1310 # Test append mode (cp. patch #1652681). 1311 1312 def setUp(self): 1313 self.tarname = tmpname 1314 if os.path.exists(self.tarname): 1315 os.remove(self.tarname) 1316 1317 def _add_testfile(self, fileobj=None): 1318 tar = tarfile.open(self.tarname, "a", fileobj=fileobj) 1319 tar.addfile(tarfile.TarInfo("bar")) 1320 tar.close() 1321 1322 def _create_testtar(self, mode="w:"): 1323 src = tarfile.open(tarname, encoding="iso8859-1") 1324 t = src.getmember("ustar/regtype") 1325 t.name = "foo" 1326 f = src.extractfile(t) 1327 tar = tarfile.open(self.tarname, mode) 1328 tar.addfile(t, f) 1329 tar.close() 1330 1331 def _test(self, names=["bar"], fileobj=None): 1332 tar = tarfile.open(self.tarname, fileobj=fileobj) 1333 self.assertEqual(tar.getnames(), names) 1334 1335 def test_non_existing(self): 1336 self._add_testfile() 1337 self._test() 1338 1339 def test_empty(self): 1340 tarfile.open(self.tarname, "w:").close() 1341 self._add_testfile() 1342 self._test() 1343 1344 def test_empty_fileobj(self): 1345 fobj = StringIO.StringIO("\0" * 1024) 1346 self._add_testfile(fobj) 1347 fobj.seek(0) 1348 self._test(fileobj=fobj) 1349 1350 def test_fileobj(self): 1351 self._create_testtar() 1352 data = open(self.tarname).read() 1353 fobj = StringIO.StringIO(data) 1354 self._add_testfile(fobj) 1355 fobj.seek(0) 1356 self._test(names=["foo", "bar"], fileobj=fobj) 1357 1358 def test_existing(self): 1359 self._create_testtar() 1360 self._add_testfile() 1361 self._test(names=["foo", "bar"]) 1362 1363 def test_append_gz(self): 1364 if gzip is None: 1365 return 1366 self._create_testtar("w:gz") 1367 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1368 1369 def test_append_bz2(self): 1370 if bz2 is None: 1371 return 1372 self._create_testtar("w:bz2") 1373 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1374 1375 # Append mode is supposed to fail if the tarfile to append to 1376 # does not end with a zero block. 1377 def _test_error(self, data): 1378 open(self.tarname, "wb").write(data) 1379 self.assertRaises(tarfile.ReadError, self._add_testfile) 1380 1381 def test_null(self): 1382 self._test_error("") 1383 1384 def test_incomplete(self): 1385 self._test_error("\0" * 13) 1386 1387 def test_premature_eof(self): 1388 data = tarfile.TarInfo("foo").tobuf() 1389 self._test_error(data) 1390 1391 def test_trailing_garbage(self): 1392 data = tarfile.TarInfo("foo").tobuf() 1393 self._test_error(data + "\0" * 13) 1394 1395 def test_invalid(self): 1396 self._test_error("a" * 512) 1397 1398 1399class LimitsTest(unittest.TestCase): 1400 1401 def test_ustar_limits(self): 1402 # 100 char name 1403 tarinfo = tarfile.TarInfo("0123456789" * 10) 1404 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1405 1406 # 101 char name that cannot be stored 1407 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1408 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1409 1410 # 256 char name with a slash at pos 156 1411 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1412 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1413 1414 # 256 char name that cannot be stored 1415 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1416 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1417 1418 # 512 char name 1419 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1420 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1421 1422 # 512 char linkname 1423 tarinfo = tarfile.TarInfo("longlink") 1424 tarinfo.linkname = "123/" * 126 + "longname" 1425 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1426 1427 # uid > 8 digits 1428 tarinfo = tarfile.TarInfo("name") 1429 tarinfo.uid = 010000000 1430 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1431 1432 def test_gnu_limits(self): 1433 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1434 tarinfo.tobuf(tarfile.GNU_FORMAT) 1435 1436 tarinfo = tarfile.TarInfo("longlink") 1437 tarinfo.linkname = "123/" * 126 + "longname" 1438 tarinfo.tobuf(tarfile.GNU_FORMAT) 1439 1440 # uid >= 256 ** 7 1441 tarinfo = tarfile.TarInfo("name") 1442 tarinfo.uid = 04000000000000000000L 1443 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1444 1445 def test_pax_limits(self): 1446 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1447 tarinfo.tobuf(tarfile.PAX_FORMAT) 1448 1449 tarinfo = tarfile.TarInfo("longlink") 1450 tarinfo.linkname = "123/" * 126 + "longname" 1451 tarinfo.tobuf(tarfile.PAX_FORMAT) 1452 1453 tarinfo = tarfile.TarInfo("name") 1454 tarinfo.uid = 04000000000000000000L 1455 tarinfo.tobuf(tarfile.PAX_FORMAT) 1456 1457 1458class ContextManagerTest(unittest.TestCase): 1459 1460 def test_basic(self): 1461 with tarfile.open(tarname) as tar: 1462 self.assertFalse(tar.closed, "closed inside runtime context") 1463 self.assertTrue(tar.closed, "context manager failed") 1464 1465 def test_closed(self): 1466 # The __enter__() method is supposed to raise IOError 1467 # if the TarFile object is already closed. 1468 tar = tarfile.open(tarname) 1469 tar.close() 1470 with self.assertRaises(IOError): 1471 with tar: 1472 pass 1473 1474 def test_exception(self): 1475 # Test if the IOError exception is passed through properly. 1476 with self.assertRaises(Exception) as exc: 1477 with tarfile.open(tarname) as tar: 1478 raise IOError 1479 self.assertIsInstance(exc.exception, IOError, 1480 "wrong exception raised in context manager") 1481 self.assertTrue(tar.closed, "context manager failed") 1482 1483 def test_no_eof(self): 1484 # __exit__() must not write end-of-archive blocks if an 1485 # exception was raised. 1486 try: 1487 with tarfile.open(tmpname, "w") as tar: 1488 raise Exception 1489 except: 1490 pass 1491 self.assertEqual(os.path.getsize(tmpname), 0, 1492 "context manager wrote an end-of-archive block") 1493 self.assertTrue(tar.closed, "context manager failed") 1494 1495 def test_eof(self): 1496 # __exit__() must write end-of-archive blocks, i.e. call 1497 # TarFile.close() if there was no error. 1498 with tarfile.open(tmpname, "w"): 1499 pass 1500 self.assertNotEqual(os.path.getsize(tmpname), 0, 1501 "context manager wrote no end-of-archive block") 1502 1503 def test_fileobj(self): 1504 # Test that __exit__() did not close the external file 1505 # object. 1506 fobj = open(tmpname, "wb") 1507 try: 1508 with tarfile.open(fileobj=fobj, mode="w") as tar: 1509 raise Exception 1510 except: 1511 pass 1512 self.assertFalse(fobj.closed, "external file object was closed") 1513 self.assertTrue(tar.closed, "context manager failed") 1514 fobj.close() 1515 1516 1517class LinkEmulationTest(ReadTest): 1518 1519 # Test for issue #8741 regression. On platforms that do not support 1520 # symbolic or hard links tarfile tries to extract these types of members as 1521 # the regular files they point to. 1522 def _test_link_extraction(self, name): 1523 self.tar.extract(name, TEMPDIR) 1524 data = open(os.path.join(TEMPDIR, name), "rb").read() 1525 self.assertEqual(md5sum(data), md5_regtype) 1526 1527 def test_hardlink_extraction1(self): 1528 self._test_link_extraction("ustar/lnktype") 1529 1530 def test_hardlink_extraction2(self): 1531 self._test_link_extraction("./ustar/linktest2/lnktype") 1532 1533 def test_symlink_extraction1(self): 1534 self._test_link_extraction("ustar/symtype") 1535 1536 def test_symlink_extraction2(self): 1537 self._test_link_extraction("./ustar/linktest2/symtype") 1538 1539 1540class GzipMiscReadTest(MiscReadTest): 1541 tarname = gzipname 1542 mode = "r:gz" 1543class GzipUstarReadTest(UstarReadTest): 1544 tarname = gzipname 1545 mode = "r:gz" 1546class GzipStreamReadTest(StreamReadTest): 1547 tarname = gzipname 1548 mode = "r|gz" 1549class GzipWriteTest(WriteTest): 1550 mode = "w:gz" 1551class GzipStreamWriteTest(StreamWriteTest): 1552 mode = "w|gz" 1553 1554 1555class Bz2MiscReadTest(MiscReadTest): 1556 tarname = bz2name 1557 mode = "r:bz2" 1558class Bz2UstarReadTest(UstarReadTest): 1559 tarname = bz2name 1560 mode = "r:bz2" 1561class Bz2StreamReadTest(StreamReadTest): 1562 tarname = bz2name 1563 mode = "r|bz2" 1564class Bz2WriteTest(WriteTest): 1565 mode = "w:bz2" 1566class Bz2StreamWriteTest(StreamWriteTest): 1567 mode = "w|bz2" 1568 1569class Bz2PartialReadTest(unittest.TestCase): 1570 # Issue5068: The _BZ2Proxy.read() method loops forever 1571 # on an empty or partial bzipped file. 1572 1573 def _test_partial_input(self, mode): 1574 class MyStringIO(StringIO.StringIO): 1575 hit_eof = False 1576 def read(self, n): 1577 if self.hit_eof: 1578 raise AssertionError("infinite loop detected in tarfile.open()") 1579 self.hit_eof = self.pos == self.len 1580 return StringIO.StringIO.read(self, n) 1581 def seek(self, *args): 1582 self.hit_eof = False 1583 return StringIO.StringIO.seek(self, *args) 1584 1585 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1586 for x in range(len(data) + 1): 1587 try: 1588 tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode) 1589 except tarfile.ReadError: 1590 pass # we have no interest in ReadErrors 1591 1592 def test_partial_input(self): 1593 self._test_partial_input("r") 1594 1595 def test_partial_input_bz2(self): 1596 self._test_partial_input("r:bz2") 1597 1598 1599def test_main(): 1600 os.makedirs(TEMPDIR) 1601 1602 tests = [ 1603 UstarReadTest, 1604 MiscReadTest, 1605 StreamReadTest, 1606 DetectReadTest, 1607 MemberReadTest, 1608 GNUReadTest, 1609 PaxReadTest, 1610 WriteTest, 1611 StreamWriteTest, 1612 GNUWriteTest, 1613 PaxWriteTest, 1614 UstarUnicodeTest, 1615 GNUUnicodeTest, 1616 PaxUnicodeTest, 1617 AppendTest, 1618 LimitsTest, 1619 ContextManagerTest, 1620 ] 1621 1622 if hasattr(os, "link"): 1623 tests.append(HardlinkTest) 1624 else: 1625 tests.append(LinkEmulationTest) 1626 1627 fobj = open(tarname, "rb") 1628 data = fobj.read() 1629 fobj.close() 1630 1631 if gzip: 1632 # Create testtar.tar.gz and add gzip-specific tests. 1633 tar = gzip.open(gzipname, "wb") 1634 tar.write(data) 1635 tar.close() 1636 1637 tests += [ 1638 GzipMiscReadTest, 1639 GzipUstarReadTest, 1640 GzipStreamReadTest, 1641 GzipWriteTest, 1642 GzipStreamWriteTest, 1643 ] 1644 1645 if bz2: 1646 # Create testtar.tar.bz2 and add bz2-specific tests. 1647 tar = bz2.BZ2File(bz2name, "wb") 1648 tar.write(data) 1649 tar.close() 1650 1651 tests += [ 1652 Bz2MiscReadTest, 1653 Bz2UstarReadTest, 1654 Bz2StreamReadTest, 1655 Bz2WriteTest, 1656 Bz2StreamWriteTest, 1657 Bz2PartialReadTest, 1658 ] 1659 1660 try: 1661 test_support.run_unittest(*tests) 1662 finally: 1663 if os.path.exists(TEMPDIR): 1664 shutil.rmtree(TEMPDIR) 1665 1666if __name__ == "__main__": 1667 test_main() 1668