test_tarfile.py revision 0662f8a5eaa19f7eed6978eed1af32563359fb93
1import sys
2import os
3import shutil
4import tempfile
5
6import unittest
7import tarfile
8
9from test import test_support
10
11# Check for our compression modules.
12try:
13    import gzip
14    gzip.GzipFile
15except (ImportError, AttributeError):
16    gzip = None
17try:
18    import bz2
19except ImportError:
20    bz2 = None
21
22def path(path):
23    return test_support.findfile(path)
24
25testtar = path("testtar.tar")
26tempdir = os.path.join(tempfile.gettempdir(), "testtar" + os.extsep + "dir")
27tempname = test_support.TESTFN
28membercount = 10
29
30def tarname(comp=""):
31    if not comp:
32        return testtar
33    return os.path.join(tempdir, "%s%s%s" % (testtar, os.extsep, comp))
34
35def dirname():
36    if not os.path.exists(tempdir):
37        os.mkdir(tempdir)
38    return tempdir
39
40def tmpname():
41    return tempname
42
43
44class BaseTest(unittest.TestCase):
45    comp = ''
46    mode = 'r'
47    sep = ':'
48
49    def setUp(self):
50        mode = self.mode + self.sep + self.comp
51        self.tar = tarfile.open(tarname(self.comp), mode)
52
53    def tearDown(self):
54        self.tar.close()
55
56class ReadTest(BaseTest):
57
58    def test(self):
59        """Test member extraction.
60        """
61        members = 0
62        for tarinfo in self.tar:
63            members += 1
64            if not tarinfo.isreg():
65                continue
66            f = self.tar.extractfile(tarinfo)
67            self.assert_(len(f.read()) == tarinfo.size,
68                         "size read does not match expected size")
69            f.close()
70
71        self.assert_(members == membercount,
72                     "could not find all members")
73
74    def test_sparse(self):
75        """Test sparse member extraction.
76        """
77        if self.sep != "|":
78            f1 = self.tar.extractfile("S-SPARSE")
79            f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
80            self.assert_(f1.read() == f2.read(),
81                         "_FileObject failed on sparse file member")
82
83    def test_readlines(self):
84        """Test readlines() method of _FileObject.
85        """
86        if self.sep != "|":
87            filename = "0-REGTYPE-TEXT"
88            self.tar.extract(filename, dirname())
89            lines1 = file(os.path.join(dirname(), filename), "rU").readlines()
90            lines2 = self.tar.extractfile(filename).readlines()
91            self.assert_(lines1 == lines2,
92                         "_FileObject.readline() does not work correctly")
93
94    def test_seek(self):
95        """Test seek() method of _FileObject, incl. random reading.
96        """
97        if self.sep != "|":
98            filename = "0-REGTYPE"
99            self.tar.extract(filename, dirname())
100            data = file(os.path.join(dirname(), filename), "rb").read()
101
102            tarinfo = self.tar.getmember(filename)
103            fobj = self.tar.extractfile(tarinfo)
104
105            text = fobj.read()
106            fobj.seek(0)
107            self.assert_(0 == fobj.tell(),
108                         "seek() to file's start failed")
109            fobj.seek(2048, 0)
110            self.assert_(2048 == fobj.tell(),
111                         "seek() to absolute position failed")
112            fobj.seek(-1024, 1)
113            self.assert_(1024 == fobj.tell(),
114                         "seek() to negative relative position failed")
115            fobj.seek(1024, 1)
116            self.assert_(2048 == fobj.tell(),
117                         "seek() to positive relative position failed")
118            s = fobj.read(10)
119            self.assert_(s == data[2048:2058],
120                         "read() after seek failed")
121            fobj.seek(0, 2)
122            self.assert_(tarinfo.size == fobj.tell(),
123                         "seek() to file's end failed")
124            self.assert_(fobj.read() == "",
125                         "read() at file's end did not return empty string")
126            fobj.seek(-tarinfo.size, 2)
127            self.assert_(0 == fobj.tell(),
128                         "relative seek() to file's start failed")
129            fobj.seek(512)
130            s1 = fobj.readlines()
131            fobj.seek(512)
132            s2 = fobj.readlines()
133            self.assert_(s1 == s2,
134                         "readlines() after seek failed")
135            fobj.close()
136
137class ReadStreamTest(ReadTest):
138    sep = "|"
139
140    def test(self):
141        """Test member extraction, and for StreamError when
142           seeking backwards.
143        """
144        ReadTest.test(self)
145        tarinfo = self.tar.getmembers()[0]
146        f = self.tar.extractfile(tarinfo)
147        self.assertRaises(tarfile.StreamError, f.read)
148
149    def test_stream(self):
150        """Compare the normal tar and the stream tar.
151        """
152        stream = self.tar
153        tar = tarfile.open(tarname(), 'r')
154
155        while 1:
156            t1 = tar.next()
157            t2 = stream.next()
158            if t1 is None:
159                break
160            self.assert_(t2 is not None, "stream.next() failed.")
161
162            if t2.islnk() or t2.issym():
163                self.assertRaises(tarfile.StreamError, stream.extractfile, t2)
164                continue
165            v1 = tar.extractfile(t1)
166            v2 = stream.extractfile(t2)
167            if v1 is None:
168                continue
169            self.assert_(v2 is not None, "stream.extractfile() failed")
170            self.assert_(v1.read() == v2.read(), "stream extraction failed")
171
172        stream.close()
173
174class WriteTest(BaseTest):
175    mode = 'w'
176
177    def setUp(self):
178        mode = self.mode + self.sep + self.comp
179        self.src = tarfile.open(tarname(self.comp), 'r')
180        self.dst = tarfile.open(tmpname(), mode)
181
182    def tearDown(self):
183        self.src.close()
184        self.dst.close()
185
186    def test_posix(self):
187        self.dst.posix = 1
188        self._test()
189
190    def test_nonposix(self):
191        self.dst.posix = 0
192        self._test()
193
194    def _test(self):
195        for tarinfo in self.src:
196            if not tarinfo.isreg():
197                continue
198            f = self.src.extractfile(tarinfo)
199            if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME:
200                self.assertRaises(ValueError, self.dst.addfile,
201                                 tarinfo, f)
202            else:
203                self.dst.addfile(tarinfo, f)
204
205class WriteStreamTest(WriteTest):
206    sep = '|'
207
208class WriteGNULongTest(unittest.TestCase):
209    """This testcase checks for correct creation of GNU Longname
210       and Longlink extensions.
211
212       It creates a tarfile and adds empty members with either
213       long names, long linknames or both and compares the size
214       of the tarfile with the expected size.
215
216       It checks for SF bug #812325 in TarFile._create_gnulong().
217
218       While I was writing this testcase, I noticed a second bug
219       in the same method:
220       Long{names,links} weren't null-terminated which lead to
221       bad tarfiles when their length was a multiple of 512. This
222       is tested as well.
223    """
224
225    def setUp(self):
226        self.tar = tarfile.open(tmpname(), "w")
227        self.tar.posix = False
228
229    def tearDown(self):
230        self.tar.close()
231
232    def _length(self, s):
233        blocks, remainder = divmod(len(s) + 1, 512)
234        if remainder:
235            blocks += 1
236        return blocks * 512
237
238    def _calc_size(self, name, link=None):
239        # initial tar header
240        count = 512
241
242        if len(name) > tarfile.LENGTH_NAME:
243            # gnu longname extended header + longname
244            count += 512
245            count += self._length(name)
246
247        if link is not None and len(link) > tarfile.LENGTH_LINK:
248            # gnu longlink extended header + longlink
249            count += 512
250            count += self._length(link)
251
252        return count
253
254    def _test(self, name, link=None):
255        tarinfo = tarfile.TarInfo(name)
256        if link:
257            tarinfo.linkname = link
258            tarinfo.type = tarfile.LNKTYPE
259
260        self.tar.addfile(tarinfo)
261
262        v1 = self._calc_size(name, link)
263        v2 = self.tar.offset
264        self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
265
266    def test_longname_1023(self):
267        self._test(("longnam/" * 127) + "longnam")
268
269    def test_longname_1024(self):
270        self._test(("longnam/" * 127) + "longname")
271
272    def test_longname_1025(self):
273        self._test(("longnam/" * 127) + "longname_")
274
275    def test_longlink_1023(self):
276        self._test("name", ("longlnk/" * 127) + "longlnk")
277
278    def test_longlink_1024(self):
279        self._test("name", ("longlnk/" * 127) + "longlink")
280
281    def test_longlink_1025(self):
282        self._test("name", ("longlnk/" * 127) + "longlink_")
283
284    def test_longnamelink_1023(self):
285        self._test(("longnam/" * 127) + "longnam",
286                   ("longlnk/" * 127) + "longlnk")
287
288    def test_longnamelink_1024(self):
289        self._test(("longnam/" * 127) + "longname",
290                   ("longlnk/" * 127) + "longlink")
291
292    def test_longnamelink_1025(self):
293        self._test(("longnam/" * 127) + "longname_",
294                   ("longlnk/" * 127) + "longlink_")
295
296# Gzip TestCases
297class ReadTestGzip(ReadTest):
298    comp = "gz"
299class ReadStreamTestGzip(ReadStreamTest):
300    comp = "gz"
301class WriteTestGzip(WriteTest):
302    comp = "gz"
303class WriteStreamTestGzip(WriteStreamTest):
304    comp = "gz"
305
306if bz2:
307    # Bzip2 TestCases
308    class ReadTestBzip2(ReadTestGzip):
309        comp = "bz2"
310    class ReadStreamTestBzip2(ReadStreamTestGzip):
311        comp = "bz2"
312    class WriteTestBzip2(WriteTest):
313        comp = "bz2"
314    class WriteStreamTestBzip2(WriteStreamTestGzip):
315        comp = "bz2"
316
317# If importing gzip failed, discard the Gzip TestCases.
318if not gzip:
319    del ReadTestGzip
320    del ReadStreamTestGzip
321    del WriteTestGzip
322    del WriteStreamTestGzip
323
324def test_main():
325    if gzip:
326        # create testtar.tar.gz
327        gzip.open(tarname("gz"), "wb").write(file(tarname(), "rb").read())
328    if bz2:
329        # create testtar.tar.bz2
330        bz2.BZ2File(tarname("bz2"), "wb").write(file(tarname(), "rb").read())
331
332    tests = [
333        ReadTest,
334        ReadStreamTest,
335        WriteTest,
336        WriteStreamTest,
337        WriteGNULongTest,
338    ]
339
340    if gzip:
341        tests.extend([
342            ReadTestGzip, ReadStreamTestGzip,
343            WriteTestGzip, WriteStreamTestGzip
344        ])
345
346    if bz2:
347        tests.extend([
348            ReadTestBzip2, ReadStreamTestBzip2,
349            WriteTestBzip2, WriteStreamTestBzip2
350        ])
351    try:
352        test_support.run_unittest(*tests)
353    finally:
354        if gzip:
355            os.remove(tarname("gz"))
356        if bz2:
357            os.remove(tarname("bz2"))
358        if os.path.exists(dirname()):
359            shutil.rmtree(dirname())
360        if os.path.exists(tmpname()):
361            os.remove(tmpname())
362
363if __name__ == "__main__":
364    test_main()
365