test_tarfile.py revision a94568a7535de60f1144e4eea0d027b87017a4b4
1import sys
2import os
3import shutil
4
5import unittest
6import tarfile
7
8from test import test_support
9
# Check for our compression modules.  Each name is rebound to None when the
# module cannot be used, so later code can simply test "if gzip:" / "if bz2:".
try:
    import gzip
    # Bare attribute access: raises AttributeError when the module imported
    # but has no GzipFile, which is handled the same way as a failed import.
    gzip.GzipFile
except (ImportError, AttributeError):
    gzip = None
try:
    import bz2
except ImportError:
    bz2 = None
20
def path(path):
    """Resolve *path* with test_support.findfile() and return the result."""
    return test_support.findfile(path)
23
# Source archive; tarname() returns this path or a compressed variant of it.
testtar = path("testtar" + os.extsep + "tar")
# Directory that members are extracted into; created on demand by dirname().
tempdir = path("testtar" + os.extsep + "dir")
# Scratch archive written by the write tests; returned by tmpname().
tempname = path("testtar" + os.extsep + "tmp")
# Number of members ReadTest.test expects to see while iterating the archive.
membercount = 10
28
def tarname(comp=""):
    """Return the path of the test archive for compression suffix *comp*.

    An empty *comp* yields the plain archive path; otherwise the suffix is
    appended to that path after an os.extsep separator.
    """
    if comp:
        return testtar + os.extsep + comp
    return testtar
33
def dirname():
    """Return the extraction directory, creating it first if it is missing."""
    if os.path.exists(tempdir):
        return tempdir
    os.mkdir(tempdir)
    return tempdir
38
def tmpname():
    """Return the module-level tempname path (the scratch archive file)."""
    return tempname
41
42
class BaseTest(unittest.TestCase):
    """Shared fixture: open the test archive before each test, close it after.

    Subclasses select the archive variant and the open mode by overriding
    the class attributes below.
    """
    # Compression suffix handed to tarname(); '' selects the plain archive.
    comp = ''
    # First component of the mode string given to tarfile.open().
    mode = 'r'
    # Separator placed between mode and compression in the mode string.
    sep = ':'

    def setUp(self):
        archive = tarname(self.comp)
        openmode = "%s%s%s" % (self.mode, self.sep, self.comp)
        self.tar = tarfile.open(archive, openmode)

    def tearDown(self):
        self.tar.close()
54
class ReadTest(BaseTest):
    # Read tests against the archive opened by BaseTest.setUp().  The tests
    # that need random access do nothing when sep is "|" (stream archives).

    def test(self):
        """Test member extraction.
        """
        members = 0
        for tarinfo in self.tar:
            members += 1
            if not tarinfo.isreg():
                continue
            f = self.tar.extractfile(tarinfo)
            try:
                self.assert_(len(f.read()) == tarinfo.size,
                             "size read does not match expected size")
            finally:
                # Close the member even when the assertion fails.
                f.close()

        self.assert_(members == membercount,
                     "could not find all members")

    def test_sparse(self):
        """Test sparse member extraction.
        """
        if self.sep == "|":
            return
        f1 = self.tar.extractfile("S-SPARSE")
        f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
        self.assert_(f1.read() == f2.read(),
                     "_FileObject failed on sparse file member")

    def test_readlines(self):
        """Test readlines() method of _FileObject.
        """
        if self.sep == "|":
            return
        filename = "0-REGTYPE-TEXT"
        self.tar.extract(filename, dirname())
        # Close the extracted file explicitly instead of relying on garbage
        # collection, so no handle is left open on the temp directory.
        f = open(os.path.join(dirname(), filename), "rU")
        try:
            lines1 = f.readlines()
        finally:
            f.close()
        lines2 = self.tar.extractfile(filename).readlines()
        self.assert_(lines1 == lines2,
                     "_FileObject.readline() does not work correctly")

    def test_seek(self):
        """Test seek() method of _FileObject, incl. random reading.
        """
        if self.sep == "|":
            return
        filename = "0-REGTYPE"
        self.tar.extract(filename, dirname())
        # Reference copy of the member's bytes, read from the extracted file.
        f = open(os.path.join(dirname(), filename), "rb")
        try:
            data = f.read()
        finally:
            f.close()

        tarinfo = self.tar.getmember(filename)
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Consume the whole member first so that the seek(0) below has
            # to move back from the end of the file.
            fobj.read()
            fobj.seek(0)
            self.assert_(0 == fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assert_(2048 == fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assert_(1024 == fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assert_(2048 == fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assert_(s == data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assert_(tarinfo.size == fobj.tell(),
                         "seek() to file's end failed")
            self.assert_(fobj.read() == "",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assert_(0 == fobj.tell(),
                         "relative seek() to file's start failed")
            # Two reads from the same offset must give the same lines.
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assert_(s1 == s2,
                         "readlines() after seek failed")
        finally:
            fobj.close()
135
class ReadStreamTest(ReadTest):
    # Runs the read tests with "|" as the mode separator, so self.tar is
    # opened as a stream archive.
    sep = "|"

    def test(self):
        """Test member extraction, and for StreamError when
           seeking backwards.
        """
        ReadTest.test(self)
        # The whole archive has been iterated above, so reading the first
        # member again would require going backwards in the stream.
        tarinfo = self.tar.getmembers()[0]
        f = self.tar.extractfile(tarinfo)
        self.assertRaises(tarfile.StreamError, f.read)

    def test_stream(self):
        """Compare the normal tar and the stream tar.
        """
        stream = self.tar
        tar = tarfile.open(tarname(), 'r')
        try:
            while 1:
                t1 = tar.next()
                t2 = stream.next()
                if t1 is None:
                    break
                self.assert_(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError,
                                      stream.extractfile, t2)
                    continue
                v1 = tar.extractfile(t1)
                v2 = stream.extractfile(t2)
                if v1 is None:
                    continue
                self.assert_(v2 is not None, "stream.extractfile() failed")
                self.assert_(v1.read() == v2.read(),
                             "stream extraction failed")
        finally:
            # The reference archive is opened by this test only, so it must
            # be closed here; tearDown() knows nothing about it.
            tar.close()

        stream.close()
172
class WriteTest(BaseTest):
    # Copies the regular members of the source archive into a scratch
    # archive, once with dst.posix set and once with it cleared.
    mode = 'w'

    def setUp(self):
        mode = self.mode + self.sep + self.comp
        self.src = tarfile.open(tarname(self.comp), 'r')
        try:
            self.dst = tarfile.open(tmpname(), mode)
        except:
            # unittest skips tearDown() when setUp() fails, so release the
            # already opened source archive here before re-raising.
            self.src.close()
            raise

    def tearDown(self):
        # Close the destination even if closing the source raises.
        try:
            self.src.close()
        finally:
            self.dst.close()

    def test_posix(self):
        self.dst.posix = 1
        self._test()

    def test_nonposix(self):
        self.dst.posix = 0
        self._test()

    def _test(self):
        # Add every regular member of src to dst.  With dst.posix set, a
        # name longer than tarfile.LENGTH_NAME must be rejected with
        # ValueError instead of being added.
        for tarinfo in self.src:
            if not tarinfo.isreg():
                continue
            f = self.src.extractfile(tarinfo)
            try:
                if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME:
                    self.assertRaises(ValueError, self.dst.addfile,
                                      tarinfo, f)
                else:
                    self.dst.addfile(tarinfo, f)
            finally:
                f.close()
203
class WriteStreamTest(WriteTest):
    # Same write tests, but with '|' as the separator in the mode string
    # that WriteTest.setUp() passes to tarfile.open() for the destination.
    sep = '|'
206
# Gzip TestCases: each class reruns its base TestCase with comp set to
# "gz", which makes tarname() select the archive carrying that suffix.
class ReadTestGzip(ReadTest):
    comp = "gz"
class ReadStreamTestGzip(ReadStreamTest):
    comp = "gz"
class WriteTestGzip(WriteTest):
    comp = "gz"
class WriteStreamTestGzip(WriteStreamTest):
    comp = "gz"
216
if bz2:
    # Bzip2 TestCases: each class reruns its base TestCase with comp set to
    # "bz2".  All four derive directly from the uncompressed base classes,
    # so they do not depend on the Gzip TestCases in any way.
    class ReadTestBzip2(ReadTest):
        comp = "bz2"
    class ReadStreamTestBzip2(ReadStreamTest):
        comp = "bz2"
    class WriteTestBzip2(WriteTest):
        comp = "bz2"
    class WriteStreamTestBzip2(WriteStreamTest):
        comp = "bz2"
227
# If importing gzip failed, discard the Gzip TestCases.  test_main() only
# refers to these names when gzip is available, so nothing else needs them.
if not gzip:
    del ReadTestGzip
    del ReadStreamTestGzip
    del WriteTestGzip
    del WriteStreamTestGzip
234
def test_main():
    """Build the compressed archives, run all TestCases, then clean up.

    The "gz" and "bz2" copies of the test archive are created only when the
    corresponding module is available.  Generated archives, the scratch
    archive and the extraction directory are removed afterwards, whether or
    not building the archives or running the tests succeeded.
    """
    tests = [
        ReadTest,
        ReadStreamTest,
        WriteTest,
        WriteStreamTest
    ]

    if gzip:
        tests.extend([
            ReadTestGzip, ReadStreamTestGzip,
            WriteTestGzip, WriteStreamTestGzip
        ])

    if bz2:
        tests.extend([
            ReadTestBzip2, ReadStreamTestBzip2,
            WriteTestBzip2, WriteStreamTestBzip2
        ])

    try:
        if gzip or bz2:
            # Read the plain archive once; both compressed copies reuse it.
            f = open(tarname(), "rb")
            try:
                data = f.read()
            finally:
                f.close()
        if gzip:
            # create testtar.tar.gz
            # Closed explicitly so the output is complete before the tests
            # open it, without relying on garbage collection.
            out = gzip.open(tarname("gz"), "wb")
            try:
                out.write(data)
            finally:
                out.close()
        if bz2:
            # create testtar.tar.bz2
            out = bz2.BZ2File(tarname("bz2"), "wb")
            try:
                out.write(data)
            finally:
                out.close()

        test_support.run_unittest(*tests)
    finally:
        # Archive creation happens inside the try block, so a file may be
        # missing here; check before removing so cleanup itself cannot fail
        # on a file that was never created.
        if gzip and os.path.exists(tarname("gz")):
            os.remove(tarname("gz"))
        if bz2 and os.path.exists(tarname("bz2")):
            os.remove(tarname("bz2"))
        if os.path.exists(tempdir):
            shutil.rmtree(tempdir)
        if os.path.exists(tempname):
            os.remove(tempname)
272
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()
275