test_tarfile.py revision 0662f8a5eaa19f7eed6978eed1af32563359fb93
1import sys 2import os 3import shutil 4import tempfile 5 6import unittest 7import tarfile 8 9from test import test_support 10 11# Check for our compression modules. 12try: 13 import gzip 14 gzip.GzipFile 15except (ImportError, AttributeError): 16 gzip = None 17try: 18 import bz2 19except ImportError: 20 bz2 = None 21 22def path(path): 23 return test_support.findfile(path) 24 25testtar = path("testtar.tar") 26tempdir = os.path.join(tempfile.gettempdir(), "testtar" + os.extsep + "dir") 27tempname = test_support.TESTFN 28membercount = 10 29 30def tarname(comp=""): 31 if not comp: 32 return testtar 33 return os.path.join(tempdir, "%s%s%s" % (testtar, os.extsep, comp)) 34 35def dirname(): 36 if not os.path.exists(tempdir): 37 os.mkdir(tempdir) 38 return tempdir 39 40def tmpname(): 41 return tempname 42 43 44class BaseTest(unittest.TestCase): 45 comp = '' 46 mode = 'r' 47 sep = ':' 48 49 def setUp(self): 50 mode = self.mode + self.sep + self.comp 51 self.tar = tarfile.open(tarname(self.comp), mode) 52 53 def tearDown(self): 54 self.tar.close() 55 56class ReadTest(BaseTest): 57 58 def test(self): 59 """Test member extraction. 60 """ 61 members = 0 62 for tarinfo in self.tar: 63 members += 1 64 if not tarinfo.isreg(): 65 continue 66 f = self.tar.extractfile(tarinfo) 67 self.assert_(len(f.read()) == tarinfo.size, 68 "size read does not match expected size") 69 f.close() 70 71 self.assert_(members == membercount, 72 "could not find all members") 73 74 def test_sparse(self): 75 """Test sparse member extraction. 76 """ 77 if self.sep != "|": 78 f1 = self.tar.extractfile("S-SPARSE") 79 f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS") 80 self.assert_(f1.read() == f2.read(), 81 "_FileObject failed on sparse file member") 82 83 def test_readlines(self): 84 """Test readlines() method of _FileObject. 85 """ 86 if self.sep != "|": 87 filename = "0-REGTYPE-TEXT" 88 self.tar.extract(filename, dirname()) 89 lines1 = file(os.path.join(dirname(), filename), "rU").readlines() 90 lines2 = self.tar.extractfile(filename).readlines() 91 self.assert_(lines1 == lines2, 92 "_FileObject.readline() does not work correctly") 93 94 def test_seek(self): 95 """Test seek() method of _FileObject, incl. random reading. 96 """ 97 if self.sep != "|": 98 filename = "0-REGTYPE" 99 self.tar.extract(filename, dirname()) 100 data = file(os.path.join(dirname(), filename), "rb").read() 101 102 tarinfo = self.tar.getmember(filename) 103 fobj = self.tar.extractfile(tarinfo) 104 105 text = fobj.read() 106 fobj.seek(0) 107 self.assert_(0 == fobj.tell(), 108 "seek() to file's start failed") 109 fobj.seek(2048, 0) 110 self.assert_(2048 == fobj.tell(), 111 "seek() to absolute position failed") 112 fobj.seek(-1024, 1) 113 self.assert_(1024 == fobj.tell(), 114 "seek() to negative relative position failed") 115 fobj.seek(1024, 1) 116 self.assert_(2048 == fobj.tell(), 117 "seek() to positive relative position failed") 118 s = fobj.read(10) 119 self.assert_(s == data[2048:2058], 120 "read() after seek failed") 121 fobj.seek(0, 2) 122 self.assert_(tarinfo.size == fobj.tell(), 123 "seek() to file's end failed") 124 self.assert_(fobj.read() == "", 125 "read() at file's end did not return empty string") 126 fobj.seek(-tarinfo.size, 2) 127 self.assert_(0 == fobj.tell(), 128 "relative seek() to file's start failed") 129 fobj.seek(512) 130 s1 = fobj.readlines() 131 fobj.seek(512) 132 s2 = fobj.readlines() 133 self.assert_(s1 == s2, 134 "readlines() after seek failed") 135 fobj.close() 136 137class ReadStreamTest(ReadTest): 138 sep = "|" 139 140 def test(self): 141 """Test member extraction, and for StreamError when 142 seeking backwards. 143 """ 144 ReadTest.test(self) 145 tarinfo = self.tar.getmembers()[0] 146 f = self.tar.extractfile(tarinfo) 147 self.assertRaises(tarfile.StreamError, f.read) 148 149 def test_stream(self): 150 """Compare the normal tar and the stream tar. 151 """ 152 stream = self.tar 153 tar = tarfile.open(tarname(), 'r') 154 155 while 1: 156 t1 = tar.next() 157 t2 = stream.next() 158 if t1 is None: 159 break 160 self.assert_(t2 is not None, "stream.next() failed.") 161 162 if t2.islnk() or t2.issym(): 163 self.assertRaises(tarfile.StreamError, stream.extractfile, t2) 164 continue 165 v1 = tar.extractfile(t1) 166 v2 = stream.extractfile(t2) 167 if v1 is None: 168 continue 169 self.assert_(v2 is not None, "stream.extractfile() failed") 170 self.assert_(v1.read() == v2.read(), "stream extraction failed") 171 172 stream.close() 173 174class WriteTest(BaseTest): 175 mode = 'w' 176 177 def setUp(self): 178 mode = self.mode + self.sep + self.comp 179 self.src = tarfile.open(tarname(self.comp), 'r') 180 self.dst = tarfile.open(tmpname(), mode) 181 182 def tearDown(self): 183 self.src.close() 184 self.dst.close() 185 186 def test_posix(self): 187 self.dst.posix = 1 188 self._test() 189 190 def test_nonposix(self): 191 self.dst.posix = 0 192 self._test() 193 194 def _test(self): 195 for tarinfo in self.src: 196 if not tarinfo.isreg(): 197 continue 198 f = self.src.extractfile(tarinfo) 199 if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME: 200 self.assertRaises(ValueError, self.dst.addfile, 201 tarinfo, f) 202 else: 203 self.dst.addfile(tarinfo, f) 204 205class WriteStreamTest(WriteTest): 206 sep = '|' 207 208class WriteGNULongTest(unittest.TestCase): 209 """This testcase checks for correct creation of GNU Longname 210 and Longlink extensions. 211 212 It creates a tarfile and adds empty members with either 213 long names, long linknames or both and compares the size 214 of the tarfile with the expected size. 215 216 It checks for SF bug #812325 in TarFile._create_gnulong(). 217 218 While I was writing this testcase, I noticed a second bug 219 in the same method: 220 Long{names,links} weren't null-terminated which lead to 221 bad tarfiles when their length was a multiple of 512. This 222 is tested as well. 223 """ 224 225 def setUp(self): 226 self.tar = tarfile.open(tmpname(), "w") 227 self.tar.posix = False 228 229 def tearDown(self): 230 self.tar.close() 231 232 def _length(self, s): 233 blocks, remainder = divmod(len(s) + 1, 512) 234 if remainder: 235 blocks += 1 236 return blocks * 512 237 238 def _calc_size(self, name, link=None): 239 # initial tar header 240 count = 512 241 242 if len(name) > tarfile.LENGTH_NAME: 243 # gnu longname extended header + longname 244 count += 512 245 count += self._length(name) 246 247 if link is not None and len(link) > tarfile.LENGTH_LINK: 248 # gnu longlink extended header + longlink 249 count += 512 250 count += self._length(link) 251 252 return count 253 254 def _test(self, name, link=None): 255 tarinfo = tarfile.TarInfo(name) 256 if link: 257 tarinfo.linkname = link 258 tarinfo.type = tarfile.LNKTYPE 259 260 self.tar.addfile(tarinfo) 261 262 v1 = self._calc_size(name, link) 263 v2 = self.tar.offset 264 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 265 266 def test_longname_1023(self): 267 self._test(("longnam/" * 127) + "longnam") 268 269 def test_longname_1024(self): 270 self._test(("longnam/" * 127) + "longname") 271 272 def test_longname_1025(self): 273 self._test(("longnam/" * 127) + "longname_") 274 275 def test_longlink_1023(self): 276 self._test("name", ("longlnk/" * 127) + "longlnk") 277 278 def test_longlink_1024(self): 279 self._test("name", ("longlnk/" * 127) + "longlink") 280 281 def test_longlink_1025(self): 282 self._test("name", ("longlnk/" * 127) + "longlink_") 283 284 def test_longnamelink_1023(self): 285 self._test(("longnam/" * 127) + "longnam", 286 ("longlnk/" * 127) + "longlnk") 287 288 def test_longnamelink_1024(self): 289 self._test(("longnam/" * 127) + "longname", 290 ("longlnk/" * 127) + "longlink") 291 292 def test_longnamelink_1025(self): 293 self._test(("longnam/" * 127) + "longname_", 294 ("longlnk/" * 127) + "longlink_") 295 296# Gzip TestCases 297class ReadTestGzip(ReadTest): 298 comp = "gz" 299class ReadStreamTestGzip(ReadStreamTest): 300 comp = "gz" 301class WriteTestGzip(WriteTest): 302 comp = "gz" 303class WriteStreamTestGzip(WriteStreamTest): 304 comp = "gz" 305 306if bz2: 307 # Bzip2 TestCases 308 class ReadTestBzip2(ReadTestGzip): 309 comp = "bz2" 310 class ReadStreamTestBzip2(ReadStreamTestGzip): 311 comp = "bz2" 312 class WriteTestBzip2(WriteTest): 313 comp = "bz2" 314 class WriteStreamTestBzip2(WriteStreamTestGzip): 315 comp = "bz2" 316 317# If importing gzip failed, discard the Gzip TestCases. 318if not gzip: 319 del ReadTestGzip 320 del ReadStreamTestGzip 321 del WriteTestGzip 322 del WriteStreamTestGzip 323 324def test_main(): 325 if gzip: 326 # create testtar.tar.gz 327 gzip.open(tarname("gz"), "wb").write(file(tarname(), "rb").read()) 328 if bz2: 329 # create testtar.tar.bz2 330 bz2.BZ2File(tarname("bz2"), "wb").write(file(tarname(), "rb").read()) 331 332 tests = [ 333 ReadTest, 334 ReadStreamTest, 335 WriteTest, 336 WriteStreamTest, 337 WriteGNULongTest, 338 ] 339 340 if gzip: 341 tests.extend([ 342 ReadTestGzip, ReadStreamTestGzip, 343 WriteTestGzip, WriteStreamTestGzip 344 ]) 345 346 if bz2: 347 tests.extend([ 348 ReadTestBzip2, ReadStreamTestBzip2, 349 WriteTestBzip2, WriteStreamTestBzip2 350 ]) 351 try: 352 test_support.run_unittest(*tests) 353 finally: 354 if gzip: 355 os.remove(tarname("gz")) 356 if bz2: 357 os.remove(tarname("bz2")) 358 if os.path.exists(dirname()): 359 shutil.rmtree(dirname()) 360 if os.path.exists(tmpname()): 361 os.remove(tmpname()) 362 363if __name__ == "__main__": 364 test_main() 365