test_tarfile.py revision 477c8d5e70240744d24631b18341ad892c8a8e1c
1import sys 2import os 3import shutil 4import tempfile 5import StringIO 6 7import unittest 8import tarfile 9 10from test import test_support 11 12# Check for our compression modules. 13try: 14 import gzip 15 gzip.GzipFile 16except (ImportError, AttributeError): 17 gzip = None 18try: 19 import bz2 20except ImportError: 21 bz2 = None 22 23def path(path): 24 return test_support.findfile(path) 25 26testtar = path("testtar.tar") 27tempdir = os.path.join(tempfile.gettempdir(), "testtar" + os.extsep + "dir") 28tempname = test_support.TESTFN 29membercount = 12 30 31def tarname(comp=""): 32 if not comp: 33 return testtar 34 return os.path.join(tempdir, "%s%s%s" % (testtar, os.extsep, comp)) 35 36def dirname(): 37 if not os.path.exists(tempdir): 38 os.mkdir(tempdir) 39 return tempdir 40 41def tmpname(): 42 return tempname 43 44 45class BaseTest(unittest.TestCase): 46 comp = '' 47 mode = 'r' 48 sep = ':' 49 50 def setUp(self): 51 mode = self.mode + self.sep + self.comp 52 self.tar = tarfile.open(tarname(self.comp), mode) 53 54 def tearDown(self): 55 self.tar.close() 56 57class ReadTest(BaseTest): 58 59 def test(self): 60 """Test member extraction. 61 """ 62 members = 0 63 for tarinfo in self.tar: 64 members += 1 65 if not tarinfo.isreg(): 66 continue 67 f = self.tar.extractfile(tarinfo) 68 self.assert_(len(f.read()) == tarinfo.size, 69 "size read does not match expected size") 70 f.close() 71 72 self.assert_(members == membercount, 73 "could not find all members") 74 75 def test_sparse(self): 76 """Test sparse member extraction. 77 """ 78 if self.sep != "|": 79 f1 = self.tar.extractfile("S-SPARSE") 80 f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS") 81 self.assert_(f1.read() == f2.read(), 82 "_FileObject failed on sparse file member") 83 84 def test_readlines(self): 85 """Test readlines() method of _FileObject. 86 """ 87 if self.sep != "|": 88 filename = "0-REGTYPE-TEXT" 89 self.tar.extract(filename, dirname()) 90 f = open(os.path.join(dirname(), filename), "rU") 91 lines1 = f.readlines() 92 f.close() 93 lines2 = self.tar.extractfile(filename).readlines() 94 self.assert_(lines1 == lines2, 95 "_FileObject.readline() does not work correctly") 96 97 def test_iter(self): 98 # Test iteration over ExFileObject. 99 if self.sep != "|": 100 filename = "0-REGTYPE-TEXT" 101 self.tar.extract(filename, dirname()) 102 f = open(os.path.join(dirname(), filename), "rU") 103 lines1 = f.readlines() 104 f.close() 105 lines2 = [line for line in self.tar.extractfile(filename)] 106 self.assert_(lines1 == lines2, 107 "ExFileObject iteration does not work correctly") 108 109 def test_seek(self): 110 """Test seek() method of _FileObject, incl. random reading. 111 """ 112 if self.sep != "|": 113 filename = "0-REGTYPE" 114 self.tar.extract(filename, dirname()) 115 f = open(os.path.join(dirname(), filename), "rb") 116 data = f.read() 117 f.close() 118 119 tarinfo = self.tar.getmember(filename) 120 fobj = self.tar.extractfile(tarinfo) 121 122 text = fobj.read() 123 fobj.seek(0) 124 self.assert_(0 == fobj.tell(), 125 "seek() to file's start failed") 126 fobj.seek(2048, 0) 127 self.assert_(2048 == fobj.tell(), 128 "seek() to absolute position failed") 129 fobj.seek(-1024, 1) 130 self.assert_(1024 == fobj.tell(), 131 "seek() to negative relative position failed") 132 fobj.seek(1024, 1) 133 self.assert_(2048 == fobj.tell(), 134 "seek() to positive relative position failed") 135 s = fobj.read(10) 136 self.assert_(s == data[2048:2058], 137 "read() after seek failed") 138 fobj.seek(0, 2) 139 self.assert_(tarinfo.size == fobj.tell(), 140 "seek() to file's end failed") 141 self.assert_(fobj.read() == "", 142 "read() at file's end did not return empty string") 143 fobj.seek(-tarinfo.size, 2) 144 self.assert_(0 == fobj.tell(), 145 "relative seek() to file's start failed") 146 fobj.seek(512) 147 s1 = fobj.readlines() 148 fobj.seek(512) 149 s2 = fobj.readlines() 150 self.assert_(s1 == s2, 151 "readlines() after seek failed") 152 fobj.close() 153 154 def test_old_dirtype(self): 155 """Test old style dirtype member (bug #1336623). 156 """ 157 # Old tars create directory members using a REGTYPE 158 # header with a "/" appended to the filename field. 159 160 # Create an old tar style directory entry. 161 filename = tmpname() 162 tarinfo = tarfile.TarInfo("directory/") 163 tarinfo.type = tarfile.REGTYPE 164 165 fobj = open(filename, "w") 166 fobj.write(tarinfo.tobuf()) 167 fobj.close() 168 169 try: 170 # Test if it is still a directory entry when 171 # read back. 172 tar = tarfile.open(filename) 173 tarinfo = tar.getmembers()[0] 174 tar.close() 175 176 self.assert_(tarinfo.type == tarfile.DIRTYPE) 177 self.assert_(tarinfo.name.endswith("/")) 178 finally: 179 try: 180 os.unlink(filename) 181 except: 182 pass 183 184class ReadStreamTest(ReadTest): 185 sep = "|" 186 187 def test(self): 188 """Test member extraction, and for StreamError when 189 seeking backwards. 190 """ 191 ReadTest.test(self) 192 tarinfo = self.tar.getmembers()[0] 193 f = self.tar.extractfile(tarinfo) 194 self.assertRaises(tarfile.StreamError, f.read) 195 196 def test_stream(self): 197 """Compare the normal tar and the stream tar. 198 """ 199 stream = self.tar 200 tar = tarfile.open(tarname(), 'r') 201 202 while 1: 203 t1 = tar.next() 204 t2 = stream.next() 205 if t1 is None: 206 break 207 self.assert_(t2 is not None, "stream.next() failed.") 208 209 if t2.islnk() or t2.issym(): 210 self.assertRaises(tarfile.StreamError, stream.extractfile, t2) 211 continue 212 v1 = tar.extractfile(t1) 213 v2 = stream.extractfile(t2) 214 if v1 is None: 215 continue 216 self.assert_(v2 is not None, "stream.extractfile() failed") 217 self.assert_(v1.read() == v2.read(), "stream extraction failed") 218 219 tar.close() 220 stream.close() 221 222class ReadDetectTest(ReadTest): 223 224 def setUp(self): 225 self.tar = tarfile.open(tarname(self.comp), self.mode) 226 227class ReadDetectFileobjTest(ReadTest): 228 229 def setUp(self): 230 name = tarname(self.comp) 231 self.tar = tarfile.open(name, mode=self.mode, 232 fileobj=open(name, "rb")) 233 234class ReadAsteriskTest(ReadTest): 235 236 def setUp(self): 237 mode = self.mode + self.sep + "*" 238 self.tar = tarfile.open(tarname(self.comp), mode) 239 240class ReadStreamAsteriskTest(ReadStreamTest): 241 242 def setUp(self): 243 mode = self.mode + self.sep + "*" 244 self.tar = tarfile.open(tarname(self.comp), mode) 245 246class WriteTest(BaseTest): 247 mode = 'w' 248 249 def setUp(self): 250 mode = self.mode + self.sep + self.comp 251 self.src = tarfile.open(tarname(self.comp), 'r') 252 self.dstname = tmpname() 253 self.dst = tarfile.open(self.dstname, mode) 254 255 def tearDown(self): 256 self.src.close() 257 self.dst.close() 258 259 def test_posix(self): 260 self.dst.posix = 1 261 self._test() 262 263 def test_nonposix(self): 264 self.dst.posix = 0 265 self._test() 266 267 def test_small(self): 268 self.dst.add(os.path.join(os.path.dirname(__file__),"cfgparser.1")) 269 self.dst.close() 270 self.assertNotEqual(os.stat(self.dstname).st_size, 0) 271 272 def _test(self): 273 for tarinfo in self.src: 274 if not tarinfo.isreg(): 275 continue 276 f = self.src.extractfile(tarinfo) 277 if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME and "/" not in tarinfo.name: 278 self.assertRaises(ValueError, self.dst.addfile, 279 tarinfo, f) 280 else: 281 self.dst.addfile(tarinfo, f) 282 283class WriteSize0Test(BaseTest): 284 mode = 'w' 285 286 def setUp(self): 287 self.tmpdir = dirname() 288 self.dstname = tmpname() 289 self.dst = tarfile.open(self.dstname, "w") 290 291 def tearDown(self): 292 self.dst.close() 293 294 def test_file(self): 295 path = os.path.join(self.tmpdir, "file") 296 f = open(path, "w") 297 f.close() 298 tarinfo = self.dst.gettarinfo(path) 299 self.assertEqual(tarinfo.size, 0) 300 f = open(path, "w") 301 f.write("aaa") 302 f.close() 303 tarinfo = self.dst.gettarinfo(path) 304 self.assertEqual(tarinfo.size, 3) 305 306 def test_directory(self): 307 path = os.path.join(self.tmpdir, "directory") 308 if os.path.exists(path): 309 # This shouldn't be necessary, but is <wink> if a previous 310 # run was killed in mid-stream. 311 shutil.rmtree(path) 312 os.mkdir(path) 313 tarinfo = self.dst.gettarinfo(path) 314 self.assertEqual(tarinfo.size, 0) 315 316 def test_symlink(self): 317 if hasattr(os, "symlink"): 318 path = os.path.join(self.tmpdir, "symlink") 319 os.symlink("link_target", path) 320 tarinfo = self.dst.gettarinfo(path) 321 self.assertEqual(tarinfo.size, 0) 322 323 324class WriteStreamTest(WriteTest): 325 sep = '|' 326 327class WriteGNULongTest(unittest.TestCase): 328 """This testcase checks for correct creation of GNU Longname 329 and Longlink extensions. 330 331 It creates a tarfile and adds empty members with either 332 long names, long linknames or both and compares the size 333 of the tarfile with the expected size. 334 335 It checks for SF bug #812325 in TarFile._create_gnulong(). 336 337 While I was writing this testcase, I noticed a second bug 338 in the same method: 339 Long{names,links} weren't null-terminated which lead to 340 bad tarfiles when their length was a multiple of 512. This 341 is tested as well. 342 """ 343 344 def setUp(self): 345 self.tar = tarfile.open(tmpname(), "w") 346 self.tar.posix = False 347 348 def tearDown(self): 349 self.tar.close() 350 351 def _length(self, s): 352 blocks, remainder = divmod(len(s) + 1, 512) 353 if remainder: 354 blocks += 1 355 return blocks * 512 356 357 def _calc_size(self, name, link=None): 358 # initial tar header 359 count = 512 360 361 if len(name) > tarfile.LENGTH_NAME: 362 # gnu longname extended header + longname 363 count += 512 364 count += self._length(name) 365 366 if link is not None and len(link) > tarfile.LENGTH_LINK: 367 # gnu longlink extended header + longlink 368 count += 512 369 count += self._length(link) 370 371 return count 372 373 def _test(self, name, link=None): 374 tarinfo = tarfile.TarInfo(name) 375 if link: 376 tarinfo.linkname = link 377 tarinfo.type = tarfile.LNKTYPE 378 379 self.tar.addfile(tarinfo) 380 381 v1 = self._calc_size(name, link) 382 v2 = self.tar.offset 383 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 384 385 def test_longname_1023(self): 386 self._test(("longnam/" * 127) + "longnam") 387 388 def test_longname_1024(self): 389 self._test(("longnam/" * 127) + "longname") 390 391 def test_longname_1025(self): 392 self._test(("longnam/" * 127) + "longname_") 393 394 def test_longlink_1023(self): 395 self._test("name", ("longlnk/" * 127) + "longlnk") 396 397 def test_longlink_1024(self): 398 self._test("name", ("longlnk/" * 127) + "longlink") 399 400 def test_longlink_1025(self): 401 self._test("name", ("longlnk/" * 127) + "longlink_") 402 403 def test_longnamelink_1023(self): 404 self._test(("longnam/" * 127) + "longnam", 405 ("longlnk/" * 127) + "longlnk") 406 407 def test_longnamelink_1024(self): 408 self._test(("longnam/" * 127) + "longname", 409 ("longlnk/" * 127) + "longlink") 410 411 def test_longnamelink_1025(self): 412 self._test(("longnam/" * 127) + "longname_", 413 ("longlnk/" * 127) + "longlink_") 414 415class ReadGNULongTest(unittest.TestCase): 416 417 def setUp(self): 418 self.tar = tarfile.open(tarname()) 419 420 def tearDown(self): 421 self.tar.close() 422 423 def test_1471427(self): 424 """Test reading of longname (bug #1471427). 425 """ 426 name = "test/" * 20 + "0-REGTYPE" 427 try: 428 tarinfo = self.tar.getmember(name) 429 except KeyError: 430 tarinfo = None 431 self.assert_(tarinfo is not None, "longname not found") 432 self.assert_(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") 433 434 def test_read_name(self): 435 name = ("0-LONGNAME-" * 10)[:101] 436 try: 437 tarinfo = self.tar.getmember(name) 438 except KeyError: 439 tarinfo = None 440 self.assert_(tarinfo is not None, "longname not found") 441 442 def test_read_link(self): 443 link = ("1-LONGLINK-" * 10)[:101] 444 name = ("0-LONGNAME-" * 10)[:101] 445 try: 446 tarinfo = self.tar.getmember(link) 447 except KeyError: 448 tarinfo = None 449 self.assert_(tarinfo is not None, "longlink not found") 450 self.assert_(tarinfo.linkname == name, "linkname wrong") 451 452 def test_truncated_longname(self): 453 f = open(tarname()) 454 fobj = StringIO.StringIO(f.read(1024)) 455 f.close() 456 tar = tarfile.open(name="foo.tar", fileobj=fobj) 457 self.assert_(len(tar.getmembers()) == 0, "") 458 tar.close() 459 460 461class ExtractHardlinkTest(BaseTest): 462 463 def test_hardlink(self): 464 """Test hardlink extraction (bug #857297) 465 """ 466 # Prevent errors from being caught 467 self.tar.errorlevel = 1 468 469 self.tar.extract("0-REGTYPE", dirname()) 470 try: 471 # Extract 1-LNKTYPE which is a hardlink to 0-REGTYPE 472 self.tar.extract("1-LNKTYPE", dirname()) 473 except EnvironmentError, e: 474 import errno 475 if e.errno == errno.ENOENT: 476 self.fail("hardlink not extracted properly") 477 478class CreateHardlinkTest(BaseTest): 479 """Test the creation of LNKTYPE (hardlink) members in an archive. 480 In this respect tarfile.py mimics the behaviour of GNU tar: If 481 a file has a st_nlink > 1, it will be added a REGTYPE member 482 only the first time. 483 """ 484 485 def setUp(self): 486 self.tar = tarfile.open(tmpname(), "w") 487 488 self.foo = os.path.join(dirname(), "foo") 489 self.bar = os.path.join(dirname(), "bar") 490 491 if os.path.exists(self.foo): 492 os.remove(self.foo) 493 if os.path.exists(self.bar): 494 os.remove(self.bar) 495 496 f = open(self.foo, "w") 497 f.write("foo") 498 f.close() 499 self.tar.add(self.foo) 500 501 def test_add_twice(self): 502 # If st_nlink == 1 then the same file will be added as 503 # REGTYPE every time. 504 tarinfo = self.tar.gettarinfo(self.foo) 505 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 506 "add file as regular failed") 507 508 def test_add_hardlink(self): 509 # If st_nlink > 1 then the same file will be added as 510 # LNKTYPE. 511 os.link(self.foo, self.bar) 512 tarinfo = self.tar.gettarinfo(self.foo) 513 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 514 "add file as hardlink failed") 515 516 tarinfo = self.tar.gettarinfo(self.bar) 517 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 518 "add file as hardlink failed") 519 520 def test_dereference_hardlink(self): 521 self.tar.dereference = True 522 os.link(self.foo, self.bar) 523 tarinfo = self.tar.gettarinfo(self.bar) 524 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 525 "dereferencing hardlink failed") 526 527 528# Gzip TestCases 529class ReadTestGzip(ReadTest): 530 comp = "gz" 531class ReadStreamTestGzip(ReadStreamTest): 532 comp = "gz" 533class WriteTestGzip(WriteTest): 534 comp = "gz" 535class WriteStreamTestGzip(WriteStreamTest): 536 comp = "gz" 537class ReadDetectTestGzip(ReadDetectTest): 538 comp = "gz" 539class ReadDetectFileobjTestGzip(ReadDetectFileobjTest): 540 comp = "gz" 541class ReadAsteriskTestGzip(ReadAsteriskTest): 542 comp = "gz" 543class ReadStreamAsteriskTestGzip(ReadStreamAsteriskTest): 544 comp = "gz" 545 546# Filemode test cases 547 548class FileModeTest(unittest.TestCase): 549 def test_modes(self): 550 self.assertEqual(tarfile.filemode(0755), '-rwxr-xr-x') 551 self.assertEqual(tarfile.filemode(07111), '---s--s--t') 552 553 554if bz2: 555 # Bzip2 TestCases 556 class ReadTestBzip2(ReadTestGzip): 557 comp = "bz2" 558 class ReadStreamTestBzip2(ReadStreamTestGzip): 559 comp = "bz2" 560 class WriteTestBzip2(WriteTest): 561 comp = "bz2" 562 class WriteStreamTestBzip2(WriteStreamTestGzip): 563 comp = "bz2" 564 class ReadDetectTestBzip2(ReadDetectTest): 565 comp = "bz2" 566 class ReadDetectFileobjTestBzip2(ReadDetectFileobjTest): 567 comp = "bz2" 568 class ReadAsteriskTestBzip2(ReadAsteriskTest): 569 comp = "bz2" 570 class ReadStreamAsteriskTestBzip2(ReadStreamAsteriskTest): 571 comp = "bz2" 572 573# If importing gzip failed, discard the Gzip TestCases. 574if not gzip: 575 del ReadTestGzip 576 del ReadStreamTestGzip 577 del WriteTestGzip 578 del WriteStreamTestGzip 579 580def test_main(): 581 # Create archive. 582 f = open(tarname(), "rb") 583 fguts = f.read() 584 f.close() 585 if gzip: 586 # create testtar.tar.gz 587 tar = gzip.open(tarname("gz"), "wb") 588 tar.write(fguts) 589 tar.close() 590 if bz2: 591 # create testtar.tar.bz2 592 tar = bz2.BZ2File(tarname("bz2"), "wb") 593 tar.write(fguts) 594 tar.close() 595 596 tests = [ 597 FileModeTest, 598 ReadTest, 599 ReadStreamTest, 600 ReadDetectTest, 601 ReadDetectFileobjTest, 602 ReadAsteriskTest, 603 ReadStreamAsteriskTest, 604 WriteTest, 605 WriteSize0Test, 606 WriteStreamTest, 607 WriteGNULongTest, 608 ReadGNULongTest, 609 ] 610 611 if hasattr(os, "link"): 612 tests.append(ExtractHardlinkTest) 613 tests.append(CreateHardlinkTest) 614 615 if gzip: 616 tests.extend([ 617 ReadTestGzip, ReadStreamTestGzip, 618 WriteTestGzip, WriteStreamTestGzip, 619 ReadDetectTestGzip, ReadDetectFileobjTestGzip, 620 ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip 621 ]) 622 623 if bz2: 624 tests.extend([ 625 ReadTestBzip2, ReadStreamTestBzip2, 626 WriteTestBzip2, WriteStreamTestBzip2, 627 ReadDetectTestBzip2, ReadDetectFileobjTestBzip2, 628 ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2 629 ]) 630 try: 631 test_support.run_unittest(*tests) 632 finally: 633 if gzip: 634 os.remove(tarname("gz")) 635 if bz2: 636 os.remove(tarname("bz2")) 637 if os.path.exists(dirname()): 638 shutil.rmtree(dirname()) 639 if os.path.exists(tmpname()): 640 os.remove(tmpname()) 641 642if __name__ == "__main__": 643 test_main() 644