test_tarfile.py revision ad3e27ae4c2c21d422603eab36c6b48e6e09f732
172d2dab6058467036df73a5f668036a519043e5bChandler Carruthimport sys 272d2dab6058467036df73a5f668036a519043e5bChandler Carruthimport os 3651f13cea278ec967336033dd032faef0e9fc2ecStephen Hinesimport io 4faaec2234f21bf175dd05c723072202cdf39cb2fAnders Carlssonfrom hashlib import md5 5faaec2234f21bf175dd05c723072202cdf39cb2fAnders Carlssonfrom contextlib import contextmanager 6faaec2234f21bf175dd05c723072202cdf39cb2fAnders Carlsson 7faaec2234f21bf175dd05c723072202cdf39cb2fAnders Carlssonimport unittest 8import unittest.mock 9import tarfile 10 11from test import support 12from test.support import script_helper 13 14# Check for our compression modules. 15try: 16 import gzip 17except ImportError: 18 gzip = None 19try: 20 import bz2 21except ImportError: 22 bz2 = None 23try: 24 import lzma 25except ImportError: 26 lzma = None 27 28def md5sum(data): 29 return md5(data).hexdigest() 30 31TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 32tarextdir = TEMPDIR + '-extract-test' 33tarname = support.findfile("testtar.tar") 34gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 35bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 36xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 37tmpname = os.path.join(TEMPDIR, "tmp.tar") 38dotlessname = os.path.join(TEMPDIR, "testtar") 39 40md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 41md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 42 43 44class TarTest: 45 tarname = tarname 46 suffix = '' 47 open = io.FileIO 48 taropen = tarfile.TarFile.taropen 49 50 @property 51 def mode(self): 52 return self.prefix + self.suffix 53 54@support.requires_gzip 55class GzipTest: 56 tarname = gzipname 57 suffix = 'gz' 58 open = gzip.GzipFile if gzip else None 59 taropen = tarfile.TarFile.gzopen 60 61@support.requires_bz2 62class Bz2Test: 63 tarname = bz2name 64 suffix = 'bz2' 65 open = bz2.BZ2File if bz2 else None 66 taropen = tarfile.TarFile.bz2open 67 68@support.requires_lzma 69class LzmaTest: 70 tarname = xzname 71 suffix = 'xz' 72 open = lzma.LZMAFile if lzma else None 73 taropen = tarfile.TarFile.xzopen 74 75 76class ReadTest(TarTest): 77 78 prefix = "r:" 79 80 def setUp(self): 81 self.tar = tarfile.open(self.tarname, mode=self.mode, 82 encoding="iso8859-1") 83 84 def tearDown(self): 85 self.tar.close() 86 87 88class UstarReadTest(ReadTest, unittest.TestCase): 89 90 def test_fileobj_regular_file(self): 91 tarinfo = self.tar.getmember("ustar/regtype") 92 with self.tar.extractfile(tarinfo) as fobj: 93 data = fobj.read() 94 self.assertEqual(len(data), tarinfo.size, 95 "regular file extraction failed") 96 self.assertEqual(md5sum(data), md5_regtype, 97 "regular file extraction failed") 98 99 def test_fileobj_readlines(self): 100 self.tar.extract("ustar/regtype", TEMPDIR) 101 tarinfo = self.tar.getmember("ustar/regtype") 102 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 103 lines1 = fobj1.readlines() 104 105 with self.tar.extractfile(tarinfo) as fobj: 106 fobj2 = io.TextIOWrapper(fobj) 107 lines2 = fobj2.readlines() 108 self.assertEqual(lines1, lines2, 109 "fileobj.readlines() failed") 110 self.assertEqual(len(lines2), 114, 111 "fileobj.readlines() failed") 112 self.assertEqual(lines2[83], 113 "I will gladly admit that Python is not the fastest " 114 "running scripting language.\n", 115 "fileobj.readlines() failed") 116 117 def test_fileobj_iter(self): 118 self.tar.extract("ustar/regtype", TEMPDIR) 119 tarinfo = self.tar.getmember("ustar/regtype") 120 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 121 lines1 = fobj1.readlines() 122 with self.tar.extractfile(tarinfo) as fobj2: 123 lines2 = list(io.TextIOWrapper(fobj2)) 124 self.assertEqual(lines1, lines2, 125 "fileobj.__iter__() failed") 126 127 def test_fileobj_seek(self): 128 self.tar.extract("ustar/regtype", TEMPDIR) 129 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 130 data = fobj.read() 131 132 tarinfo = self.tar.getmember("ustar/regtype") 133 fobj = self.tar.extractfile(tarinfo) 134 135 text = fobj.read() 136 fobj.seek(0) 137 self.assertEqual(0, fobj.tell(), 138 "seek() to file's start failed") 139 fobj.seek(2048, 0) 140 self.assertEqual(2048, fobj.tell(), 141 "seek() to absolute position failed") 142 fobj.seek(-1024, 1) 143 self.assertEqual(1024, fobj.tell(), 144 "seek() to negative relative position failed") 145 fobj.seek(1024, 1) 146 self.assertEqual(2048, fobj.tell(), 147 "seek() to positive relative position failed") 148 s = fobj.read(10) 149 self.assertEqual(s, data[2048:2058], 150 "read() after seek failed") 151 fobj.seek(0, 2) 152 self.assertEqual(tarinfo.size, fobj.tell(), 153 "seek() to file's end failed") 154 self.assertEqual(fobj.read(), b"", 155 "read() at file's end did not return empty string") 156 fobj.seek(-tarinfo.size, 2) 157 self.assertEqual(0, fobj.tell(), 158 "relative seek() to file's end failed") 159 fobj.seek(512) 160 s1 = fobj.readlines() 161 fobj.seek(512) 162 s2 = fobj.readlines() 163 self.assertEqual(s1, s2, 164 "readlines() after seek failed") 165 fobj.seek(0) 166 self.assertEqual(len(fobj.readline()), fobj.tell(), 167 "tell() after readline() failed") 168 fobj.seek(512) 169 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 170 "tell() after seek() and readline() failed") 171 fobj.seek(0) 172 line = fobj.readline() 173 self.assertEqual(fobj.read(), data[len(line):], 174 "read() after readline() failed") 175 fobj.close() 176 177 def test_fileobj_text(self): 178 with self.tar.extractfile("ustar/regtype") as fobj: 179 fobj = io.TextIOWrapper(fobj) 180 data = fobj.read().encode("iso8859-1") 181 self.assertEqual(md5sum(data), md5_regtype) 182 try: 183 fobj.seek(100) 184 except AttributeError: 185 # Issue #13815: seek() complained about a missing 186 # flush() method. 187 self.fail("seeking failed in text mode") 188 189 # Test if symbolic and hard links are resolved by extractfile(). The 190 # test link members each point to a regular member whose data is 191 # supposed to be exported. 192 def _test_fileobj_link(self, lnktype, regtype): 193 with self.tar.extractfile(lnktype) as a, \ 194 self.tar.extractfile(regtype) as b: 195 self.assertEqual(a.name, b.name) 196 197 def test_fileobj_link1(self): 198 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 199 200 def test_fileobj_link2(self): 201 self._test_fileobj_link("./ustar/linktest2/lnktype", 202 "ustar/linktest1/regtype") 203 204 def test_fileobj_symlink1(self): 205 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 206 207 def test_fileobj_symlink2(self): 208 self._test_fileobj_link("./ustar/linktest2/symtype", 209 "ustar/linktest1/regtype") 210 211 def test_issue14160(self): 212 self._test_fileobj_link("symtype2", "ustar/regtype") 213 214class GzipUstarReadTest(GzipTest, UstarReadTest): 215 pass 216 217class Bz2UstarReadTest(Bz2Test, UstarReadTest): 218 pass 219 220class LzmaUstarReadTest(LzmaTest, UstarReadTest): 221 pass 222 223 224class ListTest(ReadTest, unittest.TestCase): 225 226 # Override setUp to use default encoding (UTF-8) 227 def setUp(self): 228 self.tar = tarfile.open(self.tarname, mode=self.mode) 229 230 def test_list(self): 231 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 232 with support.swap_attr(sys, 'stdout', tio): 233 self.tar.list(verbose=False) 234 out = tio.detach().getvalue() 235 self.assertIn(b'ustar/conttype', out) 236 self.assertIn(b'ustar/regtype', out) 237 self.assertIn(b'ustar/lnktype', out) 238 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 239 self.assertIn(b'./ustar/linktest2/symtype', out) 240 self.assertIn(b'./ustar/linktest2/lnktype', out) 241 # Make sure it puts trailing slash for directory 242 self.assertIn(b'ustar/dirtype/', out) 243 self.assertIn(b'ustar/dirtype-with-size/', out) 244 # Make sure it is able to print unencodable characters 245 def conv(b): 246 s = b.decode(self.tar.encoding, 'surrogateescape') 247 return s.encode('ascii', 'backslashreplace') 248 self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 249 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 250 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 251 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 252 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 253 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 254 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 255 # Make sure it prints files separated by one newline without any 256 # 'ls -l'-like accessories if verbose flag is not being used 257 # ... 258 # ustar/conttype 259 # ustar/regtype 260 # ... 261 self.assertRegex(out, br'ustar/conttype ?\r?\n' 262 br'ustar/regtype ?\r?\n') 263 # Make sure it does not print the source of link without verbose flag 264 self.assertNotIn(b'link to', out) 265 self.assertNotIn(b'->', out) 266 267 def test_list_verbose(self): 268 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 269 with support.swap_attr(sys, 'stdout', tio): 270 self.tar.list(verbose=True) 271 out = tio.detach().getvalue() 272 # Make sure it prints files separated by one newline with 'ls -l'-like 273 # accessories if verbose flag is being used 274 # ... 275 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 276 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 277 # ... 278 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 279 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 280 br'ustar/\w+type ?\r?\n') * 2) 281 # Make sure it prints the source of link with verbose flag 282 self.assertIn(b'ustar/symtype -> regtype', out) 283 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 284 self.assertIn(b'./ustar/linktest2/lnktype link to ' 285 b'./ustar/linktest1/regtype', out) 286 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 287 (b'/123' * 125) + b'/longname', out) 288 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 289 (b'/123' * 125) + b'/longname', out) 290 291 def test_list_members(self): 292 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 293 def members(tar): 294 for tarinfo in tar.getmembers(): 295 if 'reg' in tarinfo.name: 296 yield tarinfo 297 with support.swap_attr(sys, 'stdout', tio): 298 self.tar.list(verbose=False, members=members(self.tar)) 299 out = tio.detach().getvalue() 300 self.assertIn(b'ustar/regtype', out) 301 self.assertNotIn(b'ustar/conttype', out) 302 303 304class GzipListTest(GzipTest, ListTest): 305 pass 306 307 308class Bz2ListTest(Bz2Test, ListTest): 309 pass 310 311 312class LzmaListTest(LzmaTest, ListTest): 313 pass 314 315 316class CommonReadTest(ReadTest): 317 318 def test_empty_tarfile(self): 319 # Test for issue6123: Allow opening empty archives. 320 # This test checks if tarfile.open() is able to open an empty tar 321 # archive successfully. Note that an empty tar archive is not the 322 # same as an empty file! 323 with tarfile.open(tmpname, self.mode.replace("r", "w")): 324 pass 325 try: 326 tar = tarfile.open(tmpname, self.mode) 327 tar.getnames() 328 except tarfile.ReadError: 329 self.fail("tarfile.open() failed on empty archive") 330 else: 331 self.assertListEqual(tar.getmembers(), []) 332 finally: 333 tar.close() 334 335 def test_non_existent_tarfile(self): 336 # Test for issue11513: prevent non-existent gzipped tarfiles raising 337 # multiple exceptions. 338 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 339 tarfile.open("xxx", self.mode) 340 341 def test_null_tarfile(self): 342 # Test for issue6123: Allow opening empty archives. 343 # This test guarantees that tarfile.open() does not treat an empty 344 # file as an empty tar archive. 345 with open(tmpname, "wb"): 346 pass 347 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 348 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 349 350 def test_ignore_zeros(self): 351 # Test TarFile's ignore_zeros option. 352 for char in (b'\0', b'a'): 353 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 354 # are ignored correctly. 355 with self.open(tmpname, "w") as fobj: 356 fobj.write(char * 1024) 357 fobj.write(tarfile.TarInfo("foo").tobuf()) 358 359 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 360 try: 361 self.assertListEqual(tar.getnames(), ["foo"], 362 "ignore_zeros=True should have skipped the %r-blocks" % 363 char) 364 finally: 365 tar.close() 366 367 368class MiscReadTestBase(CommonReadTest): 369 def requires_name_attribute(self): 370 pass 371 372 def test_no_name_argument(self): 373 self.requires_name_attribute() 374 with open(self.tarname, "rb") as fobj: 375 self.assertIsInstance(fobj.name, str) 376 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 377 self.assertIsInstance(tar.name, str) 378 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 379 380 def test_no_name_attribute(self): 381 with open(self.tarname, "rb") as fobj: 382 data = fobj.read() 383 fobj = io.BytesIO(data) 384 self.assertRaises(AttributeError, getattr, fobj, "name") 385 tar = tarfile.open(fileobj=fobj, mode=self.mode) 386 self.assertIsNone(tar.name) 387 388 def test_empty_name_attribute(self): 389 with open(self.tarname, "rb") as fobj: 390 data = fobj.read() 391 fobj = io.BytesIO(data) 392 fobj.name = "" 393 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 394 self.assertIsNone(tar.name) 395 396 def test_int_name_attribute(self): 397 # Issue 21044: tarfile.open() should handle fileobj with an integer 398 # 'name' attribute. 399 fd = os.open(self.tarname, os.O_RDONLY) 400 with open(fd, 'rb') as fobj: 401 self.assertIsInstance(fobj.name, int) 402 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 403 self.assertIsNone(tar.name) 404 405 def test_bytes_name_attribute(self): 406 self.requires_name_attribute() 407 tarname = os.fsencode(self.tarname) 408 with open(tarname, 'rb') as fobj: 409 self.assertIsInstance(fobj.name, bytes) 410 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 411 self.assertIsInstance(tar.name, bytes) 412 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 413 414 def test_illegal_mode_arg(self): 415 with open(tmpname, 'wb'): 416 pass 417 with self.assertRaisesRegex(ValueError, 'mode must be '): 418 tar = self.taropen(tmpname, 'q') 419 with self.assertRaisesRegex(ValueError, 'mode must be '): 420 tar = self.taropen(tmpname, 'rw') 421 with self.assertRaisesRegex(ValueError, 'mode must be '): 422 tar = self.taropen(tmpname, '') 423 424 def test_fileobj_with_offset(self): 425 # Skip the first member and store values from the second member 426 # of the testtar. 427 tar = tarfile.open(self.tarname, mode=self.mode) 428 try: 429 tar.next() 430 t = tar.next() 431 name = t.name 432 offset = t.offset 433 with tar.extractfile(t) as f: 434 data = f.read() 435 finally: 436 tar.close() 437 438 # Open the testtar and seek to the offset of the second member. 439 with self.open(self.tarname) as fobj: 440 fobj.seek(offset) 441 442 # Test if the tarfile starts with the second member. 443 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 444 t = tar.next() 445 self.assertEqual(t.name, name) 446 # Read to the end of fileobj and test if seeking back to the 447 # beginning works. 448 tar.getmembers() 449 self.assertEqual(tar.extractfile(t).read(), data, 450 "seek back did not work") 451 tar.close() 452 453 def test_fail_comp(self): 454 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 455 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 456 with open(tarname, "rb") as fobj: 457 self.assertRaises(tarfile.ReadError, tarfile.open, 458 fileobj=fobj, mode=self.mode) 459 460 def test_v7_dirtype(self): 461 # Test old style dirtype member (bug #1336623): 462 # Old V7 tars create directory members using an AREGTYPE 463 # header with a "/" appended to the filename field. 464 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 465 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 466 "v7 dirtype failed") 467 468 def test_xstar_type(self): 469 # The xstar format stores extra atime and ctime fields inside the 470 # space reserved for the prefix field. The prefix field must be 471 # ignored in this case, otherwise it will mess up the name. 472 try: 473 self.tar.getmember("misc/regtype-xstar") 474 except KeyError: 475 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 476 477 def test_check_members(self): 478 for tarinfo in self.tar: 479 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 480 "wrong mtime for %s" % tarinfo.name) 481 if not tarinfo.name.startswith("ustar/"): 482 continue 483 self.assertEqual(tarinfo.uname, "tarfile", 484 "wrong uname for %s" % tarinfo.name) 485 486 def test_find_members(self): 487 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 488 "could not find all members") 489 490 @unittest.skipUnless(hasattr(os, "link"), 491 "Missing hardlink implementation") 492 @support.skip_unless_symlink 493 def test_extract_hardlink(self): 494 # Test hardlink extraction (e.g. bug #857297). 495 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 496 tar.extract("ustar/regtype", TEMPDIR) 497 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 498 499 tar.extract("ustar/lnktype", TEMPDIR) 500 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 501 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 502 data = f.read() 503 self.assertEqual(md5sum(data), md5_regtype) 504 505 tar.extract("ustar/symtype", TEMPDIR) 506 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 507 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 508 data = f.read() 509 self.assertEqual(md5sum(data), md5_regtype) 510 511 def test_extractall(self): 512 # Test if extractall() correctly restores directory permissions 513 # and times (see issue1735). 514 tar = tarfile.open(tarname, encoding="iso8859-1") 515 DIR = os.path.join(TEMPDIR, "extractall") 516 os.mkdir(DIR) 517 try: 518 directories = [t for t in tar if t.isdir()] 519 tar.extractall(DIR, directories) 520 for tarinfo in directories: 521 path = os.path.join(DIR, tarinfo.name) 522 if sys.platform != "win32": 523 # Win32 has no support for fine grained permissions. 524 self.assertEqual(tarinfo.mode & 0o777, 525 os.stat(path).st_mode & 0o777) 526 def format_mtime(mtime): 527 if isinstance(mtime, float): 528 return "{} ({})".format(mtime, mtime.hex()) 529 else: 530 return "{!r} (int)".format(mtime) 531 file_mtime = os.path.getmtime(path) 532 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 533 format_mtime(tarinfo.mtime), 534 format_mtime(file_mtime), 535 path) 536 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 537 finally: 538 tar.close() 539 support.rmtree(DIR) 540 541 def test_extract_directory(self): 542 dirtype = "ustar/dirtype" 543 DIR = os.path.join(TEMPDIR, "extractdir") 544 os.mkdir(DIR) 545 try: 546 with tarfile.open(tarname, encoding="iso8859-1") as tar: 547 tarinfo = tar.getmember(dirtype) 548 tar.extract(tarinfo, path=DIR) 549 extracted = os.path.join(DIR, dirtype) 550 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 551 if sys.platform != "win32": 552 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 553 finally: 554 support.rmtree(DIR) 555 556 def test_init_close_fobj(self): 557 # Issue #7341: Close the internal file object in the TarFile 558 # constructor in case of an error. For the test we rely on 559 # the fact that opening an empty file raises a ReadError. 560 empty = os.path.join(TEMPDIR, "empty") 561 with open(empty, "wb") as fobj: 562 fobj.write(b"") 563 564 try: 565 tar = object.__new__(tarfile.TarFile) 566 try: 567 tar.__init__(empty) 568 except tarfile.ReadError: 569 self.assertTrue(tar.fileobj.closed) 570 else: 571 self.fail("ReadError not raised") 572 finally: 573 support.unlink(empty) 574 575 def test_parallel_iteration(self): 576 # Issue #16601: Restarting iteration over tarfile continued 577 # from where it left off. 578 with tarfile.open(self.tarname) as tar: 579 for m1, m2 in zip(tar, tar): 580 self.assertEqual(m1.offset, m2.offset) 581 self.assertEqual(m1.get_info(), m2.get_info()) 582 583class MiscReadTest(MiscReadTestBase, unittest.TestCase): 584 test_fail_comp = None 585 586class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 587 pass 588 589class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 590 def requires_name_attribute(self): 591 self.skipTest("BZ2File have no name attribute") 592 593class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 594 def requires_name_attribute(self): 595 self.skipTest("LZMAFile have no name attribute") 596 597 598class StreamReadTest(CommonReadTest, unittest.TestCase): 599 600 prefix="r|" 601 602 def test_read_through(self): 603 # Issue #11224: A poorly designed _FileInFile.read() method 604 # caused seeking errors with stream tar files. 605 for tarinfo in self.tar: 606 if not tarinfo.isreg(): 607 continue 608 with self.tar.extractfile(tarinfo) as fobj: 609 while True: 610 try: 611 buf = fobj.read(512) 612 except tarfile.StreamError: 613 self.fail("simple read-through using " 614 "TarFile.extractfile() failed") 615 if not buf: 616 break 617 618 def test_fileobj_regular_file(self): 619 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 620 with self.tar.extractfile(tarinfo) as fobj: 621 data = fobj.read() 622 self.assertEqual(len(data), tarinfo.size, 623 "regular file extraction failed") 624 self.assertEqual(md5sum(data), md5_regtype, 625 "regular file extraction failed") 626 627 def test_provoke_stream_error(self): 628 tarinfos = self.tar.getmembers() 629 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 630 self.assertRaises(tarfile.StreamError, f.read) 631 632 def test_compare_members(self): 633 tar1 = tarfile.open(tarname, encoding="iso8859-1") 634 try: 635 tar2 = self.tar 636 637 while True: 638 t1 = tar1.next() 639 t2 = tar2.next() 640 if t1 is None: 641 break 642 self.assertIsNotNone(t2, "stream.next() failed.") 643 644 if t2.islnk() or t2.issym(): 645 with self.assertRaises(tarfile.StreamError): 646 tar2.extractfile(t2) 647 continue 648 649 v1 = tar1.extractfile(t1) 650 v2 = tar2.extractfile(t2) 651 if v1 is None: 652 continue 653 self.assertIsNotNone(v2, "stream.extractfile() failed") 654 self.assertEqual(v1.read(), v2.read(), 655 "stream extraction failed") 656 finally: 657 tar1.close() 658 659class GzipStreamReadTest(GzipTest, StreamReadTest): 660 pass 661 662class Bz2StreamReadTest(Bz2Test, StreamReadTest): 663 pass 664 665class LzmaStreamReadTest(LzmaTest, StreamReadTest): 666 pass 667 668 669class DetectReadTest(TarTest, unittest.TestCase): 670 def _testfunc_file(self, name, mode): 671 try: 672 tar = tarfile.open(name, mode) 673 except tarfile.ReadError as e: 674 self.fail() 675 else: 676 tar.close() 677 678 def _testfunc_fileobj(self, name, mode): 679 try: 680 with open(name, "rb") as f: 681 tar = tarfile.open(name, mode, fileobj=f) 682 except tarfile.ReadError as e: 683 self.fail() 684 else: 685 tar.close() 686 687 def _test_modes(self, testfunc): 688 if self.suffix: 689 with self.assertRaises(tarfile.ReadError): 690 tarfile.open(tarname, mode="r:" + self.suffix) 691 with self.assertRaises(tarfile.ReadError): 692 tarfile.open(tarname, mode="r|" + self.suffix) 693 with self.assertRaises(tarfile.ReadError): 694 tarfile.open(self.tarname, mode="r:") 695 with self.assertRaises(tarfile.ReadError): 696 tarfile.open(self.tarname, mode="r|") 697 testfunc(self.tarname, "r") 698 testfunc(self.tarname, "r:" + self.suffix) 699 testfunc(self.tarname, "r:*") 700 testfunc(self.tarname, "r|" + self.suffix) 701 testfunc(self.tarname, "r|*") 702 703 def test_detect_file(self): 704 self._test_modes(self._testfunc_file) 705 706 def test_detect_fileobj(self): 707 self._test_modes(self._testfunc_fileobj) 708 709class GzipDetectReadTest(GzipTest, DetectReadTest): 710 pass 711 712class Bz2DetectReadTest(Bz2Test, DetectReadTest): 713 def test_detect_stream_bz2(self): 714 # Originally, tarfile's stream detection looked for the string 715 # "BZh91" at the start of the file. This is incorrect because 716 # the '9' represents the blocksize (900kB). If the file was 717 # compressed using another blocksize autodetection fails. 718 with open(tarname, "rb") as fobj: 719 data = fobj.read() 720 721 # Compress with blocksize 100kB, the file starts with "BZh11". 722 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 723 fobj.write(data) 724 725 self._testfunc_file(tmpname, "r|*") 726 727class LzmaDetectReadTest(LzmaTest, DetectReadTest): 728 pass 729 730 731class MemberReadTest(ReadTest, unittest.TestCase): 732 733 def _test_member(self, tarinfo, chksum=None, **kwargs): 734 if chksum is not None: 735 with self.tar.extractfile(tarinfo) as f: 736 self.assertEqual(md5sum(f.read()), chksum, 737 "wrong md5sum for %s" % tarinfo.name) 738 739 kwargs["mtime"] = 0o7606136617 740 kwargs["uid"] = 1000 741 kwargs["gid"] = 100 742 if "old-v7" not in tarinfo.name: 743 # V7 tar can't handle alphabetic owners. 744 kwargs["uname"] = "tarfile" 745 kwargs["gname"] = "tarfile" 746 for k, v in kwargs.items(): 747 self.assertEqual(getattr(tarinfo, k), v, 748 "wrong value in %s field of %s" % (k, tarinfo.name)) 749 750 def test_find_regtype(self): 751 tarinfo = self.tar.getmember("ustar/regtype") 752 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 753 754 def test_find_conttype(self): 755 tarinfo = self.tar.getmember("ustar/conttype") 756 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 757 758 def test_find_dirtype(self): 759 tarinfo = self.tar.getmember("ustar/dirtype") 760 self._test_member(tarinfo, size=0) 761 762 def test_find_dirtype_with_size(self): 763 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 764 self._test_member(tarinfo, size=255) 765 766 def test_find_lnktype(self): 767 tarinfo = self.tar.getmember("ustar/lnktype") 768 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 769 770 def test_find_symtype(self): 771 tarinfo = self.tar.getmember("ustar/symtype") 772 self._test_member(tarinfo, size=0, linkname="regtype") 773 774 def test_find_blktype(self): 775 tarinfo = self.tar.getmember("ustar/blktype") 776 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 777 778 def test_find_chrtype(self): 779 tarinfo = self.tar.getmember("ustar/chrtype") 780 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 781 782 def test_find_fifotype(self): 783 tarinfo = self.tar.getmember("ustar/fifotype") 784 self._test_member(tarinfo, size=0) 785 786 def test_find_sparse(self): 787 tarinfo = self.tar.getmember("ustar/sparse") 788 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 789 790 def test_find_gnusparse(self): 791 tarinfo = self.tar.getmember("gnu/sparse") 792 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 793 794 def test_find_gnusparse_00(self): 795 tarinfo = self.tar.getmember("gnu/sparse-0.0") 796 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 797 798 def test_find_gnusparse_01(self): 799 tarinfo = self.tar.getmember("gnu/sparse-0.1") 800 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 801 802 def test_find_gnusparse_10(self): 803 tarinfo = self.tar.getmember("gnu/sparse-1.0") 804 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 805 806 def test_find_umlauts(self): 807 tarinfo = self.tar.getmember("ustar/umlauts-" 808 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 809 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 810 811 def test_find_ustar_longname(self): 812 name = "ustar/" + "12345/" * 39 + "1234567/longname" 813 self.assertIn(name, self.tar.getnames()) 814 815 def test_find_regtype_oldv7(self): 816 tarinfo = self.tar.getmember("misc/regtype-old-v7") 817 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 818 819 def test_find_pax_umlauts(self): 820 self.tar.close() 821 self.tar = tarfile.open(self.tarname, mode=self.mode, 822 encoding="iso8859-1") 823 tarinfo = self.tar.getmember("pax/umlauts-" 824 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 825 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 826 827 828class LongnameTest: 829 830 def test_read_longname(self): 831 # Test reading of longname (bug #1471427). 832 longname = self.subdir + "/" + "123/" * 125 + "longname" 833 try: 834 tarinfo = self.tar.getmember(longname) 835 except KeyError: 836 self.fail("longname not found") 837 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 838 "read longname as dirtype") 839 840 def test_read_longlink(self): 841 longname = self.subdir + "/" + "123/" * 125 + "longname" 842 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 843 try: 844 tarinfo = self.tar.getmember(longlink) 845 except KeyError: 846 self.fail("longlink not found") 847 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 848 849 def test_truncated_longname(self): 850 longname = self.subdir + "/" + "123/" * 125 + "longname" 851 tarinfo = self.tar.getmember(longname) 852 offset = tarinfo.offset 853 self.tar.fileobj.seek(offset) 854 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 855 with self.assertRaises(tarfile.ReadError): 856 tarfile.open(name="foo.tar", fileobj=fobj) 857 858 def test_header_offset(self): 859 # Test if the start offset of the TarInfo object includes 860 # the preceding extended header. 861 longname = self.subdir + "/" + "123/" * 125 + "longname" 862 offset = self.tar.getmember(longname).offset 863 with open(tarname, "rb") as fobj: 864 fobj.seek(offset) 865 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 866 "iso8859-1", "strict") 867 self.assertEqual(tarinfo.type, self.longnametype) 868 869 870class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 871 872 subdir = "gnu" 873 longnametype = tarfile.GNUTYPE_LONGNAME 874 875 # Since 3.2 tarfile is supposed to accurately restore sparse members and 876 # produce files with holes. This is what we actually want to test here. 877 # Unfortunately, not all platforms/filesystems support sparse files, and 878 # even on platforms that do it is non-trivial to make reliable assertions 879 # about holes in files. Therefore, we first do one basic test which works 880 # an all platforms, and after that a test that will work only on 881 # platforms/filesystems that prove to support sparse files. 882 def _test_sparse_file(self, name): 883 self.tar.extract(name, TEMPDIR) 884 filename = os.path.join(TEMPDIR, name) 885 with open(filename, "rb") as fobj: 886 data = fobj.read() 887 self.assertEqual(md5sum(data), md5_sparse, 888 "wrong md5sum for %s" % name) 889 890 if self._fs_supports_holes(): 891 s = os.stat(filename) 892 self.assertLess(s.st_blocks * 512, s.st_size) 893 894 def test_sparse_file_old(self): 895 self._test_sparse_file("gnu/sparse") 896 897 def test_sparse_file_00(self): 898 self._test_sparse_file("gnu/sparse-0.0") 899 900 def test_sparse_file_01(self): 901 self._test_sparse_file("gnu/sparse-0.1") 902 903 def test_sparse_file_10(self): 904 self._test_sparse_file("gnu/sparse-1.0") 905 906 @staticmethod 907 def _fs_supports_holes(): 908 # Return True if the platform knows the st_blocks stat attribute and 909 # uses st_blocks units of 512 bytes, and if the filesystem is able to 910 # store holes in files. 911 if sys.platform.startswith("linux"): 912 # Linux evidentially has 512 byte st_blocks units. 913 name = os.path.join(TEMPDIR, "sparse-test") 914 with open(name, "wb") as fobj: 915 fobj.seek(4096) 916 fobj.truncate() 917 s = os.stat(name) 918 support.unlink(name) 919 return s.st_blocks == 0 920 else: 921 return False 922 923 924class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 925 926 subdir = "pax" 927 longnametype = tarfile.XHDTYPE 928 929 def test_pax_global_headers(self): 930 tar = tarfile.open(tarname, encoding="iso8859-1") 931 try: 932 tarinfo = tar.getmember("pax/regtype1") 933 self.assertEqual(tarinfo.uname, "foo") 934 self.assertEqual(tarinfo.gname, "bar") 935 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 936 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 937 938 tarinfo = tar.getmember("pax/regtype2") 939 self.assertEqual(tarinfo.uname, "") 940 self.assertEqual(tarinfo.gname, "bar") 941 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 942 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 943 944 tarinfo = tar.getmember("pax/regtype3") 945 self.assertEqual(tarinfo.uname, "tarfile") 946 self.assertEqual(tarinfo.gname, "tarfile") 947 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 948 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 949 finally: 950 tar.close() 951 952 def test_pax_number_fields(self): 953 # All following number fields are read from the pax header. 954 tar = tarfile.open(tarname, encoding="iso8859-1") 955 try: 956 tarinfo = tar.getmember("pax/regtype4") 957 self.assertEqual(tarinfo.size, 7011) 958 self.assertEqual(tarinfo.uid, 123) 959 self.assertEqual(tarinfo.gid, 123) 960 self.assertEqual(tarinfo.mtime, 1041808783.0) 961 self.assertEqual(type(tarinfo.mtime), float) 962 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 963 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 964 finally: 965 tar.close() 966 967 968class WriteTestBase(TarTest): 969 # Put all write tests in here that are supposed to be tested 970 # in all possible mode combinations. 971 972 def test_fileobj_no_close(self): 973 fobj = io.BytesIO() 974 tar = tarfile.open(fileobj=fobj, mode=self.mode) 975 tar.addfile(tarfile.TarInfo("foo")) 976 tar.close() 977 self.assertFalse(fobj.closed, "external fileobjs must never closed") 978 # Issue #20238: Incomplete gzip output with mode="w:gz" 979 data = fobj.getvalue() 980 del tar 981 support.gc_collect() 982 self.assertFalse(fobj.closed) 983 self.assertEqual(data, fobj.getvalue()) 984 985 986class WriteTest(WriteTestBase, unittest.TestCase): 987 988 prefix = "w:" 989 990 def test_100_char_name(self): 991 # The name field in a tar header stores strings of at most 100 chars. 992 # If a string is shorter than 100 chars it has to be padded with '\0', 993 # which implies that a string of exactly 100 chars is stored without 994 # a trailing '\0'. 995 name = "0123456789" * 10 996 tar = tarfile.open(tmpname, self.mode) 997 try: 998 t = tarfile.TarInfo(name) 999 tar.addfile(t) 1000 finally: 1001 tar.close() 1002 1003 tar = tarfile.open(tmpname) 1004 try: 1005 self.assertEqual(tar.getnames()[0], name, 1006 "failed to store 100 char filename") 1007 finally: 1008 tar.close() 1009 1010 def test_tar_size(self): 1011 # Test for bug #1013882. 1012 tar = tarfile.open(tmpname, self.mode) 1013 try: 1014 path = os.path.join(TEMPDIR, "file") 1015 with open(path, "wb") as fobj: 1016 fobj.write(b"aaa") 1017 tar.add(path) 1018 finally: 1019 tar.close() 1020 self.assertGreater(os.path.getsize(tmpname), 0, 1021 "tarfile is empty") 1022 1023 # The test_*_size tests test for bug #1167128. 1024 def test_file_size(self): 1025 tar = tarfile.open(tmpname, self.mode) 1026 try: 1027 path = os.path.join(TEMPDIR, "file") 1028 with open(path, "wb"): 1029 pass 1030 tarinfo = tar.gettarinfo(path) 1031 self.assertEqual(tarinfo.size, 0) 1032 1033 with open(path, "wb") as fobj: 1034 fobj.write(b"aaa") 1035 tarinfo = tar.gettarinfo(path) 1036 self.assertEqual(tarinfo.size, 3) 1037 finally: 1038 tar.close() 1039 1040 def test_directory_size(self): 1041 path = os.path.join(TEMPDIR, "directory") 1042 os.mkdir(path) 1043 try: 1044 tar = tarfile.open(tmpname, self.mode) 1045 try: 1046 tarinfo = tar.gettarinfo(path) 1047 self.assertEqual(tarinfo.size, 0) 1048 finally: 1049 tar.close() 1050 finally: 1051 support.rmdir(path) 1052 1053 @unittest.skipUnless(hasattr(os, "link"), 1054 "Missing hardlink implementation") 1055 def test_link_size(self): 1056 link = os.path.join(TEMPDIR, "link") 1057 target = os.path.join(TEMPDIR, "link_target") 1058 with open(target, "wb") as fobj: 1059 fobj.write(b"aaa") 1060 os.link(target, link) 1061 try: 1062 tar = tarfile.open(tmpname, self.mode) 1063 try: 1064 # Record the link target in the inodes list. 1065 tar.gettarinfo(target) 1066 tarinfo = tar.gettarinfo(link) 1067 self.assertEqual(tarinfo.size, 0) 1068 finally: 1069 tar.close() 1070 finally: 1071 support.unlink(target) 1072 support.unlink(link) 1073 1074 @support.skip_unless_symlink 1075 def test_symlink_size(self): 1076 path = os.path.join(TEMPDIR, "symlink") 1077 os.symlink("link_target", path) 1078 try: 1079 tar = tarfile.open(tmpname, self.mode) 1080 try: 1081 tarinfo = tar.gettarinfo(path) 1082 self.assertEqual(tarinfo.size, 0) 1083 finally: 1084 tar.close() 1085 finally: 1086 support.unlink(path) 1087 1088 def test_add_self(self): 1089 # Test for #1257255. 1090 dstname = os.path.abspath(tmpname) 1091 tar = tarfile.open(tmpname, self.mode) 1092 try: 1093 self.assertEqual(tar.name, dstname, 1094 "archive name must be absolute") 1095 tar.add(dstname) 1096 self.assertEqual(tar.getnames(), [], 1097 "added the archive to itself") 1098 1099 cwd = os.getcwd() 1100 os.chdir(TEMPDIR) 1101 tar.add(dstname) 1102 os.chdir(cwd) 1103 self.assertEqual(tar.getnames(), [], 1104 "added the archive to itself") 1105 finally: 1106 tar.close() 1107 1108 def test_exclude(self): 1109 tempdir = os.path.join(TEMPDIR, "exclude") 1110 os.mkdir(tempdir) 1111 try: 1112 for name in ("foo", "bar", "baz"): 1113 name = os.path.join(tempdir, name) 1114 support.create_empty_file(name) 1115 1116 exclude = os.path.isfile 1117 1118 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1119 try: 1120 with support.check_warnings(("use the filter argument", 1121 DeprecationWarning)): 1122 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 1123 finally: 1124 tar.close() 1125 1126 tar = tarfile.open(tmpname, "r") 1127 try: 1128 self.assertEqual(len(tar.getmembers()), 1) 1129 self.assertEqual(tar.getnames()[0], "empty_dir") 1130 finally: 1131 tar.close() 1132 finally: 1133 support.rmtree(tempdir) 1134 1135 def test_filter(self): 1136 tempdir = os.path.join(TEMPDIR, "filter") 1137 os.mkdir(tempdir) 1138 try: 1139 for name in ("foo", "bar", "baz"): 1140 name = os.path.join(tempdir, name) 1141 support.create_empty_file(name) 1142 1143 def filter(tarinfo): 1144 if os.path.basename(tarinfo.name) == "bar": 1145 return 1146 tarinfo.uid = 123 1147 tarinfo.uname = "foo" 1148 return tarinfo 1149 1150 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1151 try: 1152 tar.add(tempdir, arcname="empty_dir", filter=filter) 1153 finally: 1154 tar.close() 1155 1156 # Verify that filter is a keyword-only argument 1157 with self.assertRaises(TypeError): 1158 tar.add(tempdir, "empty_dir", True, None, filter) 1159 1160 tar = tarfile.open(tmpname, "r") 1161 try: 1162 for tarinfo in tar: 1163 self.assertEqual(tarinfo.uid, 123) 1164 self.assertEqual(tarinfo.uname, "foo") 1165 self.assertEqual(len(tar.getmembers()), 3) 1166 finally: 1167 tar.close() 1168 finally: 1169 support.rmtree(tempdir) 1170 1171 # Guarantee that stored pathnames are not modified. Don't 1172 # remove ./ or ../ or double slashes. Still make absolute 1173 # pathnames relative. 1174 # For details see bug #6054. 1175 def _test_pathname(self, path, cmp_path=None, dir=False): 1176 # Create a tarfile with an empty member named path 1177 # and compare the stored name with the original. 1178 foo = os.path.join(TEMPDIR, "foo") 1179 if not dir: 1180 support.create_empty_file(foo) 1181 else: 1182 os.mkdir(foo) 1183 1184 tar = tarfile.open(tmpname, self.mode) 1185 try: 1186 tar.add(foo, arcname=path) 1187 finally: 1188 tar.close() 1189 1190 tar = tarfile.open(tmpname, "r") 1191 try: 1192 t = tar.next() 1193 finally: 1194 tar.close() 1195 1196 if not dir: 1197 support.unlink(foo) 1198 else: 1199 support.rmdir(foo) 1200 1201 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1202 1203 1204 @support.skip_unless_symlink 1205 def test_extractall_symlinks(self): 1206 # Test if extractall works properly when tarfile contains symlinks 1207 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1208 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1209 os.mkdir(tempdir) 1210 try: 1211 source_file = os.path.join(tempdir,'source') 1212 target_file = os.path.join(tempdir,'symlink') 1213 with open(source_file,'w') as f: 1214 f.write('something\n') 1215 os.symlink(source_file, target_file) 1216 tar = tarfile.open(temparchive,'w') 1217 tar.add(source_file) 1218 tar.add(target_file) 1219 tar.close() 1220 # Let's extract it to the location which contains the symlink 1221 tar = tarfile.open(temparchive,'r') 1222 # this should not raise OSError: [Errno 17] File exists 1223 try: 1224 tar.extractall(path=tempdir) 1225 except OSError: 1226 self.fail("extractall failed with symlinked files") 1227 finally: 1228 tar.close() 1229 finally: 1230 support.unlink(temparchive) 1231 support.rmtree(tempdir) 1232 1233 def test_pathnames(self): 1234 self._test_pathname("foo") 1235 self._test_pathname(os.path.join("foo", ".", "bar")) 1236 self._test_pathname(os.path.join("foo", "..", "bar")) 1237 self._test_pathname(os.path.join(".", "foo")) 1238 self._test_pathname(os.path.join(".", "foo", ".")) 1239 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1240 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1241 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1242 self._test_pathname(os.path.join("..", "foo")) 1243 self._test_pathname(os.path.join("..", "foo", "..")) 1244 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1245 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1246 1247 self._test_pathname("foo" + os.sep + os.sep + "bar") 1248 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1249 1250 def test_abs_pathnames(self): 1251 if sys.platform == "win32": 1252 self._test_pathname("C:\\foo", "foo") 1253 else: 1254 self._test_pathname("/foo", "foo") 1255 self._test_pathname("///foo", "foo") 1256 1257 def test_cwd(self): 1258 # Test adding the current working directory. 1259 cwd = os.getcwd() 1260 os.chdir(TEMPDIR) 1261 try: 1262 tar = tarfile.open(tmpname, self.mode) 1263 try: 1264 tar.add(".") 1265 finally: 1266 tar.close() 1267 1268 tar = tarfile.open(tmpname, "r") 1269 try: 1270 for t in tar: 1271 if t.name != ".": 1272 self.assertTrue(t.name.startswith("./"), t.name) 1273 finally: 1274 tar.close() 1275 finally: 1276 os.chdir(cwd) 1277 1278 def test_open_nonwritable_fileobj(self): 1279 for exctype in OSError, EOFError, RuntimeError: 1280 class BadFile(io.BytesIO): 1281 first = True 1282 def write(self, data): 1283 if self.first: 1284 self.first = False 1285 raise exctype 1286 1287 f = BadFile() 1288 with self.assertRaises(exctype): 1289 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1290 format=tarfile.PAX_FORMAT, 1291 pax_headers={'non': 'empty'}) 1292 self.assertFalse(f.closed) 1293 1294class GzipWriteTest(GzipTest, WriteTest): 1295 pass 1296 1297class Bz2WriteTest(Bz2Test, WriteTest): 1298 pass 1299 1300class LzmaWriteTest(LzmaTest, WriteTest): 1301 pass 1302 1303 1304class StreamWriteTest(WriteTestBase, unittest.TestCase): 1305 1306 prefix = "w|" 1307 decompressor = None 1308 1309 def test_stream_padding(self): 1310 # Test for bug #1543303. 1311 tar = tarfile.open(tmpname, self.mode) 1312 tar.close() 1313 if self.decompressor: 1314 dec = self.decompressor() 1315 with open(tmpname, "rb") as fobj: 1316 data = fobj.read() 1317 data = dec.decompress(data) 1318 self.assertFalse(dec.unused_data, "found trailing data") 1319 else: 1320 with self.open(tmpname) as fobj: 1321 data = fobj.read() 1322 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1323 "incorrect zero padding") 1324 1325 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1326 "Missing umask implementation") 1327 def test_file_mode(self): 1328 # Test for issue #8464: Create files with correct 1329 # permissions. 1330 if os.path.exists(tmpname): 1331 support.unlink(tmpname) 1332 1333 original_umask = os.umask(0o022) 1334 try: 1335 tar = tarfile.open(tmpname, self.mode) 1336 tar.close() 1337 mode = os.stat(tmpname).st_mode & 0o777 1338 self.assertEqual(mode, 0o644, "wrong file permissions") 1339 finally: 1340 os.umask(original_umask) 1341 1342class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1343 pass 1344 1345class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1346 decompressor = bz2.BZ2Decompressor if bz2 else None 1347 1348class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1349 decompressor = lzma.LZMADecompressor if lzma else None 1350 1351 1352class GNUWriteTest(unittest.TestCase): 1353 # This testcase checks for correct creation of GNU Longname 1354 # and Longlink extended headers (cp. bug #812325). 1355 1356 def _length(self, s): 1357 blocks = len(s) // 512 + 1 1358 return blocks * 512 1359 1360 def _calc_size(self, name, link=None): 1361 # Initial tar header 1362 count = 512 1363 1364 if len(name) > tarfile.LENGTH_NAME: 1365 # GNU longname extended header + longname 1366 count += 512 1367 count += self._length(name) 1368 if link is not None and len(link) > tarfile.LENGTH_LINK: 1369 # GNU longlink extended header + longlink 1370 count += 512 1371 count += self._length(link) 1372 return count 1373 1374 def _test(self, name, link=None): 1375 tarinfo = tarfile.TarInfo(name) 1376 if link: 1377 tarinfo.linkname = link 1378 tarinfo.type = tarfile.LNKTYPE 1379 1380 tar = tarfile.open(tmpname, "w") 1381 try: 1382 tar.format = tarfile.GNU_FORMAT 1383 tar.addfile(tarinfo) 1384 1385 v1 = self._calc_size(name, link) 1386 v2 = tar.offset 1387 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1388 finally: 1389 tar.close() 1390 1391 tar = tarfile.open(tmpname) 1392 try: 1393 member = tar.next() 1394 self.assertIsNotNone(member, 1395 "unable to read longname member") 1396 self.assertEqual(tarinfo.name, member.name, 1397 "unable to read longname member") 1398 self.assertEqual(tarinfo.linkname, member.linkname, 1399 "unable to read longname member") 1400 finally: 1401 tar.close() 1402 1403 def test_longname_1023(self): 1404 self._test(("longnam/" * 127) + "longnam") 1405 1406 def test_longname_1024(self): 1407 self._test(("longnam/" * 127) + "longname") 1408 1409 def test_longname_1025(self): 1410 self._test(("longnam/" * 127) + "longname_") 1411 1412 def test_longlink_1023(self): 1413 self._test("name", ("longlnk/" * 127) + "longlnk") 1414 1415 def test_longlink_1024(self): 1416 self._test("name", ("longlnk/" * 127) + "longlink") 1417 1418 def test_longlink_1025(self): 1419 self._test("name", ("longlnk/" * 127) + "longlink_") 1420 1421 def test_longnamelink_1023(self): 1422 self._test(("longnam/" * 127) + "longnam", 1423 ("longlnk/" * 127) + "longlnk") 1424 1425 def test_longnamelink_1024(self): 1426 self._test(("longnam/" * 127) + "longname", 1427 ("longlnk/" * 127) + "longlink") 1428 1429 def test_longnamelink_1025(self): 1430 self._test(("longnam/" * 127) + "longname_", 1431 ("longlnk/" * 127) + "longlink_") 1432 1433 1434class CreateTest(TarTest, unittest.TestCase): 1435 1436 prefix = "x:" 1437 1438 file_path = os.path.join(TEMPDIR, "spameggs42") 1439 1440 def setUp(self): 1441 support.unlink(tmpname) 1442 1443 @classmethod 1444 def setUpClass(cls): 1445 with open(cls.file_path, "wb") as fobj: 1446 fobj.write(b"aaa") 1447 1448 @classmethod 1449 def tearDownClass(cls): 1450 support.unlink(cls.file_path) 1451 1452 def test_create(self): 1453 with tarfile.open(tmpname, self.mode) as tobj: 1454 tobj.add(self.file_path) 1455 1456 with self.taropen(tmpname) as tobj: 1457 names = tobj.getnames() 1458 self.assertEqual(len(names), 1) 1459 self.assertIn('spameggs42', names[0]) 1460 1461 def test_create_existing(self): 1462 with tarfile.open(tmpname, self.mode) as tobj: 1463 tobj.add(self.file_path) 1464 1465 with self.assertRaises(FileExistsError): 1466 tobj = tarfile.open(tmpname, self.mode) 1467 1468 with self.taropen(tmpname) as tobj: 1469 names = tobj.getnames() 1470 self.assertEqual(len(names), 1) 1471 self.assertIn('spameggs42', names[0]) 1472 1473 def test_create_taropen(self): 1474 with self.taropen(tmpname, "x") as tobj: 1475 tobj.add(self.file_path) 1476 1477 with self.taropen(tmpname) as tobj: 1478 names = tobj.getnames() 1479 self.assertEqual(len(names), 1) 1480 self.assertIn('spameggs42', names[0]) 1481 1482 def test_create_existing_taropen(self): 1483 with self.taropen(tmpname, "x") as tobj: 1484 tobj.add(self.file_path) 1485 1486 with self.assertRaises(FileExistsError): 1487 with self.taropen(tmpname, "x"): 1488 pass 1489 1490 with self.taropen(tmpname) as tobj: 1491 names = tobj.getnames() 1492 self.assertEqual(len(names), 1) 1493 self.assertIn("spameggs42", names[0]) 1494 1495 1496class GzipCreateTest(GzipTest, CreateTest): 1497 pass 1498 1499 1500class Bz2CreateTest(Bz2Test, CreateTest): 1501 pass 1502 1503 1504class LzmaCreateTest(LzmaTest, CreateTest): 1505 pass 1506 1507 1508class CreateWithXModeTest(CreateTest): 1509 1510 prefix = "x" 1511 1512 test_create_taropen = None 1513 test_create_existing_taropen = None 1514 1515 1516@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1517class HardlinkTest(unittest.TestCase): 1518 # Test the creation of LNKTYPE (hardlink) members in an archive. 1519 1520 def setUp(self): 1521 self.foo = os.path.join(TEMPDIR, "foo") 1522 self.bar = os.path.join(TEMPDIR, "bar") 1523 1524 with open(self.foo, "wb") as fobj: 1525 fobj.write(b"foo") 1526 1527 os.link(self.foo, self.bar) 1528 1529 self.tar = tarfile.open(tmpname, "w") 1530 self.tar.add(self.foo) 1531 1532 def tearDown(self): 1533 self.tar.close() 1534 support.unlink(self.foo) 1535 support.unlink(self.bar) 1536 1537 def test_add_twice(self): 1538 # The same name will be added as a REGTYPE every 1539 # time regardless of st_nlink. 1540 tarinfo = self.tar.gettarinfo(self.foo) 1541 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1542 "add file as regular failed") 1543 1544 def test_add_hardlink(self): 1545 tarinfo = self.tar.gettarinfo(self.bar) 1546 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1547 "add file as hardlink failed") 1548 1549 def test_dereference_hardlink(self): 1550 self.tar.dereference = True 1551 tarinfo = self.tar.gettarinfo(self.bar) 1552 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1553 "dereferencing hardlink failed") 1554 1555 1556class PaxWriteTest(GNUWriteTest): 1557 1558 def _test(self, name, link=None): 1559 # See GNUWriteTest. 1560 tarinfo = tarfile.TarInfo(name) 1561 if link: 1562 tarinfo.linkname = link 1563 tarinfo.type = tarfile.LNKTYPE 1564 1565 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1566 try: 1567 tar.addfile(tarinfo) 1568 finally: 1569 tar.close() 1570 1571 tar = tarfile.open(tmpname) 1572 try: 1573 if link: 1574 l = tar.getmembers()[0].linkname 1575 self.assertEqual(link, l, "PAX longlink creation failed") 1576 else: 1577 n = tar.getmembers()[0].name 1578 self.assertEqual(name, n, "PAX longname creation failed") 1579 finally: 1580 tar.close() 1581 1582 def test_pax_global_header(self): 1583 pax_headers = { 1584 "foo": "bar", 1585 "uid": "0", 1586 "mtime": "1.23", 1587 "test": "\xe4\xf6\xfc", 1588 "\xe4\xf6\xfc": "test"} 1589 1590 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1591 pax_headers=pax_headers) 1592 try: 1593 tar.addfile(tarfile.TarInfo("test")) 1594 finally: 1595 tar.close() 1596 1597 # Test if the global header was written correctly. 1598 tar = tarfile.open(tmpname, encoding="iso8859-1") 1599 try: 1600 self.assertEqual(tar.pax_headers, pax_headers) 1601 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1602 # Test if all the fields are strings. 1603 for key, val in tar.pax_headers.items(): 1604 self.assertIsNot(type(key), bytes) 1605 self.assertIsNot(type(val), bytes) 1606 if key in tarfile.PAX_NUMBER_FIELDS: 1607 try: 1608 tarfile.PAX_NUMBER_FIELDS[key](val) 1609 except (TypeError, ValueError): 1610 self.fail("unable to convert pax header field") 1611 finally: 1612 tar.close() 1613 1614 def test_pax_extended_header(self): 1615 # The fields from the pax header have priority over the 1616 # TarInfo. 1617 pax_headers = {"path": "foo", "uid": "123"} 1618 1619 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1620 encoding="iso8859-1") 1621 try: 1622 t = tarfile.TarInfo() 1623 t.name = "\xe4\xf6\xfc" # non-ASCII 1624 t.uid = 8**8 # too large 1625 t.pax_headers = pax_headers 1626 tar.addfile(t) 1627 finally: 1628 tar.close() 1629 1630 tar = tarfile.open(tmpname, encoding="iso8859-1") 1631 try: 1632 t = tar.getmembers()[0] 1633 self.assertEqual(t.pax_headers, pax_headers) 1634 self.assertEqual(t.name, "foo") 1635 self.assertEqual(t.uid, 123) 1636 finally: 1637 tar.close() 1638 1639 1640class UstarUnicodeTest(unittest.TestCase): 1641 1642 format = tarfile.USTAR_FORMAT 1643 1644 def test_iso8859_1_filename(self): 1645 self._test_unicode_filename("iso8859-1") 1646 1647 def test_utf7_filename(self): 1648 self._test_unicode_filename("utf7") 1649 1650 def test_utf8_filename(self): 1651 self._test_unicode_filename("utf-8") 1652 1653 def _test_unicode_filename(self, encoding): 1654 tar = tarfile.open(tmpname, "w", format=self.format, 1655 encoding=encoding, errors="strict") 1656 try: 1657 name = "\xe4\xf6\xfc" 1658 tar.addfile(tarfile.TarInfo(name)) 1659 finally: 1660 tar.close() 1661 1662 tar = tarfile.open(tmpname, encoding=encoding) 1663 try: 1664 self.assertEqual(tar.getmembers()[0].name, name) 1665 finally: 1666 tar.close() 1667 1668 def test_unicode_filename_error(self): 1669 tar = tarfile.open(tmpname, "w", format=self.format, 1670 encoding="ascii", errors="strict") 1671 try: 1672 tarinfo = tarfile.TarInfo() 1673 1674 tarinfo.name = "\xe4\xf6\xfc" 1675 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1676 1677 tarinfo.name = "foo" 1678 tarinfo.uname = "\xe4\xf6\xfc" 1679 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1680 finally: 1681 tar.close() 1682 1683 def test_unicode_argument(self): 1684 tar = tarfile.open(tarname, "r", 1685 encoding="iso8859-1", errors="strict") 1686 try: 1687 for t in tar: 1688 self.assertIs(type(t.name), str) 1689 self.assertIs(type(t.linkname), str) 1690 self.assertIs(type(t.uname), str) 1691 self.assertIs(type(t.gname), str) 1692 finally: 1693 tar.close() 1694 1695 def test_uname_unicode(self): 1696 t = tarfile.TarInfo("foo") 1697 t.uname = "\xe4\xf6\xfc" 1698 t.gname = "\xe4\xf6\xfc" 1699 1700 tar = tarfile.open(tmpname, mode="w", format=self.format, 1701 encoding="iso8859-1") 1702 try: 1703 tar.addfile(t) 1704 finally: 1705 tar.close() 1706 1707 tar = tarfile.open(tmpname, encoding="iso8859-1") 1708 try: 1709 t = tar.getmember("foo") 1710 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1711 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1712 1713 if self.format != tarfile.PAX_FORMAT: 1714 tar.close() 1715 tar = tarfile.open(tmpname, encoding="ascii") 1716 t = tar.getmember("foo") 1717 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1718 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1719 finally: 1720 tar.close() 1721 1722 1723class GNUUnicodeTest(UstarUnicodeTest): 1724 1725 format = tarfile.GNU_FORMAT 1726 1727 def test_bad_pax_header(self): 1728 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1729 # without a hdrcharset=BINARY header. 1730 for encoding, name in ( 1731 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1732 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1733 with tarfile.open(tarname, encoding=encoding, 1734 errors="surrogateescape") as tar: 1735 try: 1736 t = tar.getmember(name) 1737 except KeyError: 1738 self.fail("unable to read bad GNU tar pax header") 1739 1740 1741class PAXUnicodeTest(UstarUnicodeTest): 1742 1743 format = tarfile.PAX_FORMAT 1744 1745 # PAX_FORMAT ignores encoding in write mode. 1746 test_unicode_filename_error = None 1747 1748 def test_binary_header(self): 1749 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 1750 for encoding, name in ( 1751 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1752 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1753 with tarfile.open(tarname, encoding=encoding, 1754 errors="surrogateescape") as tar: 1755 try: 1756 t = tar.getmember(name) 1757 except KeyError: 1758 self.fail("unable to read POSIX.1-2008 binary header") 1759 1760 1761class AppendTestBase: 1762 # Test append mode (cp. patch #1652681). 1763 1764 def setUp(self): 1765 self.tarname = tmpname 1766 if os.path.exists(self.tarname): 1767 support.unlink(self.tarname) 1768 1769 def _create_testtar(self, mode="w:"): 1770 with tarfile.open(tarname, encoding="iso8859-1") as src: 1771 t = src.getmember("ustar/regtype") 1772 t.name = "foo" 1773 with src.extractfile(t) as f: 1774 with tarfile.open(self.tarname, mode) as tar: 1775 tar.addfile(t, f) 1776 1777 def test_append_compressed(self): 1778 self._create_testtar("w:" + self.suffix) 1779 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1780 1781class AppendTest(AppendTestBase, unittest.TestCase): 1782 test_append_compressed = None 1783 1784 def _add_testfile(self, fileobj=None): 1785 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1786 tar.addfile(tarfile.TarInfo("bar")) 1787 1788 def _test(self, names=["bar"], fileobj=None): 1789 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1790 self.assertEqual(tar.getnames(), names) 1791 1792 def test_non_existing(self): 1793 self._add_testfile() 1794 self._test() 1795 1796 def test_empty(self): 1797 tarfile.open(self.tarname, "w:").close() 1798 self._add_testfile() 1799 self._test() 1800 1801 def test_empty_fileobj(self): 1802 fobj = io.BytesIO(b"\0" * 1024) 1803 self._add_testfile(fobj) 1804 fobj.seek(0) 1805 self._test(fileobj=fobj) 1806 1807 def test_fileobj(self): 1808 self._create_testtar() 1809 with open(self.tarname, "rb") as fobj: 1810 data = fobj.read() 1811 fobj = io.BytesIO(data) 1812 self._add_testfile(fobj) 1813 fobj.seek(0) 1814 self._test(names=["foo", "bar"], fileobj=fobj) 1815 1816 def test_existing(self): 1817 self._create_testtar() 1818 self._add_testfile() 1819 self._test(names=["foo", "bar"]) 1820 1821 # Append mode is supposed to fail if the tarfile to append to 1822 # does not end with a zero block. 1823 def _test_error(self, data): 1824 with open(self.tarname, "wb") as fobj: 1825 fobj.write(data) 1826 self.assertRaises(tarfile.ReadError, self._add_testfile) 1827 1828 def test_null(self): 1829 self._test_error(b"") 1830 1831 def test_incomplete(self): 1832 self._test_error(b"\0" * 13) 1833 1834 def test_premature_eof(self): 1835 data = tarfile.TarInfo("foo").tobuf() 1836 self._test_error(data) 1837 1838 def test_trailing_garbage(self): 1839 data = tarfile.TarInfo("foo").tobuf() 1840 self._test_error(data + b"\0" * 13) 1841 1842 def test_invalid(self): 1843 self._test_error(b"a" * 512) 1844 1845class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 1846 pass 1847 1848class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 1849 pass 1850 1851class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 1852 pass 1853 1854 1855class LimitsTest(unittest.TestCase): 1856 1857 def test_ustar_limits(self): 1858 # 100 char name 1859 tarinfo = tarfile.TarInfo("0123456789" * 10) 1860 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1861 1862 # 101 char name that cannot be stored 1863 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1864 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1865 1866 # 256 char name with a slash at pos 156 1867 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1868 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1869 1870 # 256 char name that cannot be stored 1871 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1872 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1873 1874 # 512 char name 1875 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1876 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1877 1878 # 512 char linkname 1879 tarinfo = tarfile.TarInfo("longlink") 1880 tarinfo.linkname = "123/" * 126 + "longname" 1881 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1882 1883 # uid > 8 digits 1884 tarinfo = tarfile.TarInfo("name") 1885 tarinfo.uid = 0o10000000 1886 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1887 1888 def test_gnu_limits(self): 1889 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1890 tarinfo.tobuf(tarfile.GNU_FORMAT) 1891 1892 tarinfo = tarfile.TarInfo("longlink") 1893 tarinfo.linkname = "123/" * 126 + "longname" 1894 tarinfo.tobuf(tarfile.GNU_FORMAT) 1895 1896 # uid >= 256 ** 7 1897 tarinfo = tarfile.TarInfo("name") 1898 tarinfo.uid = 0o4000000000000000000 1899 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1900 1901 def test_pax_limits(self): 1902 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1903 tarinfo.tobuf(tarfile.PAX_FORMAT) 1904 1905 tarinfo = tarfile.TarInfo("longlink") 1906 tarinfo.linkname = "123/" * 126 + "longname" 1907 tarinfo.tobuf(tarfile.PAX_FORMAT) 1908 1909 tarinfo = tarfile.TarInfo("name") 1910 tarinfo.uid = 0o4000000000000000000 1911 tarinfo.tobuf(tarfile.PAX_FORMAT) 1912 1913 1914class MiscTest(unittest.TestCase): 1915 1916 def test_char_fields(self): 1917 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 1918 b"foo\0\0\0\0\0") 1919 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 1920 b"foo") 1921 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 1922 "foo") 1923 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 1924 "foo") 1925 1926 def test_read_number_fields(self): 1927 # Issue 13158: Test if GNU tar specific base-256 number fields 1928 # are decoded correctly. 1929 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 1930 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 1931 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 1932 0o10000000) 1933 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 1934 0xffffffff) 1935 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 1936 -1) 1937 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 1938 -100) 1939 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 1940 -0x100000000000000) 1941 1942 def test_write_number_fields(self): 1943 self.assertEqual(tarfile.itn(1), b"0000001\x00") 1944 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 1945 self.assertEqual(tarfile.itn(0o10000000), 1946 b"\x80\x00\x00\x00\x00\x20\x00\x00") 1947 self.assertEqual(tarfile.itn(0xffffffff), 1948 b"\x80\x00\x00\x00\xff\xff\xff\xff") 1949 self.assertEqual(tarfile.itn(-1), 1950 b"\xff\xff\xff\xff\xff\xff\xff\xff") 1951 self.assertEqual(tarfile.itn(-100), 1952 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 1953 self.assertEqual(tarfile.itn(-0x100000000000000), 1954 b"\xff\x00\x00\x00\x00\x00\x00\x00") 1955 1956 def test_number_field_limits(self): 1957 with self.assertRaises(ValueError): 1958 tarfile.itn(-1, 8, tarfile.USTAR_FORMAT) 1959 with self.assertRaises(ValueError): 1960 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 1961 with self.assertRaises(ValueError): 1962 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 1963 with self.assertRaises(ValueError): 1964 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 1965 1966 1967class CommandLineTest(unittest.TestCase): 1968 1969 def tarfilecmd(self, *args, **kwargs): 1970 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 1971 **kwargs) 1972 return out.replace(os.linesep.encode(), b'\n') 1973 1974 def tarfilecmd_failure(self, *args): 1975 return script_helper.assert_python_failure('-m', 'tarfile', *args) 1976 1977 def make_simple_tarfile(self, tar_name): 1978 files = [support.findfile('tokenize_tests.txt'), 1979 support.findfile('tokenize_tests-no-coding-cookie-' 1980 'and-utf8-bom-sig-only.txt')] 1981 self.addCleanup(support.unlink, tar_name) 1982 with tarfile.open(tar_name, 'w') as tf: 1983 for tardata in files: 1984 tf.add(tardata, arcname=os.path.basename(tardata)) 1985 1986 def test_test_command(self): 1987 for tar_name in testtarnames: 1988 for opt in '-t', '--test': 1989 out = self.tarfilecmd(opt, tar_name) 1990 self.assertEqual(out, b'') 1991 1992 def test_test_command_verbose(self): 1993 for tar_name in testtarnames: 1994 for opt in '-v', '--verbose': 1995 out = self.tarfilecmd(opt, '-t', tar_name) 1996 self.assertIn(b'is a tar archive.\n', out) 1997 1998 def test_test_command_invalid_file(self): 1999 zipname = support.findfile('zipdir.zip') 2000 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2001 self.assertIn(b' is not a tar archive.', err) 2002 self.assertEqual(out, b'') 2003 self.assertEqual(rc, 1) 2004 2005 for tar_name in testtarnames: 2006 with self.subTest(tar_name=tar_name): 2007 with open(tar_name, 'rb') as f: 2008 data = f.read() 2009 try: 2010 with open(tmpname, 'wb') as f: 2011 f.write(data[:511]) 2012 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2013 self.assertEqual(out, b'') 2014 self.assertEqual(rc, 1) 2015 finally: 2016 support.unlink(tmpname) 2017 2018 def test_list_command(self): 2019 for tar_name in testtarnames: 2020 with support.captured_stdout() as t: 2021 with tarfile.open(tar_name, 'r') as tf: 2022 tf.list(verbose=False) 2023 expected = t.getvalue().encode('ascii', 'backslashreplace') 2024 for opt in '-l', '--list': 2025 out = self.tarfilecmd(opt, tar_name, 2026 PYTHONIOENCODING='ascii') 2027 self.assertEqual(out, expected) 2028 2029 def test_list_command_verbose(self): 2030 for tar_name in testtarnames: 2031 with support.captured_stdout() as t: 2032 with tarfile.open(tar_name, 'r') as tf: 2033 tf.list(verbose=True) 2034 expected = t.getvalue().encode('ascii', 'backslashreplace') 2035 for opt in '-v', '--verbose': 2036 out = self.tarfilecmd(opt, '-l', tar_name, 2037 PYTHONIOENCODING='ascii') 2038 self.assertEqual(out, expected) 2039 2040 def test_list_command_invalid_file(self): 2041 zipname = support.findfile('zipdir.zip') 2042 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2043 self.assertIn(b' is not a tar archive.', err) 2044 self.assertEqual(out, b'') 2045 self.assertEqual(rc, 1) 2046 2047 def test_create_command(self): 2048 files = [support.findfile('tokenize_tests.txt'), 2049 support.findfile('tokenize_tests-no-coding-cookie-' 2050 'and-utf8-bom-sig-only.txt')] 2051 for opt in '-c', '--create': 2052 try: 2053 out = self.tarfilecmd(opt, tmpname, *files) 2054 self.assertEqual(out, b'') 2055 with tarfile.open(tmpname) as tar: 2056 tar.getmembers() 2057 finally: 2058 support.unlink(tmpname) 2059 2060 def test_create_command_verbose(self): 2061 files = [support.findfile('tokenize_tests.txt'), 2062 support.findfile('tokenize_tests-no-coding-cookie-' 2063 'and-utf8-bom-sig-only.txt')] 2064 for opt in '-v', '--verbose': 2065 try: 2066 out = self.tarfilecmd(opt, '-c', tmpname, *files) 2067 self.assertIn(b' file created.', out) 2068 with tarfile.open(tmpname) as tar: 2069 tar.getmembers() 2070 finally: 2071 support.unlink(tmpname) 2072 2073 def test_create_command_dotless_filename(self): 2074 files = [support.findfile('tokenize_tests.txt')] 2075 try: 2076 out = self.tarfilecmd('-c', dotlessname, *files) 2077 self.assertEqual(out, b'') 2078 with tarfile.open(dotlessname) as tar: 2079 tar.getmembers() 2080 finally: 2081 support.unlink(dotlessname) 2082 2083 def test_create_command_dot_started_filename(self): 2084 tar_name = os.path.join(TEMPDIR, ".testtar") 2085 files = [support.findfile('tokenize_tests.txt')] 2086 try: 2087 out = self.tarfilecmd('-c', tar_name, *files) 2088 self.assertEqual(out, b'') 2089 with tarfile.open(tar_name) as tar: 2090 tar.getmembers() 2091 finally: 2092 support.unlink(tar_name) 2093 2094 def test_create_command_compressed(self): 2095 files = [support.findfile('tokenize_tests.txt'), 2096 support.findfile('tokenize_tests-no-coding-cookie-' 2097 'and-utf8-bom-sig-only.txt')] 2098 for filetype in (GzipTest, Bz2Test, LzmaTest): 2099 if not filetype.open: 2100 continue 2101 try: 2102 tar_name = tmpname + '.' + filetype.suffix 2103 out = self.tarfilecmd('-c', tar_name, *files) 2104 with filetype.taropen(tar_name) as tar: 2105 tar.getmembers() 2106 finally: 2107 support.unlink(tar_name) 2108 2109 def test_extract_command(self): 2110 self.make_simple_tarfile(tmpname) 2111 for opt in '-e', '--extract': 2112 try: 2113 with support.temp_cwd(tarextdir): 2114 out = self.tarfilecmd(opt, tmpname) 2115 self.assertEqual(out, b'') 2116 finally: 2117 support.rmtree(tarextdir) 2118 2119 def test_extract_command_verbose(self): 2120 self.make_simple_tarfile(tmpname) 2121 for opt in '-v', '--verbose': 2122 try: 2123 with support.temp_cwd(tarextdir): 2124 out = self.tarfilecmd(opt, '-e', tmpname) 2125 self.assertIn(b' file is extracted.', out) 2126 finally: 2127 support.rmtree(tarextdir) 2128 2129 def test_extract_command_different_directory(self): 2130 self.make_simple_tarfile(tmpname) 2131 try: 2132 with support.temp_cwd(tarextdir): 2133 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2134 self.assertEqual(out, b'') 2135 finally: 2136 support.rmtree(tarextdir) 2137 2138 def test_extract_command_invalid_file(self): 2139 zipname = support.findfile('zipdir.zip') 2140 with support.temp_cwd(tarextdir): 2141 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2142 self.assertIn(b' is not a tar archive.', err) 2143 self.assertEqual(out, b'') 2144 self.assertEqual(rc, 1) 2145 2146 2147class ContextManagerTest(unittest.TestCase): 2148 2149 def test_basic(self): 2150 with tarfile.open(tarname) as tar: 2151 self.assertFalse(tar.closed, "closed inside runtime context") 2152 self.assertTrue(tar.closed, "context manager failed") 2153 2154 def test_closed(self): 2155 # The __enter__() method is supposed to raise OSError 2156 # if the TarFile object is already closed. 2157 tar = tarfile.open(tarname) 2158 tar.close() 2159 with self.assertRaises(OSError): 2160 with tar: 2161 pass 2162 2163 def test_exception(self): 2164 # Test if the OSError exception is passed through properly. 2165 with self.assertRaises(Exception) as exc: 2166 with tarfile.open(tarname) as tar: 2167 raise OSError 2168 self.assertIsInstance(exc.exception, OSError, 2169 "wrong exception raised in context manager") 2170 self.assertTrue(tar.closed, "context manager failed") 2171 2172 def test_no_eof(self): 2173 # __exit__() must not write end-of-archive blocks if an 2174 # exception was raised. 2175 try: 2176 with tarfile.open(tmpname, "w") as tar: 2177 raise Exception 2178 except: 2179 pass 2180 self.assertEqual(os.path.getsize(tmpname), 0, 2181 "context manager wrote an end-of-archive block") 2182 self.assertTrue(tar.closed, "context manager failed") 2183 2184 def test_eof(self): 2185 # __exit__() must write end-of-archive blocks, i.e. call 2186 # TarFile.close() if there was no error. 2187 with tarfile.open(tmpname, "w"): 2188 pass 2189 self.assertNotEqual(os.path.getsize(tmpname), 0, 2190 "context manager wrote no end-of-archive block") 2191 2192 def test_fileobj(self): 2193 # Test that __exit__() did not close the external file 2194 # object. 2195 with open(tmpname, "wb") as fobj: 2196 try: 2197 with tarfile.open(fileobj=fobj, mode="w") as tar: 2198 raise Exception 2199 except: 2200 pass 2201 self.assertFalse(fobj.closed, "external file object was closed") 2202 self.assertTrue(tar.closed, "context manager failed") 2203 2204 2205@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2206class LinkEmulationTest(ReadTest, unittest.TestCase): 2207 2208 # Test for issue #8741 regression. On platforms that do not support 2209 # symbolic or hard links tarfile tries to extract these types of members 2210 # as the regular files they point to. 2211 def _test_link_extraction(self, name): 2212 self.tar.extract(name, TEMPDIR) 2213 with open(os.path.join(TEMPDIR, name), "rb") as f: 2214 data = f.read() 2215 self.assertEqual(md5sum(data), md5_regtype) 2216 2217 # See issues #1578269, #8879, and #17689 for some history on these skips 2218 @unittest.skipIf(hasattr(os.path, "islink"), 2219 "Skip emulation - has os.path.islink but not os.link") 2220 def test_hardlink_extraction1(self): 2221 self._test_link_extraction("ustar/lnktype") 2222 2223 @unittest.skipIf(hasattr(os.path, "islink"), 2224 "Skip emulation - has os.path.islink but not os.link") 2225 def test_hardlink_extraction2(self): 2226 self._test_link_extraction("./ustar/linktest2/lnktype") 2227 2228 @unittest.skipIf(hasattr(os, "symlink"), 2229 "Skip emulation if symlink exists") 2230 def test_symlink_extraction1(self): 2231 self._test_link_extraction("ustar/symtype") 2232 2233 @unittest.skipIf(hasattr(os, "symlink"), 2234 "Skip emulation if symlink exists") 2235 def test_symlink_extraction2(self): 2236 self._test_link_extraction("./ustar/linktest2/symtype") 2237 2238 2239class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2240 # Issue5068: The _BZ2Proxy.read() method loops forever 2241 # on an empty or partial bzipped file. 2242 2243 def _test_partial_input(self, mode): 2244 class MyBytesIO(io.BytesIO): 2245 hit_eof = False 2246 def read(self, n): 2247 if self.hit_eof: 2248 raise AssertionError("infinite loop detected in " 2249 "tarfile.open()") 2250 self.hit_eof = self.tell() == len(self.getvalue()) 2251 return super(MyBytesIO, self).read(n) 2252 def seek(self, *args): 2253 self.hit_eof = False 2254 return super(MyBytesIO, self).seek(*args) 2255 2256 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2257 for x in range(len(data) + 1): 2258 try: 2259 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2260 except tarfile.ReadError: 2261 pass # we have no interest in ReadErrors 2262 2263 def test_partial_input(self): 2264 self._test_partial_input("r") 2265 2266 def test_partial_input_bz2(self): 2267 self._test_partial_input("r:bz2") 2268 2269 2270def root_is_uid_gid_0(): 2271 try: 2272 import pwd, grp 2273 except ImportError: 2274 return False 2275 if pwd.getpwuid(0)[0] != 'root': 2276 return False 2277 if grp.getgrgid(0)[0] != 'root': 2278 return False 2279 return True 2280 2281 2282@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2283@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2284class NumericOwnerTest(unittest.TestCase): 2285 # mock the following: 2286 # os.chown: so we can test what's being called 2287 # os.chmod: so the modes are not actually changed. if they are, we can't 2288 # delete the files/directories 2289 # os.geteuid: so we can lie and say we're root (uid = 0) 2290 2291 @staticmethod 2292 def _make_test_archive(filename_1, dirname_1, filename_2): 2293 # the file contents to write 2294 fobj = io.BytesIO(b"content") 2295 2296 # create a tar file with a file, a directory, and a file within that 2297 # directory. Assign various .uid/.gid values to them 2298 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2299 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2300 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2301 ] 2302 with tarfile.open(tmpname, 'w') as tarfl: 2303 for name, uid, gid, typ, contents in items: 2304 t = tarfile.TarInfo(name) 2305 t.uid = uid 2306 t.gid = gid 2307 t.uname = 'root' 2308 t.gname = 'root' 2309 t.type = typ 2310 tarfl.addfile(t, contents) 2311 2312 # return the full pathname to the tar file 2313 return tmpname 2314 2315 @staticmethod 2316 @contextmanager 2317 def _setup_test(mock_geteuid): 2318 mock_geteuid.return_value = 0 # lie and say we're root 2319 fname = 'numeric-owner-testfile' 2320 dirname = 'dir' 2321 2322 # the names we want stored in the tarfile 2323 filename_1 = fname 2324 dirname_1 = dirname 2325 filename_2 = os.path.join(dirname, fname) 2326 2327 # create the tarfile with the contents we're after 2328 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2329 dirname_1, 2330 filename_2) 2331 2332 # open the tarfile for reading. yield it and the names of the items 2333 # we stored into the file 2334 with tarfile.open(tar_filename) as tarfl: 2335 yield tarfl, filename_1, dirname_1, filename_2 2336 2337 @unittest.mock.patch('os.chown') 2338 @unittest.mock.patch('os.chmod') 2339 @unittest.mock.patch('os.geteuid') 2340 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2341 mock_chown): 2342 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2343 filename_2): 2344 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2345 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2346 2347 # convert to filesystem paths 2348 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2349 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2350 2351 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2352 unittest.mock.call(f_filename_2, 88, 87), 2353 ], 2354 any_order=True) 2355 2356 @unittest.mock.patch('os.chown') 2357 @unittest.mock.patch('os.chmod') 2358 @unittest.mock.patch('os.geteuid') 2359 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2360 mock_chown): 2361 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2362 filename_2): 2363 tarfl.extractall(TEMPDIR, numeric_owner=True) 2364 2365 # convert to filesystem paths 2366 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2367 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2368 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2369 2370 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2371 unittest.mock.call(f_dirname_1, 77, 76), 2372 unittest.mock.call(f_filename_2, 88, 87), 2373 ], 2374 any_order=True) 2375 2376 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2377 # because the uname and gname in the test file are 'root', and extract() 2378 # will look them up using pwd and grp to find their uid and gid, which we 2379 # test here to be 0. 2380 @unittest.skipUnless(root_is_uid_gid_0(), 2381 'uid=0,gid=0 must be named "root"') 2382 @unittest.mock.patch('os.chown') 2383 @unittest.mock.patch('os.chmod') 2384 @unittest.mock.patch('os.geteuid') 2385 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2386 mock_chown): 2387 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2388 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2389 2390 # convert to filesystem paths 2391 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2392 2393 mock_chown.assert_called_with(f_filename_1, 0, 0) 2394 2395 @unittest.mock.patch('os.geteuid') 2396 def test_keyword_only(self, mock_geteuid): 2397 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2398 self.assertRaises(TypeError, 2399 tarfl.extract, filename_1, TEMPDIR, False, True) 2400 2401 2402def setUpModule(): 2403 support.unlink(TEMPDIR) 2404 os.makedirs(TEMPDIR) 2405 2406 global testtarnames 2407 testtarnames = [tarname] 2408 with open(tarname, "rb") as fobj: 2409 data = fobj.read() 2410 2411 # Create compressed tarfiles. 2412 for c in GzipTest, Bz2Test, LzmaTest: 2413 if c.open: 2414 support.unlink(c.tarname) 2415 testtarnames.append(c.tarname) 2416 with c.open(c.tarname, "wb") as tar: 2417 tar.write(data) 2418 2419def tearDownModule(): 2420 if os.path.exists(TEMPDIR): 2421 support.rmtree(TEMPDIR) 2422 2423if __name__ == "__main__": 2424 unittest.main() 2425