# test_tarfile.py revision f2715e076435b74638acb81512c2ee014f75aea2
"""Regression tests for the tarfile module.

The tests read the reference archive ``testtar.tar`` (looked up with
``test_support.findfile``).  When the gzip and/or bz2 modules can be
imported, test_main() additionally creates compressed copies of that
archive, runs the same tests against them and removes them afterwards.
"""

import sys
import os
import shutil

import unittest
import tarfile

from test import test_support

# Check for our compression modules.
try:
    import gzip
except ImportError:
    gzip = None
try:
    import bz2
except ImportError:
    bz2 = None


def path(path):
    """Return what test_support.findfile() reports for *path*."""
    return test_support.findfile(path)

testtar = path("testtar.tar")
tempdir = path("testtar.dir")
tempname = path("testtar.tmp")
# Number of members ReadTest.test() expects to find in the archive.
membercount = 10


def tarname(comp=""):
    """Return the name of the test archive.

    With an empty *comp* this is the plain archive; otherwise *comp* is
    appended as an extension (e.g. "gz" -> "<testtar>.gz").
    """
    if not comp:
        return testtar
    return "%s.%s" % (testtar, comp)


def dirname():
    """Return the scratch directory, creating it if it does not exist."""
    if not os.path.exists(tempdir):
        os.mkdir(tempdir)
    return tempdir


def tmpname():
    """Return the name of the scratch file used as a write target."""
    return tempname


def _compress_testtar(fobj):
    """Write the bytes of the plain test archive to *fobj* and close it.

    *fobj* is an already opened, writable file object (a gzip or bz2
    file).  It is closed explicitly so that the compressed data is
    completely on disk before any test opens it, instead of relying on
    the object being finalised in time.
    """
    try:
        src = open(tarname(), "rb")
        try:
            fobj.write(src.read())
        finally:
            src.close()
    finally:
        fobj.close()


class BaseTest(unittest.TestCase):
    """Common fixture: opens the test archive as self.tar.

    The open mode is built from the class attributes as
    mode + sep + comp; subclasses override these attributes.
    """
    comp = ''
    mode = 'r'
    sep = ':'

    def setUp(self):
        mode = self.mode + self.sep + self.comp
        self.tar = tarfile.open(tarname(self.comp), mode)

    def tearDown(self):
        self.tar.close()


class ReadTest(BaseTest):

    def test(self):
        """Test member extraction.
        """
        members = 0
        for tarinfo in self.tar:
            members += 1
            if not tarinfo.isreg():
                continue
            f = self.tar.extractfile(tarinfo)
            self.assert_(len(f.read()) == tarinfo.size,
                         "size read does not match expected size")
            f.close()

        self.assert_(members == membercount,
                     "could not find all members")

    def test_sparse(self):
        """Test sparse member extraction.
        """
        if self.sep == "|":
            # Not run by the stream test classes (sep == "|").
            return
        f1 = self.tar.extractfile("S-SPARSE")
        f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
        try:
            self.assert_(f1.read() == f2.read(),
                         "_FileObject failed on sparse file member")
        finally:
            f1.close()
            f2.close()

    def test_readlines(self):
        """Test readlines() method of _FileObject.
        """
        if self.sep == "|":
            # Not run by the stream test classes (sep == "|").
            return
        filename = "0-REGTYPE-TEXT"
        self.tar.extract(filename, dirname())

        extracted = open(os.path.join(dirname(), filename), "r")
        try:
            lines1 = extracted.readlines()
        finally:
            extracted.close()

        member = self.tar.extractfile(filename)
        try:
            lines2 = member.readlines()
        finally:
            member.close()

        self.assert_(lines1 == lines2,
                     "_FileObject.readline() does not work correctly")

    def test_seek(self):
        """Test seek() method of _FileObject, incl. random reading.
        """
        if self.sep == "|":
            # Not run by the stream test classes (sep == "|").
            return
        filename = "0-REGTYPE"
        self.tar.extract(filename, dirname())

        extracted = open(os.path.join(dirname(), filename), "rb")
        try:
            data = extracted.read()
        finally:
            extracted.close()

        tarinfo = self.tar.getmember(filename)
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Read through the member first so that the seek(0) below
            # has to move the position back.
            fobj.read()
            fobj.seek(0)
            self.assert_(0 == fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assert_(2048 == fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assert_(1024 == fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assert_(2048 == fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assert_(s == data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assert_(tarinfo.size == fobj.tell(),
                         "seek() to file's end failed")
            self.assert_(fobj.read() == "",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assert_(0 == fobj.tell(),
                         "relative seek() to file's start failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assert_(s1 == s2,
                         "readlines() after seek failed")
        finally:
            fobj.close()


class ReadStreamTest(ReadTest):
    sep = "|"

    def test(self):
        """Test member extraction, and for StreamError when
           seeking backwards.
        """
        ReadTest.test(self)
        tarinfo = self.tar.getmembers()[0]
        f = self.tar.extractfile(tarinfo)
        self.assertRaises(tarfile.StreamError, f.read)

    def test_stream(self):
        """Compare the normal tar and the stream tar.
        """
        stream = self.tar
        tar = tarfile.open(tarname(), 'r')
        try:
            while True:
                t1 = tar.next()
                t2 = stream.next()
                if t1 is None:
                    break
                self.assert_(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError,
                                      stream.extractfile, t2)
                    continue
                v1 = tar.extractfile(t1)
                v2 = stream.extractfile(t2)
                if v1 is None:
                    continue
                self.assert_(v2 is not None, "stream.extractfile() failed")
                self.assert_(v1.read() == v2.read(),
                             "stream extraction failed")
        finally:
            # The comparison archive is opened here, so it is closed
            # here as well; self.tar is also closed by tearDown().
            tar.close()

        stream.close()


class WriteTest(BaseTest):
    """Copy the regular members of the test archive into a new archive.

    self.src is the test archive opened with mode 'r'; self.dst is the
    scratch file opened with the mode built from mode + sep + comp.
    """
    mode = 'w'

    def setUp(self):
        mode = self.mode + self.sep + self.comp
        self.src = tarfile.open(tarname(self.comp), 'r')
        self.dst = tarfile.open(tmpname(), mode)

    def tearDown(self):
        self.src.close()
        self.dst.close()

    def test_posix(self):
        self.dst.posix = 1
        self._test()

    def test_nonposix(self):
        self.dst.posix = 0
        self._test()

    def _test(self):
        for tarinfo in self.src:
            if not tarinfo.isreg():
                continue
            f = self.src.extractfile(tarinfo)
            if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME:
                # With dst.posix set, a name longer than LENGTH_NAME is
                # expected to be rejected by addfile().
                self.assertRaises(ValueError, self.dst.addfile,
                                  tarinfo, f)
            else:
                self.dst.addfile(tarinfo, f)


class WriteStreamTest(WriteTest):
    sep = '|'


# Gzip TestCases
class ReadTestGzip(ReadTest):
    comp = "gz"
class ReadStreamTestGzip(ReadStreamTest):
    comp = "gz"
class WriteTestGzip(WriteTest):
    comp = "gz"
class WriteStreamTestGzip(WriteStreamTest):
    comp = "gz"

if bz2:
    # Bzip2 TestCases
    # Each one derives from the plain test class; only comp differs.
    class ReadTestBzip2(ReadTest):
        comp = "bz2"
    class ReadStreamTestBzip2(ReadStreamTest):
        comp = "bz2"
    class WriteTestBzip2(WriteTest):
        comp = "bz2"
    class WriteStreamTestBzip2(WriteStreamTest):
        comp = "bz2"

# If importing gzip failed, discard the Gzip TestCases.
if not gzip:
    del ReadTestGzip
    del ReadStreamTestGzip
    del WriteTestGzip
    del WriteStreamTestGzip


def test_main():
    """Create the compressed archives, run all test cases, clean up.

    The compressed copies of the test archive are created inside the
    try block so that whatever was created is removed again even when
    creating a later file or running the suite fails.
    """
    try:
        if gzip:
            # create testtar.tar.gz
            _compress_testtar(gzip.open(tarname("gz"), "wb"))
        if bz2:
            # create testtar.tar.bz2
            _compress_testtar(bz2.BZ2File(tarname("bz2"), "wb"))

        cases = [ReadTest, ReadStreamTest, WriteTest, WriteStreamTest]
        if gzip:
            cases.extend([ReadTestGzip, ReadStreamTestGzip,
                          WriteTestGzip, WriteStreamTestGzip])
        if bz2:
            cases.extend([ReadTestBzip2, ReadStreamTestBzip2,
                          WriteTestBzip2, WriteStreamTestBzip2])

        suite = unittest.TestSuite()
        for case in cases:
            suite.addTest(unittest.makeSuite(case))

        test_support.run_suite(suite)
    finally:
        # Only remove what exists, so that cleanup cannot raise a second
        # error that hides the one that brought us here.
        if gzip and os.path.exists(tarname("gz")):
            os.remove(tarname("gz"))
        if bz2 and os.path.exists(tarname("bz2")):
            os.remove(tarname("bz2"))
        if os.path.exists(tempdir):
            shutil.rmtree(tempdir)
        if os.path.exists(tempname):
            os.remove(tempname)

if __name__ == "__main__":
    test_main()