test_tarfile.py revision b506dc32c1ab2d0e4d2c3549d0822904432140ec
1# -*- coding: iso-8859-15 -*-
2
3import sys
4import os
5import io
6import shutil
7import tempfile
8import StringIO
9from hashlib import md5
10import errno
11
12import unittest
13import tarfile
14
15from test import test_support
16
# Check for our compression modules.
# Each name is rebound to None when the module cannot be used, so the tests
# can guard on it (e.g. "if gzip:" / "if bz2 is None:").
try:
    import gzip
    # Bare attribute access: raises AttributeError when the imported module
    # does not provide GzipFile, which is handled like a missing module.
    gzip.GzipFile
except (ImportError, AttributeError):
    gzip = None
try:
    import bz2
except ImportError:
    bz2 = None
27
def md5sum(data):
    """Return the MD5 digest of *data* as a hexadecimal string."""
    digest = md5(data)
    return digest.hexdigest()
30
def path(path):
    """Return whatever test_support.findfile() resolves *path* to."""
    located = test_support.findfile(path)
    return located
33
# Scratch directory used by the tests for extracted and newly created files.
TEMPDIR = os.path.join(tempfile.gettempdir(), "test_tarfile_tmp")
# Reference archive, located through test_support.findfile().
tarname = path("testtar.tar")
# Paths inside TEMPDIR for compressed archives and for archives written by
# the tests.  NOTE(review): the gzip/bz2 files are presumably created from
# tarname elsewhere in this file -- confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected MD5 hex digests of extracted member data (the tests compare them
# against members such as "ustar/regtype" and "ustar/sparse").
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
42
43
class ReadTest(unittest.TestCase):
    # Shared fixture for the read tests: each test gets the archive named by
    # the ``tarname`` class attribute opened as self.tar, using the class's
    # ``mode`` and the iso8859-1 encoding.

    tarname = tarname
    mode = "r:"

    def setUp(self):
        options = {"mode": self.mode, "encoding": "iso8859-1"}
        self.tar = tarfile.open(self.tarname, **options)

    def tearDown(self):
        archive = self.tar
        archive.close()
54
55
class UstarReadTest(ReadTest):
    # Exercises the file object returned by TarFile.extractfile() for the
    # "ustar/regtype" member: whole-file reads, line access and seeking.

    def test_fileobj_regular_file(self):
        # Length and MD5 digest of the extracted data must match the
        # member's recorded size and the expected digest.
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)),
                (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_fileobj_readlines(self):
        # Lines read through the archive must equal the lines of the same
        # member after extracting it to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # The on-disk copy is closed as soon as it has been read.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = io.TextIOWrapper(self.tar.extractfile(tarinfo))
        lines2 = fobj2.readlines()

        self.assertEqual(lines1, lines2,
                "fileobj.readlines() failed")
        self.assertEqual(len(lines2), 114,
                "fileobj.readlines() failed")
        self.assertEqual(lines2[83],
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating over the archive member must yield the same lines as
        # reading the extracted copy.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Plain "r" is used here, exactly as in test_fileobj_readlines.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        lines2 = list(io.TextIOWrapper(fobj2))
        self.assertEqual(lines1, lines2,
                     "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Check seek()/tell() and the read methods against the raw bytes of
        # the extracted copy.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Consume the whole member first so the following seek(0) starts
            # from the end of the data; the result itself is not needed.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")
        finally:
            # Close the member file object even when an assertion fails.
            fobj.close()
141
142
class MiscReadTest(ReadTest):
    # Miscellaneous read tests run against self.tar and the reference archive.

    def _read_binary(self, filename):
        # Return the bytes of filename, closing the file afterwards.
        with open(filename, "rb") as fobj:
            return fobj.read()

    def test_no_filename(self):
        # When only a file object is passed, the archive's name must be the
        # absolute path of the file object's name.
        with open(self.tarname, "rb") as fobj:
            tar = tarfile.open(fileobj=fobj, mode=self.mode)
            try:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))
            finally:
                tar.close()

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            return
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                    fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_check_members(self):
        # Every member carries the same mtime; members below "ustar/" must
        # additionally have the user name "tarfile".
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        # The member list must end with "misc/eof".
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
        try:
            tar.extract("ustar/regtype", TEMPDIR)
            try:
                tar.extract("ustar/lnktype", TEMPDIR)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    self.fail("hardlink not extracted properly")

            data = self._read_binary(os.path.join(TEMPDIR, "ustar/lnktype"))
            self.assertEqual(md5sum(data), md5_regtype)

            try:
                tar.extract("ustar/symtype", TEMPDIR)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    self.fail("symlink not extracted properly")

            data = self._read_binary(os.path.join(TEMPDIR, "ustar/symtype"))
            self.assertEqual(md5sum(data), md5_regtype)
        finally:
            # Release the extra archive handle even when a check fails.
            tar.close()
201
202
class StreamReadTest(ReadTest):
    # Read tests with the archive opened in stream mode ("r|").

    mode = "r|"

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)),
                (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # After getmembers() has been called on the stream, reading the
        # first member must raise StreamError.
        tarinfos = self.tar.getmembers()
        f = self.tar.extractfile(tarinfos[0]) # read the first member
        self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk the archive in parallel through a normally opened TarFile
        # (tar1) and the stream (tar2) and compare the extracted data.
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertTrue(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertTrue(v2 is not None, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(), "stream extraction failed")
        finally:
            # Close the comparison archive even when an assertion fails.
            tar1.close()
242
243
class DetectReadTest(unittest.TestCase):
    # Checks which open modes accept the plain, gzip and bz2 archives, both
    # when opening by name and when passing a file object.

    def _testfunc_file(self, name, mode):
        # Open name by filename; a ReadError is reported as a test failure.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail("could not open %r with mode %r" % (name, mode))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same as _testfunc_file(), but the data is supplied as a file object.
        with open(name, "rb") as fobj:
            try:
                tar = tarfile.open(name, mode, fileobj=fobj)
            except tarfile.ReadError:
                self.fail("could not open %r as file object with mode %r"
                        % (name, mode))
            else:
                tar.close()

    def _test_compression(self, testfunc, compname, comptype):
        # Explicit modes that do not match the data must raise ReadError;
        # auto-detecting and matching modes must succeed via testfunc.
        mismatches = (
                (tarname, "r:" + comptype),
                (tarname, "r|" + comptype),
                (compname, "r:"),
                (compname, "r|"))
        for name, mode in mismatches:
            self.assertRaises(tarfile.ReadError, tarfile.open, name, mode=mode)

        for mode in ("r", "r:*", "r:" + comptype, "r|*", "r|" + comptype):
            testfunc(compname, mode)

    def _test_modes(self, testfunc):
        # Run testfunc(name, mode) for every combination expected to work.
        for mode in ("r", "r:", "r:*", "r|", "r|*"):
            testfunc(tarname, mode)

        if gzip:
            self._test_compression(testfunc, gzipname, "gz")

        if bz2:
            self._test_compression(testfunc, bz2name, "bz2")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
294
295
class MemberReadTest(ReadTest):
    # Looks up individual members of the reference archive and checks their
    # header fields and, where given, the MD5 digest of their data.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Compare tarinfo against the expected field values in kwargs plus
        # the fields set below for every member.  When chksum is
        # given, the MD5 digest of the member's data must equal it.
        if chksum is not None:
            data = self.tar.extractfile(tarinfo).read()
            self.assertEqual(md5sum(data), chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.items():
            self.assertEqual(getattr(tarinfo, k), v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        tarinfo = self.tar.getmember("ustar/umlauts-�������")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertTrue(name in self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Close the archive opened by setUp() before rebinding self.tar;
        # tearDown() only closes whatever self.tar refers to at the end.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        tarinfo = self.tar.getmember("pax/umlauts-�������")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
370
371
class LongnameTest(ReadTest):
    # Tests for members with very long names and link names.  The methods
    # read self.subdir and self.longnametype, which this class does not
    # define; the subclasses in this file set them.

    def _longpath(self, basename):
        # Build "<subdir>/123/123/.../<basename>" with 125 "123/" parts.
        return self.subdir + "/" + "123/" * 125 + basename

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self._longpath("longname")
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        # The long link member must point at the long name member.
        longname = self._longpath("longname")
        longlink = self._longpath("longlink")
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(tarinfo.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # Opening only the first three 512-byte blocks of the member must
        # raise ReadError.
        longname = self._longpath("longname")
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        fobj = io.BytesIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self._longpath("longname")
        offset = self.tar.getmember(longname).offset
        # Read the raw header block and close the file right away.
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            buf = fobj.read(512)
        tarinfo = tarfile.TarInfo.frombuf(buf, "iso8859-1", "strict")
        self.assertEqual(tarinfo.type, self.longnametype)
409
410
class GNUReadTest(LongnameTest):
    # Runs the long name tests against the "gnu" members of the archive.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    def test_sparse_file(self):
        # "ustar/sparse" and "gnu/sparse" must extract to identical data.
        streams = []
        for member_name in ("ustar/sparse", "gnu/sparse"):
            member = self.tar.getmember(member_name)
            streams.append(self.tar.extractfile(member))
        ustar_data = streams[0].read()
        gnu_data = streams[1].read()
        self.assertEqual(ustar_data, gnu_data,
                "sparse file extraction failed")
422                "sparse file extraction failed")
423
424
class PaxReadTest(LongnameTest):
    # Runs the long name tests against the "pax" members and checks values
    # that come from pax headers.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # Check uname, gname and the VENDOR.umlauts pax header of three
        # members.
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype1")
            self.assertEqual(tarinfo.uname, "foo")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "�������")

            tarinfo = tar.getmember("pax/regtype2")
            self.assertEqual(tarinfo.uname, "")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "�������")

            tarinfo = tar.getmember("pax/regtype3")
            self.assertEqual(tarinfo.uname, "tarfile")
            self.assertEqual(tarinfo.gname, "tarfile")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "�������")
        finally:
            # Release the extra archive handle even when a check fails.
            tar.close()

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
        finally:
            tar.close()
459
460
class WriteTest(unittest.TestCase):
    # Tests that create archives at tmpname using the class's write mode.

    mode = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            tar.close()

    def test_tar_size(self):
        # Test for bug #1013882.
        tar = tarfile.open(tmpname, self.mode)
        try:
            filename = os.path.join(TEMPDIR, "file")
            # The file is opened in binary mode, so write bytes.
            with open(filename, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(filename)
        finally:
            tar.close()
        self.assertTrue(os.path.getsize(tmpname) > 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        tar = tarfile.open(tmpname, self.mode)
        try:
            filename = os.path.join(TEMPDIR, "file")
            # An empty file must be reported with size 0.
            open(filename, "wb").close()
            tarinfo = tar.gettarinfo(filename)
            self.assertEqual(tarinfo.size, 0)

            # After writing three bytes the size must be 3.
            with open(filename, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(filename)
            self.assertEqual(tarinfo.size, 3)
        finally:
            tar.close()

    def test_directory_size(self):
        dirname = os.path.join(TEMPDIR, "directory")
        os.mkdir(dirname)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(dirname)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.rmdir(dirname)

    def test_link_size(self):
        # Only runs where os.link is available.
        if hasattr(os, "link"):
            link = os.path.join(TEMPDIR, "link")
            target = os.path.join(TEMPDIR, "link_target")
            open(target, "wb").close()
            os.link(target, link)
            try:
                tar = tarfile.open(tmpname, self.mode)
                try:
                    tarinfo = tar.gettarinfo(link)
                    self.assertEqual(tarinfo.size, 0)
                finally:
                    tar.close()
            finally:
                os.remove(target)
                os.remove(link)

    def test_symlink_size(self):
        # Only runs where os.symlink is available.
        if hasattr(os, "symlink"):
            linkname = os.path.join(TEMPDIR, "symlink")
            os.symlink("link_target", linkname)
            try:
                tar = tarfile.open(tmpname, self.mode)
                try:
                    tarinfo = tar.gettarinfo(linkname)
                    self.assertEqual(tarinfo.size, 0)
                finally:
                    tar.close()
            finally:
                os.remove(linkname)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)

        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertEqual(tar.name, dstname, "archive name must be absolute")

            tar.add(dstname)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")

            cwd = os.getcwd()
            os.chdir(TEMPDIR)
            try:
                tar.add(dstname)
            finally:
                # Always restore the working directory for later tests.
                os.chdir(cwd)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")
        finally:
            tar.close()

    def test_exclude(self):
        # With an exclude function that rejects every regular file, adding a
        # directory of three files must store only the directory entry.
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            def exclude(name):
                return os.path.isfile(name)

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", exclude=exclude)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)
582
583
class StreamWriteTest(unittest.TestCase):
    # Tests for archives written in stream mode ("w|").

    mode = "w|"

    def test_stream_padding(self):
        # Test for bug #1543303.
        # An archive without members must consist of exactly
        # tarfile.RECORDSIZE zero bytes once decompressed.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            fobj = gzip.GzipFile(tmpname)
            try:
                data = fobj.read()
            finally:
                fobj.close()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertEqual(len(dec.unused_data), 0,
                    "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        # data is bytes, so the NUL to count must be a bytes literal too.
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                         "incorrect zero padding")
610
611
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Return len(s) + 1 rounded up to a multiple of 512.
        blocks, remainder = divmod(len(s) + 1, 512)
        if remainder:
            blocks += 1
        return blocks * 512

    def _calc_size(self, name, link=None):
        # Return the number of bytes expected in the archive for one member
        # with the given name and optional link name.
        # Initial tar header
        count = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += 512
            count += self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += 512
            count += self._length(link)
        return count

    def _test(self, name, link=None):
        # Write one member in GNU format, compare the archive offset with
        # the calculated size, then read the member back and compare its
        # name and link name.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertTrue(member is not None, "unable to read longname member")
            self.assertTrue(tarinfo.name == member.name and
                         tarinfo.linkname == member.linkname,
                         "unable to read longname member")
        finally:
            # Release the re-read archive as well.
            tar.close()

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
688
689
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.
    # NOTE(review): setUp() calls os.link() unconditionally; presumably this
    # class is only run where os.link exists -- confirm against the runner.

    def setUp(self):
        # Create file "foo", hardlink it as "bar" and add "foo" to a new
        # archive, so "bar" refers to a file that was already added.
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # The file is opened in binary mode, so write bytes.
        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.tar.add(self.foo)

    def tearDown(self):
        # Close the archive opened in setUp() before removing the files.
        self.tar.close()
        os.remove(self.foo)
        os.remove(self.bar)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link must be reported as a regular
        # file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
727
728
class PaxWriteTest(GNUWriteTest):
    # Re-runs the inherited long name/link tests with archives written in
    # pax format and adds tests for pax headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            if link:
                l = tar.getmembers()[0].linkname
                self.assertEqual(link, l, "PAX longlink creation failed")
            else:
                n = tar.getmembers()[0].name
                self.assertEqual(name, n, "PAX longname creation failed")
        finally:
            # Release the re-read archive as well.
            tar.close()

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "���",
                "���": "test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)

            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertTrue(type(key) is not bytes)
                self.assertTrue(type(val) is not bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")
        finally:
            tar.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = "���"     # non-ASCII
            t.uid = 8**8        # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
796
797
class UstarUnicodeTest(unittest.TestCase):
    # All *UnicodeTests FIXME
    # Tests for non-ASCII names written with the format given by the
    # ``format`` class attribute.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using encoding and check that
        # it is read back unchanged and not as bytes.
        tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
        try:
            name = "���"
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        try:
            self.assertTrue(type(tar.getnames()[0]) is not bytes)
            self.assertEqual(tar.getmembers()[0].name, name)
        finally:
            tar.close()

    def test_unicode_filename_error(self):
        # With the ascii encoding and strict error handling, adding a member
        # whose name or user name is non-ASCII must raise UnicodeError.
        tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "���"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "���"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            # Release the archive handle.
            tar.close()

    def test_unicode_argument(self):
        # All textual header fields of every member must be str objects.
        tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
        try:
            for t in tar:
                self.assertTrue(type(t.name) is str)
                self.assertTrue(type(t.linkname) is str)
                self.assertTrue(type(t.uname) is str)
                self.assertTrue(type(t.gname) is str)
        finally:
            tar.close()

    def test_uname_unicode(self):
        # Round-trip non-ASCII user and group names through an in-memory
        # archive.
        for name in ("���", "���"):
            t = tarfile.TarInfo("foo")
            t.uname = name
            t.gname = name

            fobj = io.BytesIO()
            tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
            try:
                tar.addfile(t)
            finally:
                tar.close()
            fobj.seek(0)

            tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
            try:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "���")
                self.assertEqual(t.gname, "���")
            finally:
                tar.close()
859
860
class GNUUnicodeTest(UstarUnicodeTest):
    # Runs the inherited unicode tests with archives written in GNU format.

    format = tarfile.GNU_FORMAT
864
865
class AppendTest(unittest.TestCase):
    # Test append mode (cp. patch #1652681).
    #
    # Each test prepares self.tarname (or an in-memory buffer) in some state,
    # appends an empty member "bar" in mode "a" and checks the member list.

    def setUp(self):
        # Start every test without a leftover archive from a previous one.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os.remove(self.tarname)

    def _add_testfile(self, fileobj=None):
        # Append an empty member named "bar" to self.tarname, or to *fileobj*
        # when one is given.
        tar = tarfile.open(self.tarname, "a", fileobj=fileobj)
        try:
            tar.addfile(tarfile.TarInfo("bar"))
        finally:
            tar.close()

    def _create_testtar(self, mode="w:"):
        # Create self.tarname in *mode* holding one member "foo", copied from
        # "ustar/regtype" of the reference archive.
        src = tarfile.open(tarname, encoding="iso8859-1")
        try:
            t = src.getmember("ustar/regtype")
            t.name = "foo"
            f = src.extractfile(t)
            tar = tarfile.open(self.tarname, mode)
            try:
                tar.addfile(t, f)
            finally:
                tar.close()
        finally:
            # The reference archive was previously left open.
            src.close()

    def _test(self, names=None, fileobj=None):
        # Assert that the archive contains exactly *names* (default: ["bar"]).
        if names is None:
            # Fresh list per call instead of a shared mutable default.
            names = ["bar"]
        tar = tarfile.open(self.tarname, fileobj=fileobj)
        try:
            self.assertEqual(tar.getnames(), names)
        finally:
            # Previously leaked.
            tar.close()

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        # A zero-length file must be accepted as an archive to append to.
        open(self.tarname, "w").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        # Archive data is binary, so use a bytes buffer.
        fobj = io.BytesIO()
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        # Read the archive in binary mode and close the file afterwards.
        with open(self.tarname, "rb") as archive:
            data = archive.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    def test_append_gz(self):
        # Appending to a gzip-compressed archive must raise ReadError.
        if gzip is None:
            return
        self._create_testtar("w:gz")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")

    def test_append_bz2(self):
        # Appending to a bz2-compressed archive must raise ReadError.
        if bz2 is None:
            return
        self._create_testtar("w:bz2")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")
931
932
class LimitsTest(unittest.TestCase):
    # Build bare TarInfo objects with oversized name, linkname or uid fields
    # and serialise them with tobuf() in each format; fields a format cannot
    # hold are expected to raise ValueError.

    def test_ustar_limits(self):
        fmt = tarfile.USTAR_FORMAT
        digits = "0123456789"
        path_512 = "123/" * 126 + "longname"

        # 100 char name
        info = tarfile.TarInfo(digits * 10)
        info.tobuf(fmt)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo(digits * 10 + "0")
        self.assertRaises(ValueError, info.tobuf, fmt)

        # 256 char name with a slash at pos 156
        info = tarfile.TarInfo("123/" * 62 + "longname")
        info.tobuf(fmt)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        self.assertRaises(ValueError, info.tobuf, fmt)

        # 512 char name
        info = tarfile.TarInfo(path_512)
        self.assertRaises(ValueError, info.tobuf, fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = path_512
        self.assertRaises(ValueError, info.tobuf, fmt)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        self.assertRaises(ValueError, info.tobuf, fmt)

    def test_gnu_limits(self):
        fmt = tarfile.GNU_FORMAT
        path_512 = "123/" * 126 + "longname"

        # 512 char name and 512 char linkname are both accepted.
        tarfile.TarInfo(path_512).tobuf(fmt)

        link = tarfile.TarInfo("longlink")
        link.linkname = path_512
        link.tobuf(fmt)

        # uid >= 256 ** 7
        owner = tarfile.TarInfo("name")
        owner.uid = 0o4000000000000000000
        self.assertRaises(ValueError, owner.tobuf, fmt)

    def test_pax_limits(self):
        fmt = tarfile.PAX_FORMAT
        path_512 = "123/" * 126 + "longname"

        # None of the values below is expected to raise with PAX_FORMAT.
        tarfile.TarInfo(path_512).tobuf(fmt)

        link = tarfile.TarInfo("longlink")
        link.linkname = path_512
        link.tobuf(fmt)

        owner = tarfile.TarInfo("name")
        owner.uid = 0o4000000000000000000
        owner.tobuf(fmt)
990
991
class MiscTest(unittest.TestCase):
    # Unit tests for the low-level header field helpers stn(), nts(), itn().

    def test_char_fields(self):
        # stn() pads with NULs up to the field length and truncates longer
        # input; nts() stops at the first NUL.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo")

    def test_number_fields(self):
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        # The expected bytes start with 0x80 rather than octal digits.  Name
        # GNU_FORMAT explicitly so the check does not depend on whatever
        # tarfile.DEFAULT_FORMAT happens to be.
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                b"\x80\x00\x00\x00\xff\xff\xff\xff")
1003
1004
# gzip variants of the generic test cases.  The read tests point tarname at
# gzipname and use a gzip read mode; the write tests only switch to a gzip
# write mode.  test_main() adds these classes only when gzip is available.
class GzipMiscReadTest(MiscReadTest):
    tarname = gzipname
    mode = "r:gz"
class GzipUstarReadTest(UstarReadTest):
    tarname = gzipname
    mode = "r:gz"
class GzipStreamReadTest(StreamReadTest):
    tarname = gzipname
    mode = "r|gz"
class GzipWriteTest(WriteTest):
    mode = "w:gz"
class GzipStreamWriteTest(StreamWriteTest):
    mode = "w|gz"
1018
1019
# bz2 variants of the generic test cases.  The read tests point tarname at
# bz2name and use a bz2 read mode; the write tests only switch to a bz2
# write mode.  test_main() adds these classes only when bz2 is available.
class Bz2MiscReadTest(MiscReadTest):
    tarname = bz2name
    mode = "r:bz2"
class Bz2UstarReadTest(UstarReadTest):
    tarname = bz2name
    mode = "r:bz2"
class Bz2StreamReadTest(StreamReadTest):
    tarname = bz2name
    mode = "r|bz2"
class Bz2WriteTest(WriteTest):
    mode = "w:bz2"
class Bz2StreamWriteTest(StreamWriteTest):
    mode = "w|bz2"
1033
def test_main():
    """Create the compressed fixtures, run all test cases and clean up.

    TEMPDIR is created if missing and is removed again afterwards, even when
    building the compressed copies of the test archive or running the tests
    fails.  HardlinkTest is included only when os.link exists; the gzip and
    bz2 test cases only when the respective module could be imported.
    """
    if not os.path.exists(TEMPDIR):
        os.mkdir(TEMPDIR)

    tests = [
        UstarReadTest,
        MiscReadTest,
        StreamReadTest,
        DetectReadTest,
        MemberReadTest,
        GNUReadTest,
        PaxReadTest,
        WriteTest,
        StreamWriteTest,
        GNUWriteTest,
        PaxWriteTest,
        UstarUnicodeTest,
        GNUUnicodeTest,
        AppendTest,
        LimitsTest,
        MiscTest,
    ]

    if hasattr(os, "link"):
        tests.append(HardlinkTest)

    # Everything that can fail after TEMPDIR exists lives inside the try so
    # the directory is removed on every exit path.
    try:
        with open(tarname, "rb") as fobj:
            data = fobj.read()

        if gzip:
            # Create testtar.tar.gz and add gzip-specific tests.
            # try/finally rather than `with`: NOTE(review) context-manager
            # support of the gzip/bz2 file objects on the targeted
            # interpreter was not verified.
            tar = gzip.open(gzipname, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                GzipMiscReadTest,
                GzipUstarReadTest,
                GzipStreamReadTest,
                GzipWriteTest,
                GzipStreamWriteTest,
            ]

        if bz2:
            # Create testtar.tar.bz2 and add bz2-specific tests.
            tar = bz2.BZ2File(bz2name, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                Bz2MiscReadTest,
                Bz2UstarReadTest,
                Bz2StreamReadTest,
                Bz2WriteTest,
                Bz2StreamWriteTest,
            ]

        test_support.run_unittest(*tests)
    finally:
        if os.path.exists(TEMPDIR):
            shutil.rmtree(TEMPDIR)
1097
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1100