test_tarfile.py revision ab91fdef1f1e556203a2eee98ba7d379e4790de9
import sys
import os
import io
import shutil
import tempfile
from hashlib import md5
import errno

import unittest
import tarfile

from test import support

# Check for our compression modules.
try:
    import gzip
    gzip.GzipFile
except (ImportError, AttributeError):
    gzip = None
try:
    import bz2
except ImportError:
    bz2 = None


def md5sum(data):
    """Return the hexadecimal MD5 digest of the bytes object *data*."""
    return md5(data).hexdigest()


def path(path):
    """Resolve the test data file *path* through support.findfile()."""
    return support.findfile(path)


# All scratch files live below TEMPDIR; only the plain tar archive is a
# checked-in fixture that is looked up with path().
TEMPDIR = os.path.join(tempfile.gettempdir(), "test_tarfile_tmp")
tarname = path("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# MD5 digests the extracted member data is compared against.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"


class ReadTest(unittest.TestCase):
    # Base class for the read tests: setUp() opens self.tarname with
    # self.mode as self.tar, tearDown() closes it again.  Subclasses
    # override the two class attributes to select another archive/mode.

    tarname = tarname
    mode = "r:"

    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")

    def tearDown(self):
        self.tar.close()


class UstarReadTest(ReadTest):

    def test_fileobj_regular_file(self):
        # Size and checksum of the data read through extractfile() must
        # match the header and the known digest.
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)), (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the extracted file on disk and on the wrapped
        # extractfile() object must yield the same lines.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = io.TextIOWrapper(self.tar.extractfile(tarinfo))
        lines2 = fobj2.readlines()

        self.assertEqual(lines1, lines2,
                "fileobj.readlines() failed")
        self.assertEqual(len(lines2), 114,
                "fileobj.readlines() failed")
        self.assertEqual(lines2[83],
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

79 def test_fileobj_iter(self): 80 self.tar.extract("ustar/regtype", TEMPDIR) 81 tarinfo = self.tar.getmember("ustar/regtype") 82 fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") 83 fobj2 = self.tar.extractfile(tarinfo) 84 lines1 = fobj1.readlines() 85 lines2 = list(io.TextIOWrapper(fobj2)) 86 self.assertTrue(lines1 == lines2, 87 "fileobj.__iter__() failed") 88 89 def test_fileobj_seek(self): 90 self.tar.extract("ustar/regtype", TEMPDIR) 91 fobj = open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") 92 data = fobj.read() 93 fobj.close() 94 95 tarinfo = self.tar.getmember("ustar/regtype") 96 fobj = self.tar.extractfile(tarinfo) 97 98 text = fobj.read() 99 fobj.seek(0) 100 self.assertEqual(0, fobj.tell(), 101 "seek() to file's start failed") 102 fobj.seek(2048, 0) 103 self.assertEqual(2048, fobj.tell(), 104 "seek() to absolute position failed") 105 fobj.seek(-1024, 1) 106 self.assertEqual(1024, fobj.tell(), 107 "seek() to negative relative position failed") 108 fobj.seek(1024, 1) 109 self.assertEqual(2048, fobj.tell(), 110 "seek() to positive relative position failed") 111 s = fobj.read(10) 112 self.assertTrue(s == data[2048:2058], 113 "read() after seek failed") 114 fobj.seek(0, 2) 115 self.assertEqual(tarinfo.size, fobj.tell(), 116 "seek() to file's end failed") 117 self.assertTrue(fobj.read() == b"", 118 "read() at file's end did not return empty string") 119 fobj.seek(-tarinfo.size, 2) 120 self.assertEqual(0, fobj.tell(), 121 "relative seek() to file's end failed") 122 fobj.seek(512) 123 s1 = fobj.readlines() 124 fobj.seek(512) 125 s2 = fobj.readlines() 126 self.assertTrue(s1 == s2, 127 "readlines() after seek failed") 128 fobj.seek(0) 129 self.assertEqual(len(fobj.readline()), fobj.tell(), 130 "tell() after readline() failed") 131 fobj.seek(512) 132 self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), 133 "tell() after seek() and readline() failed") 134 fobj.seek(0) 135 line = fobj.readline() 136 self.assertEqual(fobj.read(), data[len(line):], 137 
"read() after readline() failed") 138 fobj.close() 139 140 141class MiscReadTest(ReadTest): 142 143 def test_no_name_argument(self): 144 fobj = open(self.tarname, "rb") 145 tar = tarfile.open(fileobj=fobj, mode=self.mode) 146 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 147 148 def test_no_name_attribute(self): 149 data = open(self.tarname, "rb").read() 150 fobj = io.BytesIO(data) 151 self.assertRaises(AttributeError, getattr, fobj, "name") 152 tar = tarfile.open(fileobj=fobj, mode=self.mode) 153 self.assertEqual(tar.name, None) 154 155 def test_empty_name_attribute(self): 156 data = open(self.tarname, "rb").read() 157 fobj = io.BytesIO(data) 158 fobj.name = "" 159 tar = tarfile.open(fileobj=fobj, mode=self.mode) 160 self.assertEqual(tar.name, None) 161 162 def test_fileobj_with_offset(self): 163 # Skip the first member and store values from the second member 164 # of the testtar. 165 tar = tarfile.open(self.tarname, mode=self.mode) 166 tar.next() 167 t = tar.next() 168 name = t.name 169 offset = t.offset 170 data = tar.extractfile(t).read() 171 tar.close() 172 173 # Open the testtar and seek to the offset of the second member. 174 if self.mode.endswith(":gz"): 175 _open = gzip.GzipFile 176 elif self.mode.endswith(":bz2"): 177 _open = bz2.BZ2File 178 else: 179 _open = open 180 fobj = _open(self.tarname, "rb") 181 fobj.seek(offset) 182 183 # Test if the tarfile starts with the second member. 184 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 185 t = tar.next() 186 self.assertEqual(t.name, name) 187 # Read to the end of fileobj and test if seeking back to the 188 # beginning works. 189 tar.getmembers() 190 self.assertEqual(tar.extractfile(t).read(), data, 191 "seek back did not work") 192 tar.close() 193 194 def test_fail_comp(self): 195 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 
196 if self.mode == "r:": 197 return 198 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 199 fobj = open(tarname, "rb") 200 self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode) 201 202 def test_v7_dirtype(self): 203 # Test old style dirtype member (bug #1336623): 204 # Old V7 tars create directory members using an AREGTYPE 205 # header with a "/" appended to the filename field. 206 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 207 self.assertTrue(tarinfo.type == tarfile.DIRTYPE, 208 "v7 dirtype failed") 209 210 def test_xstar_type(self): 211 # The xstar format stores extra atime and ctime fields inside the 212 # space reserved for the prefix field. The prefix field must be 213 # ignored in this case, otherwise it will mess up the name. 214 try: 215 self.tar.getmember("misc/regtype-xstar") 216 except KeyError: 217 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 218 219 def test_check_members(self): 220 for tarinfo in self.tar: 221 self.assertTrue(int(tarinfo.mtime) == 0o7606136617, 222 "wrong mtime for %s" % tarinfo.name) 223 if not tarinfo.name.startswith("ustar/"): 224 continue 225 self.assertTrue(tarinfo.uname == "tarfile", 226 "wrong uname for %s" % tarinfo.name) 227 228 def test_find_members(self): 229 self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", 230 "could not find all members") 231 232 def test_extract_hardlink(self): 233 # Test hardlink extraction (e.g. bug #857297). 
234 tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") 235 236 tar.extract("ustar/regtype", TEMPDIR) 237 try: 238 tar.extract("ustar/lnktype", TEMPDIR) 239 except EnvironmentError as e: 240 if e.errno == errno.ENOENT: 241 self.fail("hardlink not extracted properly") 242 243 data = open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb").read() 244 self.assertEqual(md5sum(data), md5_regtype) 245 246 try: 247 tar.extract("ustar/symtype", TEMPDIR) 248 except EnvironmentError as e: 249 if e.errno == errno.ENOENT: 250 self.fail("symlink not extracted properly") 251 252 data = open(os.path.join(TEMPDIR, "ustar/symtype"), "rb").read() 253 self.assertEqual(md5sum(data), md5_regtype) 254 255 def test_extractall(self): 256 # Test if extractall() correctly restores directory permissions 257 # and times (see issue1735). 258 tar = tarfile.open(tarname, encoding="iso8859-1") 259 directories = [t for t in tar if t.isdir()] 260 tar.extractall(TEMPDIR, directories) 261 for tarinfo in directories: 262 path = os.path.join(TEMPDIR, tarinfo.name) 263 if sys.platform != "win32": 264 # Win32 has no support for fine grained permissions. 
265 self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777) 266 self.assertEqual(tarinfo.mtime, os.path.getmtime(path)) 267 tar.close() 268 269 270class StreamReadTest(ReadTest): 271 272 mode="r|" 273 274 def test_fileobj_regular_file(self): 275 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 276 fobj = self.tar.extractfile(tarinfo) 277 data = fobj.read() 278 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 279 "regular file extraction failed") 280 281 def test_provoke_stream_error(self): 282 tarinfos = self.tar.getmembers() 283 f = self.tar.extractfile(tarinfos[0]) # read the first member 284 self.assertRaises(tarfile.StreamError, f.read) 285 286 def test_compare_members(self): 287 tar1 = tarfile.open(tarname, encoding="iso8859-1") 288 tar2 = self.tar 289 290 while True: 291 t1 = tar1.next() 292 t2 = tar2.next() 293 if t1 is None: 294 break 295 self.assertTrue(t2 is not None, "stream.next() failed.") 296 297 if t2.islnk() or t2.issym(): 298 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) 299 continue 300 301 v1 = tar1.extractfile(t1) 302 v2 = tar2.extractfile(t2) 303 if v1 is None: 304 continue 305 self.assertTrue(v2 is not None, "stream.extractfile() failed") 306 self.assertEqual(v1.read(), v2.read(), "stream extraction failed") 307 308 tar1.close() 309 310 311class DetectReadTest(unittest.TestCase): 312 313 def _testfunc_file(self, name, mode): 314 try: 315 tarfile.open(name, mode) 316 except tarfile.ReadError as e: 317 self.fail() 318 319 def _testfunc_fileobj(self, name, mode): 320 try: 321 tarfile.open(name, mode, fileobj=open(name, "rb")) 322 except tarfile.ReadError as e: 323 self.fail() 324 325 def _test_modes(self, testfunc): 326 testfunc(tarname, "r") 327 testfunc(tarname, "r:") 328 testfunc(tarname, "r:*") 329 testfunc(tarname, "r|") 330 testfunc(tarname, "r|*") 331 332 if gzip: 333 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") 334 
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") 335 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") 336 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") 337 338 testfunc(gzipname, "r") 339 testfunc(gzipname, "r:*") 340 testfunc(gzipname, "r:gz") 341 testfunc(gzipname, "r|*") 342 testfunc(gzipname, "r|gz") 343 344 if bz2: 345 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") 346 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") 347 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") 348 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") 349 350 testfunc(bz2name, "r") 351 testfunc(bz2name, "r:*") 352 testfunc(bz2name, "r:bz2") 353 testfunc(bz2name, "r|*") 354 testfunc(bz2name, "r|bz2") 355 356 def test_detect_file(self): 357 self._test_modes(self._testfunc_file) 358 359 def test_detect_fileobj(self): 360 self._test_modes(self._testfunc_fileobj) 361 362 363class MemberReadTest(ReadTest): 364 365 def _test_member(self, tarinfo, chksum=None, **kwargs): 366 if chksum is not None: 367 self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, 368 "wrong md5sum for %s" % tarinfo.name) 369 370 kwargs["mtime"] = 0o7606136617 371 kwargs["uid"] = 1000 372 kwargs["gid"] = 100 373 if "old-v7" not in tarinfo.name: 374 # V7 tar can't handle alphabetic owners. 
375 kwargs["uname"] = "tarfile" 376 kwargs["gname"] = "tarfile" 377 for k, v in kwargs.items(): 378 self.assertTrue(getattr(tarinfo, k) == v, 379 "wrong value in %s field of %s" % (k, tarinfo.name)) 380 381 def test_find_regtype(self): 382 tarinfo = self.tar.getmember("ustar/regtype") 383 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 384 385 def test_find_conttype(self): 386 tarinfo = self.tar.getmember("ustar/conttype") 387 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 388 389 def test_find_dirtype(self): 390 tarinfo = self.tar.getmember("ustar/dirtype") 391 self._test_member(tarinfo, size=0) 392 393 def test_find_dirtype_with_size(self): 394 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 395 self._test_member(tarinfo, size=255) 396 397 def test_find_lnktype(self): 398 tarinfo = self.tar.getmember("ustar/lnktype") 399 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 400 401 def test_find_symtype(self): 402 tarinfo = self.tar.getmember("ustar/symtype") 403 self._test_member(tarinfo, size=0, linkname="regtype") 404 405 def test_find_blktype(self): 406 tarinfo = self.tar.getmember("ustar/blktype") 407 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 408 409 def test_find_chrtype(self): 410 tarinfo = self.tar.getmember("ustar/chrtype") 411 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 412 413 def test_find_fifotype(self): 414 tarinfo = self.tar.getmember("ustar/fifotype") 415 self._test_member(tarinfo, size=0) 416 417 def test_find_sparse(self): 418 tarinfo = self.tar.getmember("ustar/sparse") 419 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 420 421 def test_find_umlauts(self): 422 tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 423 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 424 425 def test_find_ustar_longname(self): 426 name = "ustar/" + "12345/" * 39 + "1234567/longname" 427 self.assertTrue(name in self.tar.getnames()) 428 429 def 
test_find_regtype_oldv7(self): 430 tarinfo = self.tar.getmember("misc/regtype-old-v7") 431 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 432 433 def test_find_pax_umlauts(self): 434 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 435 tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 436 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 437 438 439class LongnameTest(ReadTest): 440 441 def test_read_longname(self): 442 # Test reading of longname (bug #1471427). 443 longname = self.subdir + "/" + "123/" * 125 + "longname" 444 try: 445 tarinfo = self.tar.getmember(longname) 446 except KeyError: 447 self.fail("longname not found") 448 self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") 449 450 def test_read_longlink(self): 451 longname = self.subdir + "/" + "123/" * 125 + "longname" 452 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 453 try: 454 tarinfo = self.tar.getmember(longlink) 455 except KeyError: 456 self.fail("longlink not found") 457 self.assertTrue(tarinfo.linkname == longname, "linkname wrong") 458 459 def test_truncated_longname(self): 460 longname = self.subdir + "/" + "123/" * 125 + "longname" 461 tarinfo = self.tar.getmember(longname) 462 offset = tarinfo.offset 463 self.tar.fileobj.seek(offset) 464 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 465 self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) 466 467 def test_header_offset(self): 468 # Test if the start offset of the TarInfo object includes 469 # the preceding extended header. 
470 longname = self.subdir + "/" + "123/" * 125 + "longname" 471 offset = self.tar.getmember(longname).offset 472 fobj = open(tarname, "rb") 473 fobj.seek(offset) 474 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), "iso8859-1", "strict") 475 self.assertEqual(tarinfo.type, self.longnametype) 476 477 478class GNUReadTest(LongnameTest): 479 480 subdir = "gnu" 481 longnametype = tarfile.GNUTYPE_LONGNAME 482 483 def test_sparse_file(self): 484 tarinfo1 = self.tar.getmember("ustar/sparse") 485 fobj1 = self.tar.extractfile(tarinfo1) 486 tarinfo2 = self.tar.getmember("gnu/sparse") 487 fobj2 = self.tar.extractfile(tarinfo2) 488 self.assertEqual(fobj1.read(), fobj2.read(), 489 "sparse file extraction failed") 490 491 492class PaxReadTest(LongnameTest): 493 494 subdir = "pax" 495 longnametype = tarfile.XHDTYPE 496 497 def test_pax_global_headers(self): 498 tar = tarfile.open(tarname, encoding="iso8859-1") 499 500 tarinfo = tar.getmember("pax/regtype1") 501 self.assertEqual(tarinfo.uname, "foo") 502 self.assertEqual(tarinfo.gname, "bar") 503 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 504 505 tarinfo = tar.getmember("pax/regtype2") 506 self.assertEqual(tarinfo.uname, "") 507 self.assertEqual(tarinfo.gname, "bar") 508 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 509 510 tarinfo = tar.getmember("pax/regtype3") 511 self.assertEqual(tarinfo.uname, "tarfile") 512 self.assertEqual(tarinfo.gname, "tarfile") 513 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 514 515 def test_pax_number_fields(self): 516 # All following number fields are read from the pax header. 
517 tar = tarfile.open(tarname, encoding="iso8859-1") 518 tarinfo = tar.getmember("pax/regtype4") 519 self.assertEqual(tarinfo.size, 7011) 520 self.assertEqual(tarinfo.uid, 123) 521 self.assertEqual(tarinfo.gid, 123) 522 self.assertEqual(tarinfo.mtime, 1041808783.0) 523 self.assertEqual(type(tarinfo.mtime), float) 524 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 525 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 526 527 528class WriteTestBase(unittest.TestCase): 529 # Put all write tests in here that are supposed to be tested 530 # in all possible mode combinations. 531 532 def test_fileobj_no_close(self): 533 fobj = io.BytesIO() 534 tar = tarfile.open(fileobj=fobj, mode=self.mode) 535 tar.addfile(tarfile.TarInfo("foo")) 536 tar.close() 537 self.assertTrue(fobj.closed is False, "external fileobjs must never closed") 538 539 540class WriteTest(WriteTestBase): 541 542 mode = "w:" 543 544 def test_100_char_name(self): 545 # The name field in a tar header stores strings of at most 100 chars. 546 # If a string is shorter than 100 chars it has to be padded with '\0', 547 # which implies that a string of exactly 100 chars is stored without 548 # a trailing '\0'. 549 name = "0123456789" * 10 550 tar = tarfile.open(tmpname, self.mode) 551 t = tarfile.TarInfo(name) 552 tar.addfile(t) 553 tar.close() 554 555 tar = tarfile.open(tmpname) 556 self.assertTrue(tar.getnames()[0] == name, 557 "failed to store 100 char filename") 558 tar.close() 559 560 def test_tar_size(self): 561 # Test for bug #1013882. 562 tar = tarfile.open(tmpname, self.mode) 563 path = os.path.join(TEMPDIR, "file") 564 fobj = open(path, "wb") 565 fobj.write(b"aaa") 566 fobj.close() 567 tar.add(path) 568 tar.close() 569 self.assertTrue(os.path.getsize(tmpname) > 0, 570 "tarfile is empty") 571 572 # The test_*_size tests test for bug #1167128. 
573 def test_file_size(self): 574 tar = tarfile.open(tmpname, self.mode) 575 576 path = os.path.join(TEMPDIR, "file") 577 fobj = open(path, "wb") 578 fobj.close() 579 tarinfo = tar.gettarinfo(path) 580 self.assertEqual(tarinfo.size, 0) 581 582 fobj = open(path, "wb") 583 fobj.write(b"aaa") 584 fobj.close() 585 tarinfo = tar.gettarinfo(path) 586 self.assertEqual(tarinfo.size, 3) 587 588 tar.close() 589 590 def test_directory_size(self): 591 path = os.path.join(TEMPDIR, "directory") 592 os.mkdir(path) 593 try: 594 tar = tarfile.open(tmpname, self.mode) 595 tarinfo = tar.gettarinfo(path) 596 self.assertEqual(tarinfo.size, 0) 597 finally: 598 os.rmdir(path) 599 600 def test_link_size(self): 601 if hasattr(os, "link"): 602 link = os.path.join(TEMPDIR, "link") 603 target = os.path.join(TEMPDIR, "link_target") 604 open(target, "wb").close() 605 os.link(target, link) 606 try: 607 tar = tarfile.open(tmpname, self.mode) 608 tarinfo = tar.gettarinfo(link) 609 self.assertEqual(tarinfo.size, 0) 610 finally: 611 os.remove(target) 612 os.remove(link) 613 614 def test_symlink_size(self): 615 if hasattr(os, "symlink"): 616 path = os.path.join(TEMPDIR, "symlink") 617 os.symlink("link_target", path) 618 try: 619 tar = tarfile.open(tmpname, self.mode) 620 tarinfo = tar.gettarinfo(path) 621 self.assertEqual(tarinfo.size, 0) 622 finally: 623 os.remove(path) 624 625 def test_add_self(self): 626 # Test for #1257255. 
627 dstname = os.path.abspath(tmpname) 628 629 tar = tarfile.open(tmpname, self.mode) 630 self.assertTrue(tar.name == dstname, "archive name must be absolute") 631 632 tar.add(dstname) 633 self.assertTrue(tar.getnames() == [], "added the archive to itself") 634 635 cwd = os.getcwd() 636 os.chdir(TEMPDIR) 637 tar.add(dstname) 638 os.chdir(cwd) 639 self.assertTrue(tar.getnames() == [], "added the archive to itself") 640 641 def test_exclude(self): 642 tempdir = os.path.join(TEMPDIR, "exclude") 643 os.mkdir(tempdir) 644 try: 645 for name in ("foo", "bar", "baz"): 646 name = os.path.join(tempdir, name) 647 open(name, "wb").close() 648 649 def exclude(name): 650 return os.path.isfile(name) 651 652 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 653 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 654 tar.close() 655 656 tar = tarfile.open(tmpname, "r") 657 self.assertEqual(len(tar.getmembers()), 1) 658 self.assertEqual(tar.getnames()[0], "empty_dir") 659 finally: 660 shutil.rmtree(tempdir) 661 662 663class StreamWriteTest(WriteTestBase): 664 665 mode = "w|" 666 667 def test_stream_padding(self): 668 # Test for bug #1543303. 669 tar = tarfile.open(tmpname, self.mode) 670 tar.close() 671 672 if self.mode.endswith("gz"): 673 fobj = gzip.GzipFile(tmpname) 674 data = fobj.read() 675 fobj.close() 676 elif self.mode.endswith("bz2"): 677 dec = bz2.BZ2Decompressor() 678 data = open(tmpname, "rb").read() 679 data = dec.decompress(data) 680 self.assertTrue(len(dec.unused_data) == 0, 681 "found trailing data") 682 else: 683 fobj = open(tmpname, "rb") 684 data = fobj.read() 685 fobj.close() 686 687 self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE, 688 "incorrect zero padding") 689 690 691class GNUWriteTest(unittest.TestCase): 692 # This testcase checks for correct creation of GNU Longname 693 # and Longlink extended headers (cp. bug #812325). 
694 695 def _length(self, s): 696 blocks, remainder = divmod(len(s) + 1, 512) 697 if remainder: 698 blocks += 1 699 return blocks * 512 700 701 def _calc_size(self, name, link=None): 702 # Initial tar header 703 count = 512 704 705 if len(name) > tarfile.LENGTH_NAME: 706 # GNU longname extended header + longname 707 count += 512 708 count += self._length(name) 709 if link is not None and len(link) > tarfile.LENGTH_LINK: 710 # GNU longlink extended header + longlink 711 count += 512 712 count += self._length(link) 713 return count 714 715 def _test(self, name, link=None): 716 tarinfo = tarfile.TarInfo(name) 717 if link: 718 tarinfo.linkname = link 719 tarinfo.type = tarfile.LNKTYPE 720 721 tar = tarfile.open(tmpname, "w") 722 tar.format = tarfile.GNU_FORMAT 723 tar.addfile(tarinfo) 724 725 v1 = self._calc_size(name, link) 726 v2 = tar.offset 727 self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") 728 729 tar.close() 730 731 tar = tarfile.open(tmpname) 732 member = tar.next() 733 self.assertFalse(member is None, "unable to read longname member") 734 self.assertTrue(tarinfo.name == member.name and \ 735 tarinfo.linkname == member.linkname, \ 736 "unable to read longname member") 737 738 def test_longname_1023(self): 739 self._test(("longnam/" * 127) + "longnam") 740 741 def test_longname_1024(self): 742 self._test(("longnam/" * 127) + "longname") 743 744 def test_longname_1025(self): 745 self._test(("longnam/" * 127) + "longname_") 746 747 def test_longlink_1023(self): 748 self._test("name", ("longlnk/" * 127) + "longlnk") 749 750 def test_longlink_1024(self): 751 self._test("name", ("longlnk/" * 127) + "longlink") 752 753 def test_longlink_1025(self): 754 self._test("name", ("longlnk/" * 127) + "longlink_") 755 756 def test_longnamelink_1023(self): 757 self._test(("longnam/" * 127) + "longnam", 758 ("longlnk/" * 127) + "longlnk") 759 760 def test_longnamelink_1024(self): 761 self._test(("longnam/" * 127) + "longname", 762 ("longlnk/" * 127) + 
"longlink") 763 764 def test_longnamelink_1025(self): 765 self._test(("longnam/" * 127) + "longname_", 766 ("longlnk/" * 127) + "longlink_") 767 768 769class HardlinkTest(unittest.TestCase): 770 # Test the creation of LNKTYPE (hardlink) members in an archive. 771 772 def setUp(self): 773 self.foo = os.path.join(TEMPDIR, "foo") 774 self.bar = os.path.join(TEMPDIR, "bar") 775 776 fobj = open(self.foo, "wb") 777 fobj.write(b"foo") 778 fobj.close() 779 780 os.link(self.foo, self.bar) 781 782 self.tar = tarfile.open(tmpname, "w") 783 self.tar.add(self.foo) 784 785 def tearDown(self): 786 self.tar.close() 787 os.remove(self.foo) 788 os.remove(self.bar) 789 790 def test_add_twice(self): 791 # The same name will be added as a REGTYPE every 792 # time regardless of st_nlink. 793 tarinfo = self.tar.gettarinfo(self.foo) 794 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 795 "add file as regular failed") 796 797 def test_add_hardlink(self): 798 tarinfo = self.tar.gettarinfo(self.bar) 799 self.assertTrue(tarinfo.type == tarfile.LNKTYPE, 800 "add file as hardlink failed") 801 802 def test_dereference_hardlink(self): 803 self.tar.dereference = True 804 tarinfo = self.tar.gettarinfo(self.bar) 805 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 806 "dereferencing hardlink failed") 807 808 809class PaxWriteTest(GNUWriteTest): 810 811 def _test(self, name, link=None): 812 # See GNUWriteTest. 
813 tarinfo = tarfile.TarInfo(name) 814 if link: 815 tarinfo.linkname = link 816 tarinfo.type = tarfile.LNKTYPE 817 818 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 819 tar.addfile(tarinfo) 820 tar.close() 821 822 tar = tarfile.open(tmpname) 823 if link: 824 l = tar.getmembers()[0].linkname 825 self.assertTrue(link == l, "PAX longlink creation failed") 826 else: 827 n = tar.getmembers()[0].name 828 self.assertTrue(name == n, "PAX longname creation failed") 829 830 def test_pax_global_header(self): 831 pax_headers = { 832 "foo": "bar", 833 "uid": "0", 834 "mtime": "1.23", 835 "test": "\xe4\xf6\xfc", 836 "\xe4\xf6\xfc": "test"} 837 838 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, \ 839 pax_headers=pax_headers) 840 tar.addfile(tarfile.TarInfo("test")) 841 tar.close() 842 843 # Test if the global header was written correctly. 844 tar = tarfile.open(tmpname, encoding="iso8859-1") 845 self.assertEqual(tar.pax_headers, pax_headers) 846 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 847 848 # Test if all the fields are strings. 849 for key, val in tar.pax_headers.items(): 850 self.assertTrue(type(key) is not bytes) 851 self.assertTrue(type(val) is not bytes) 852 if key in tarfile.PAX_NUMBER_FIELDS: 853 try: 854 tarfile.PAX_NUMBER_FIELDS[key](val) 855 except (TypeError, ValueError): 856 self.fail("unable to convert pax header field") 857 858 def test_pax_extended_header(self): 859 # The fields from the pax header have priority over the 860 # TarInfo. 
861 pax_headers = {"path": "foo", "uid": "123"} 862 863 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") 864 t = tarfile.TarInfo() 865 t.name = "\xe4\xf6\xfc" # non-ASCII 866 t.uid = 8**8 # too large 867 t.pax_headers = pax_headers 868 tar.addfile(t) 869 tar.close() 870 871 tar = tarfile.open(tmpname, encoding="iso8859-1") 872 t = tar.getmembers()[0] 873 self.assertEqual(t.pax_headers, pax_headers) 874 self.assertEqual(t.name, "foo") 875 self.assertEqual(t.uid, 123) 876 877 878class UstarUnicodeTest(unittest.TestCase): 879 880 format = tarfile.USTAR_FORMAT 881 882 def test_iso8859_1_filename(self): 883 self._test_unicode_filename("iso8859-1") 884 885 def test_utf7_filename(self): 886 self._test_unicode_filename("utf7") 887 888 def test_utf8_filename(self): 889 self._test_unicode_filename("utf8") 890 891 def _test_unicode_filename(self, encoding): 892 tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") 893 name = "\xe4\xf6\xfc" 894 tar.addfile(tarfile.TarInfo(name)) 895 tar.close() 896 897 tar = tarfile.open(tmpname, encoding=encoding) 898 self.assertEqual(tar.getmembers()[0].name, name) 899 tar.close() 900 901 def test_unicode_filename_error(self): 902 if self.format == tarfile.PAX_FORMAT: 903 # PAX_FORMAT ignores encoding in write mode. 
904 return 905 906 tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") 907 tarinfo = tarfile.TarInfo() 908 909 tarinfo.name = "\xe4\xf6\xfc" 910 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 911 912 tarinfo.name = "foo" 913 tarinfo.uname = "\xe4\xf6\xfc" 914 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 915 916 def test_unicode_argument(self): 917 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") 918 for t in tar: 919 self.assertTrue(type(t.name) is str) 920 self.assertTrue(type(t.linkname) is str) 921 self.assertTrue(type(t.uname) is str) 922 self.assertTrue(type(t.gname) is str) 923 tar.close() 924 925 def test_uname_unicode(self): 926 t = tarfile.TarInfo("foo") 927 t.uname = "\xe4\xf6\xfc" 928 t.gname = "\xe4\xf6\xfc" 929 930 tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1") 931 tar.addfile(t) 932 tar.close() 933 934 tar = tarfile.open(tmpname, encoding="iso8859-1") 935 t = tar.getmember("foo") 936 self.assertEqual(t.uname, "\xe4\xf6\xfc") 937 self.assertEqual(t.gname, "\xe4\xf6\xfc") 938 939 if self.format != tarfile.PAX_FORMAT: 940 tar = tarfile.open(tmpname, encoding="ascii") 941 t = tar.getmember("foo") 942 self.assertEqual(t.uname, "\ufffd\ufffd\ufffd") 943 self.assertEqual(t.gname, "\ufffd\ufffd\ufffd") 944 945 946class GNUUnicodeTest(UstarUnicodeTest): 947 948 format = tarfile.GNU_FORMAT 949 950 951class PAXUnicodeTest(UstarUnicodeTest): 952 953 format = tarfile.PAX_FORMAT 954 955 956class AppendTest(unittest.TestCase): 957 # Test append mode (cp. patch #1652681). 
958 959 def setUp(self): 960 self.tarname = tmpname 961 if os.path.exists(self.tarname): 962 os.remove(self.tarname) 963 964 def _add_testfile(self, fileobj=None): 965 tar = tarfile.open(self.tarname, "a", fileobj=fileobj) 966 tar.addfile(tarfile.TarInfo("bar")) 967 tar.close() 968 969 def _create_testtar(self, mode="w:"): 970 src = tarfile.open(tarname, encoding="iso8859-1") 971 t = src.getmember("ustar/regtype") 972 t.name = "foo" 973 f = src.extractfile(t) 974 tar = tarfile.open(self.tarname, mode) 975 tar.addfile(t, f) 976 tar.close() 977 978 def _test(self, names=["bar"], fileobj=None): 979 tar = tarfile.open(self.tarname, fileobj=fileobj) 980 self.assertEqual(tar.getnames(), names) 981 982 def test_non_existing(self): 983 self._add_testfile() 984 self._test() 985 986 def test_empty(self): 987 open(self.tarname, "w").close() 988 self._add_testfile() 989 self._test() 990 991 def test_empty_fileobj(self): 992 fobj = io.BytesIO() 993 self._add_testfile(fobj) 994 fobj.seek(0) 995 self._test(fileobj=fobj) 996 997 def test_fileobj(self): 998 self._create_testtar() 999 data = open(self.tarname, "rb").read() 1000 fobj = io.BytesIO(data) 1001 self._add_testfile(fobj) 1002 fobj.seek(0) 1003 self._test(names=["foo", "bar"], fileobj=fobj) 1004 1005 def test_existing(self): 1006 self._create_testtar() 1007 self._add_testfile() 1008 self._test(names=["foo", "bar"]) 1009 1010 def test_append_gz(self): 1011 if gzip is None: 1012 return 1013 self._create_testtar("w:gz") 1014 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1015 1016 def test_append_bz2(self): 1017 if bz2 is None: 1018 return 1019 self._create_testtar("w:bz2") 1020 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1021 1022 1023class LimitsTest(unittest.TestCase): 1024 1025 def test_ustar_limits(self): 1026 # 100 char name 1027 tarinfo = tarfile.TarInfo("0123456789" * 10) 1028 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1029 1030 # 101 char name that cannot be stored 1031 tarinfo = 
tarfile.TarInfo("0123456789" * 10 + "0") 1032 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1033 1034 # 256 char name with a slash at pos 156 1035 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1036 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1037 1038 # 256 char name that cannot be stored 1039 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1040 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1041 1042 # 512 char name 1043 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1044 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1045 1046 # 512 char linkname 1047 tarinfo = tarfile.TarInfo("longlink") 1048 tarinfo.linkname = "123/" * 126 + "longname" 1049 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1050 1051 # uid > 8 digits 1052 tarinfo = tarfile.TarInfo("name") 1053 tarinfo.uid = 0o10000000 1054 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1055 1056 def test_gnu_limits(self): 1057 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1058 tarinfo.tobuf(tarfile.GNU_FORMAT) 1059 1060 tarinfo = tarfile.TarInfo("longlink") 1061 tarinfo.linkname = "123/" * 126 + "longname" 1062 tarinfo.tobuf(tarfile.GNU_FORMAT) 1063 1064 # uid >= 256 ** 7 1065 tarinfo = tarfile.TarInfo("name") 1066 tarinfo.uid = 0o4000000000000000000 1067 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1068 1069 def test_pax_limits(self): 1070 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1071 tarinfo.tobuf(tarfile.PAX_FORMAT) 1072 1073 tarinfo = tarfile.TarInfo("longlink") 1074 tarinfo.linkname = "123/" * 126 + "longname" 1075 tarinfo.tobuf(tarfile.PAX_FORMAT) 1076 1077 tarinfo = tarfile.TarInfo("name") 1078 tarinfo.uid = 0o4000000000000000000 1079 tarinfo.tobuf(tarfile.PAX_FORMAT) 1080 1081 1082class MiscTest(unittest.TestCase): 1083 1084 def test_char_fields(self): 1085 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0") 1086 
self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo") 1087 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo") 1088 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo") 1089 1090 def test_number_fields(self): 1091 self.assertEqual(tarfile.itn(1), b"0000001\x00") 1092 self.assertEqual(tarfile.itn(0xffffffff), b"\x80\x00\x00\x00\xff\xff\xff\xff") 1093 1094 1095class GzipMiscReadTest(MiscReadTest): 1096 tarname = gzipname 1097 mode = "r:gz" 1098class GzipUstarReadTest(UstarReadTest): 1099 tarname = gzipname 1100 mode = "r:gz" 1101class GzipStreamReadTest(StreamReadTest): 1102 tarname = gzipname 1103 mode = "r|gz" 1104class GzipWriteTest(WriteTest): 1105 mode = "w:gz" 1106class GzipStreamWriteTest(StreamWriteTest): 1107 mode = "w|gz" 1108 1109 1110class Bz2MiscReadTest(MiscReadTest): 1111 tarname = bz2name 1112 mode = "r:bz2" 1113class Bz2UstarReadTest(UstarReadTest): 1114 tarname = bz2name 1115 mode = "r:bz2" 1116class Bz2StreamReadTest(StreamReadTest): 1117 tarname = bz2name 1118 mode = "r|bz2" 1119class Bz2WriteTest(WriteTest): 1120 mode = "w:bz2" 1121class Bz2StreamWriteTest(StreamWriteTest): 1122 mode = "w|bz2" 1123 1124class Bz2PartialReadTest(unittest.TestCase): 1125 # Issue5068: The _BZ2Proxy.read() method loops forever 1126 # on an empty or partial bzipped file. 
1127 1128 def _test_partial_input(self, mode): 1129 class MyBytesIO(io.BytesIO): 1130 hit_eof = False 1131 def read(self, n): 1132 if self.hit_eof: 1133 raise AssertionError("infinite loop detected in tarfile.open()") 1134 self.hit_eof = self.tell() == len(self.getvalue()) 1135 return super(MyBytesIO, self).read(n) 1136 1137 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1138 for x in range(len(data) + 1): 1139 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 1140 1141 def test_partial_input(self): 1142 self._test_partial_input("r") 1143 1144 def test_partial_input_bz2(self): 1145 self._test_partial_input("r:bz2") 1146 1147 1148def test_main(): 1149 if not os.path.exists(TEMPDIR): 1150 os.mkdir(TEMPDIR) 1151 1152 tests = [ 1153 UstarReadTest, 1154 MiscReadTest, 1155 StreamReadTest, 1156 DetectReadTest, 1157 MemberReadTest, 1158 GNUReadTest, 1159 PaxReadTest, 1160 WriteTest, 1161 StreamWriteTest, 1162 GNUWriteTest, 1163 PaxWriteTest, 1164 UstarUnicodeTest, 1165 GNUUnicodeTest, 1166 PAXUnicodeTest, 1167 AppendTest, 1168 LimitsTest, 1169 MiscTest, 1170 ] 1171 1172 if hasattr(os, "link"): 1173 tests.append(HardlinkTest) 1174 1175 fobj = open(tarname, "rb") 1176 data = fobj.read() 1177 fobj.close() 1178 1179 if gzip: 1180 # Create testtar.tar.gz and add gzip-specific tests. 1181 tar = gzip.open(gzipname, "wb") 1182 tar.write(data) 1183 tar.close() 1184 1185 tests += [ 1186 GzipMiscReadTest, 1187 GzipUstarReadTest, 1188 GzipStreamReadTest, 1189 GzipWriteTest, 1190 GzipStreamWriteTest, 1191 ] 1192 1193 if bz2: 1194 # Create testtar.tar.bz2 and add bz2-specific tests. 
1195 tar = bz2.BZ2File(bz2name, "wb") 1196 tar.write(data) 1197 tar.close() 1198 1199 tests += [ 1200 Bz2MiscReadTest, 1201 Bz2UstarReadTest, 1202 Bz2StreamReadTest, 1203 Bz2WriteTest, 1204 Bz2StreamWriteTest, 1205 Bz2PartialReadTest, 1206 ] 1207 1208 try: 1209 support.run_unittest(*tests) 1210 finally: 1211 if os.path.exists(TEMPDIR): 1212 shutil.rmtree(TEMPDIR) 1213 1214if __name__ == "__main__": 1215 test_main() 1216