test_tarfile.py revision ab91fdef1f1e556203a2eee98ba7d379e4790de9
1import sys
2import os
3import io
4import shutil
5import tempfile
6import io
7from hashlib import md5
8import errno
9
10import unittest
11import tarfile
12
13from test import support
14
# Probe for the optional compression modules.  A module that cannot be
# imported (or, for gzip, lacks GzipFile) is represented by None so the
# tests below can check for it with a simple truth test.
try:
    import gzip
except ImportError:
    gzip = None
else:
    # An importable gzip module without GzipFile counts as unavailable.
    if not hasattr(gzip, "GzipFile"):
        gzip = None

try:
    import bz2
except ImportError:
    bz2 = None
25
def md5sum(data):
    """Return the hexadecimal MD5 digest of the bytes object *data*."""
    digest = md5()
    digest.update(data)
    return digest.hexdigest()
28
def path(path):
    """Resolve *path* through ``support.findfile`` and return the result."""
    located = support.findfile(path)
    return located
31
# Scratch directory (below the system temp dir) that tests extract into
# and write new files to.
TEMPDIR = os.path.join(tempfile.gettempdir(), "test_tarfile_tmp")
# Reference archive, located through path() (i.e. support.findfile).
tarname = path("testtar.tar")
# Paths of the compressed archives inside TEMPDIR.
# NOTE(review): nothing in view creates these files -- presumably the test
# driver does before the tests run; confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
# Scratch archive that the write tests create and read back.
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected MD5 hex digests of member contents in the reference archive:
# md5_regtype is compared against the regular-file members (e.g.
# "ustar/regtype"), md5_sparse against "ustar/sparse".
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
40
41
class ReadTest(unittest.TestCase):
    """Shared fixture for the read tests.

    Before every test the archive named by the ``tarname`` class attribute
    is opened with the ``mode`` class attribute and stored as ``self.tar``;
    it is closed again afterwards.  Subclasses may override either
    attribute (StreamReadTest, for instance, overrides ``mode``).
    """

    tarname = tarname
    mode = "r:"

    def setUp(self):
        """Open the archive under test and keep it in ``self.tar``."""
        archive = tarfile.open(self.tarname, mode=self.mode,
                               encoding="iso8859-1")
        self.tar = archive

    def tearDown(self):
        """Close the archive opened by setUp()."""
        archive = self.tar
        archive.close()
52
53
class UstarReadTest(ReadTest):
    """Tests for the file objects extractfile() returns for ustar members."""

    def test_fileobj_regular_file(self):
        """A regular member is read back with the expected size and digest."""
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_fileobj_readlines(self):
        """readlines() on the member matches the extracted file on disk."""
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Read the on-disk copy inside a with-block so the handle is closed
        # deterministically instead of being leaked.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = io.TextIOWrapper(self.tar.extractfile(tarinfo))
        lines2 = fobj2.readlines()

        self.assertTrue(lines1 == lines2,
                "fileobj.readlines() failed")
        self.assertTrue(len(lines2) == 114,
                "fileobj.readlines() failed")
        self.assertTrue(lines2[83] == \
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        """Iterating over the member yields the same lines as the disk file."""
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Plain "r" already uses universal newlines in Python 3; the former
        # "rU" flag was a no-op there and is rejected by newer interpreters.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        lines2 = list(io.TextIOWrapper(fobj2))
        self.assertTrue(lines1 == lines2,
                     "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        """seek()/tell()/read()/readline() on a member behave like a file."""
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as ondisk:
            data = ondisk.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Consume the whole member first so the following seek(0) really
            # has to move the position back; the data itself is not needed.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertTrue(s == data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertTrue(fobj.read() == b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertTrue(s1 == s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")
        finally:
            # Close the member file object even when an assertion fails.
            fobj.close()
139
140
class MiscReadTest(ReadTest):
    """Miscellaneous read tests: name handling, offsets, links, extractall()."""

    def test_no_name_argument(self):
        """Without a name argument the name is taken from fileobj.name."""
        with open(self.tarname, "rb") as fobj:
            tar = tarfile.open(fileobj=fobj, mode=self.mode)
            try:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))
            finally:
                tar.close()

    def test_no_name_attribute(self):
        """A fileobj without a name attribute results in tar.name being None."""
        with open(self.tarname, "rb") as source:
            data = source.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_empty_name_attribute(self):
        """A fileobj whose name is "" results in tar.name being None."""
        with open(self.tarname, "rb") as source:
            data = source.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_fileobj_with_offset(self):
        """An archive can be read from a fileobj positioned past offset 0."""
        # Skip the first member and store values from the second member
        # of the testtar.
        tar = tarfile.open(self.tarname, mode=self.mode)
        try:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            data = tar.extractfile(t).read()
        finally:
            tar.close()

        # Open the testtar and seek to the offset of the second member.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open
        fobj = _open(self.tarname, "rb")
        try:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.  Use the
            # module-level tarfile.open() rather than calling open() on the
            # already closed TarFile instance.
            tar = tarfile.open(self.tarname, mode="r:", fileobj=fobj)
            try:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")
            finally:
                tar.close()
        finally:
            fobj.close()

    def test_fail_comp(self):
        """Compressed read modes reject the uncompressed archive."""
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            return
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertTrue(tarinfo.type == tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        """Every member has the expected mtime; ustar members also the uname."""
        for tarinfo in self.tar:
            self.assertTrue(int(tarinfo.mtime) == 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertTrue(tarinfo.uname == "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        """getmembers() reaches the final member of the archive."""
        self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
                "could not find all members")

    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
        try:
            tar.extract("ustar/regtype", TEMPDIR)
            try:
                tar.extract("ustar/lnktype", TEMPDIR)
            except EnvironmentError as e:
                # Only a missing link target is reported here; other errors
                # are deliberately ignored at this point.
                if e.errno == errno.ENOENT:
                    self.fail("hardlink not extracted properly")

            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as fobj:
                data = fobj.read()
            self.assertEqual(md5sum(data), md5_regtype)

            try:
                tar.extract("ustar/symtype", TEMPDIR)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    self.fail("symlink not extracted properly")

            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as fobj:
                data = fobj.read()
            self.assertEqual(md5sum(data), md5_regtype)
        finally:
            # The locally opened archive is closed even if a check fails.
            tar.close()

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(TEMPDIR, directories)
            for tarinfo in directories:
                # Named "extracted" so the module-level path() helper is not
                # shadowed.
                extracted = os.path.join(TEMPDIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777, os.stat(extracted).st_mode & 0o777)
                self.assertEqual(tarinfo.mtime, os.path.getmtime(extracted))
        finally:
            tar.close()
268
269
class StreamReadTest(ReadTest):
    """Read tests that open the archive in stream mode ("r|")."""

    mode="r|"

    def test_fileobj_regular_file(self):
        """The first member of the stream is read back intact."""
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        """Reading a member after the stream has moved past it fails."""
        tarinfos = self.tar.getmembers()
        f = self.tar.extractfile(tarinfos[0]) # read the first member
        self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        """Members read from the stream match those read in normal mode."""
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
        tar2 = self.tar

        try:
            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertTrue(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertTrue(v2 is not None, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(), "stream extraction failed")
        finally:
            # Close the reference archive even when a comparison fails.
            tar1.close()
309
310
class DetectReadTest(unittest.TestCase):
    """Tests which read modes accept which kind of (compressed) archive."""

    def _testfunc_file(self, name, mode):
        """Fail the test if *name* cannot be opened by path with *mode*."""
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail("tarfile.open(%r, %r) raised ReadError" % (name, mode))
        else:
            # Do not leak the successfully opened archive.
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        """Fail the test if *name* cannot be opened via a fileobj with *mode*."""
        with open(name, "rb") as fobj:
            try:
                tar = tarfile.open(name, mode, fileobj=fobj)
            except tarfile.ReadError:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised ReadError"
                          % (name, mode))
            else:
                tar.close()

    def _test_modes(self, testfunc):
        """Run *testfunc* over all name/mode pairs that must be readable.

        Also checks that mismatching archive/mode combinations raise
        ReadError.  The gzip and bz2 parts are skipped when the respective
        module is unavailable.
        """
        testfunc(tarname, "r")
        testfunc(tarname, "r:")
        testfunc(tarname, "r:*")
        testfunc(tarname, "r|")
        testfunc(tarname, "r|*")

        if gzip:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")

            testfunc(gzipname, "r")
            testfunc(gzipname, "r:*")
            testfunc(gzipname, "r:gz")
            testfunc(gzipname, "r|*")
            testfunc(gzipname, "r|gz")

        if bz2:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")

            testfunc(bz2name, "r")
            testfunc(bz2name, "r:*")
            testfunc(bz2name, "r:bz2")
            testfunc(bz2name, "r|*")
            testfunc(bz2name, "r|bz2")

    def test_detect_file(self):
        """Mode detection when the archive is opened by file name."""
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        """Mode detection when the archive is opened through a fileobj."""
        self._test_modes(self._testfunc_fileobj)
361
362
class MemberReadTest(ReadTest):
    """Checks the header fields (and contents) of individual members."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        """Verify *tarinfo* against the expected field values in *kwargs*.

        If *chksum* is given, the member's data must hash to that MD5 hex
        digest.  The common expectations (mtime, uid, gid and -- except for
        old V7 members -- uname/gname) are added to *kwargs* before every
        entry is compared with the attribute of the same name.
        """
        if chksum is not None:
            self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.items():
            self.assertTrue(getattr(tarinfo, k) == v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertTrue(name in self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Close the archive opened by setUp() before replacing it; otherwise
        # that handle would be leaked (tearDown() only closes the new one).
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
437
438
class LongnameTest(ReadTest):
    """Shared tests for members with overlong names and link names.

    The tests read ``self.subdir`` (archive subdirectory holding the
    members) and ``self.longnametype`` (expected type of the extended
    header); neither is defined here, concrete subclasses provide them.
    """

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        """The long link member points at the long name member."""
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        longlink = self.subdir + "/" + "123/" * 125 + "longlink"
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertTrue(tarinfo.linkname == longname, "linkname wrong")

    def test_truncated_longname(self):
        """Opening an archive cut off after 3 blocks of the member fails."""
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        fobj = io.BytesIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        offset = self.tar.getmember(longname).offset
        # Read the raw header block inside a with-block so the file handle
        # is not leaked.
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            header = fobj.read(512)
        tarinfo = tarfile.TarInfo.frombuf(header, "iso8859-1", "strict")
        self.assertEqual(tarinfo.type, self.longnametype)
475        self.assertEqual(tarinfo.type, self.longnametype)
476
477
class GNUReadTest(LongnameTest):
    """Runs the long name tests against the "gnu" members of the archive."""

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    def test_sparse_file(self):
        """The ustar and the gnu sparse member extract to identical data."""
        # Obtain both file objects first, then read them in the same order.
        fobjs = [self.tar.extractfile(self.tar.getmember(member_name))
                 for member_name in ("ustar/sparse", "gnu/sparse")]
        ustar_data, gnu_data = [fobj.read() for fobj in fobjs]
        self.assertEqual(ustar_data, gnu_data,
                "sparse file extraction failed")
489                "sparse file extraction failed")
490
491
class PaxReadTest(LongnameTest):
    """Runs the long name tests against the "pax" members, plus pax checks."""

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        """Checks uname, gname and VENDOR.umlauts of pax/regtype1..3."""
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype1")
            self.assertEqual(tarinfo.uname, "foo")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")

            tarinfo = tar.getmember("pax/regtype2")
            self.assertEqual(tarinfo.uname, "")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")

            tarinfo = tar.getmember("pax/regtype3")
            self.assertEqual(tarinfo.uname, "tarfile")
            self.assertEqual(tarinfo.gname, "tarfile")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        finally:
            # The archive is opened locally, so it has to be closed here.
            tar.close()

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
        finally:
            tar.close()
525        self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
526
527
class WriteTestBase(unittest.TestCase):
    # Home of the write tests that have to pass for every write mode.
    # ``self.mode`` is read but not defined here; subclasses supply it.

    def test_fileobj_no_close(self):
        """Closing the archive leaves a caller-supplied fileobj open."""
        buffer = io.BytesIO()
        archive = tarfile.open(fileobj=buffer, mode=self.mode)
        member = tarfile.TarInfo("foo")
        archive.addfile(member)
        archive.close()
        self.assertTrue(buffer.closed is False, "external fileobjs must never closed")
537        self.assertTrue(fobj.closed is False, "external fileobjs must never closed")
538
539
class WriteTest(WriteTestBase):
    """Write tests using mode "w:"."""

    mode = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertTrue(tar.getnames()[0] == name,
                    "failed to store 100 char filename")
        finally:
            tar.close()

    def test_tar_size(self):
        # Test for bug #1013882.
        tar = tarfile.open(tmpname, self.mode)
        try:
            filename = os.path.join(TEMPDIR, "file")
            with open(filename, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(filename)
        finally:
            tar.close()
        self.assertTrue(os.path.getsize(tmpname) > 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        """gettarinfo() reports the current size of a regular file."""
        tar = tarfile.open(tmpname, self.mode)
        try:
            filename = os.path.join(TEMPDIR, "file")
            # Create (or truncate to) an empty file.
            open(filename, "wb").close()
            tarinfo = tar.gettarinfo(filename)
            self.assertEqual(tarinfo.size, 0)

            with open(filename, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(filename)
            self.assertEqual(tarinfo.size, 3)
        finally:
            tar.close()

    def test_directory_size(self):
        """gettarinfo() reports size 0 for a directory."""
        dirname = os.path.join(TEMPDIR, "directory")
        os.mkdir(dirname)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(dirname)
                self.assertEqual(tarinfo.size, 0)
            finally:
                # Previously the archive was never closed.
                tar.close()
        finally:
            os.rmdir(dirname)

    def test_link_size(self):
        """gettarinfo() reports size 0 for a hardlink (where supported)."""
        if not hasattr(os, "link"):
            return
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        open(target, "wb").close()
        os.link(target, link)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.remove(target)
            os.remove(link)

    def test_symlink_size(self):
        """gettarinfo() reports size 0 for a symlink (where supported)."""
        if not hasattr(os, "symlink"):
            return
        linkname = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", linkname)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(linkname)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.remove(linkname)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)

        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertTrue(tar.name == dstname, "archive name must be absolute")

            tar.add(dstname)
            self.assertTrue(tar.getnames() == [], "added the archive to itself")

            cwd = os.getcwd()
            os.chdir(TEMPDIR)
            try:
                tar.add(dstname)
            finally:
                # Always restore the working directory so that a failure
                # here cannot affect later tests.
                os.chdir(cwd)
            self.assertTrue(tar.getnames() == [], "added the archive to itself")
        finally:
            tar.close()

    def test_exclude(self):
        """Files rejected by the exclude callback are left out of the archive."""
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            def exclude(name):
                return os.path.isfile(name)

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", exclude=exclude)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)
660            shutil.rmtree(tempdir)
661
662
class StreamWriteTest(WriteTestBase):
    """Write tests using stream mode "w|"."""

    mode = "w|"

    def test_stream_padding(self):
        # Test for bug #1543303.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        # Read the archive back, undoing the compression selected by the
        # mode suffix.  Every handle is closed explicitly.
        if self.mode.endswith("gz"):
            fobj = gzip.GzipFile(tmpname)
            try:
                data = fobj.read()
            finally:
                fobj.close()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertTrue(len(dec.unused_data) == 0,
                    "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        # An empty archive must consist of exactly RECORDSIZE NUL bytes.
        self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE,
                         "incorrect zero padding")
688                         "incorrect zero padding")
689
690
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        """Return len(s) + 1 rounded up to a multiple of 512."""
        blocks, remainder = divmod(len(s) + 1, 512)
        if remainder:
            blocks += 1
        return blocks * 512

    def _calc_size(self, name, link=None):
        """Return the expected archive offset after adding one member.

        Counts one 512 byte header plus, for a name (or link name) longer
        than the respective header field, an extended header and the padded
        long string.
        """
        # Initial tar header
        count = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += 512
            count += self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += 512
            count += self._length(link)
        return count

    def _test(self, name, link=None):
        """Write a member called *name* (linking to *link*) and read it back."""
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
        finally:
            # Close the writer even if the size check fails.
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertFalse(member is None, "unable to read longname member")
            self.assertTrue(tarinfo.name == member.name and \
                         tarinfo.linkname == member.linkname, \
                         "unable to read longname member")
        finally:
            # Previously the reader was never closed.
            tar.close()

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
767
768
class HardlinkTest(unittest.TestCase):
    # Exercises how LNKTYPE (hardlink) members come about: gettarinfo() is
    # asked about two hardlinked names after one of them was added.

    def setUp(self):
        """Create "foo", hardlink it as "bar" and add "foo" to a new archive."""
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        with open(self.foo, "wb") as source:
            source.write(b"foo")

        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.tar.add(self.foo)

    def tearDown(self):
        """Close the archive and delete both names of the test file."""
        self.tar.close()
        for leftover in (self.foo, self.bar):
            os.remove(leftover)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        member_type = self.tar.gettarinfo(self.foo).type
        self.assertTrue(member_type == tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        """The second name of an already added file becomes a LNKTYPE."""
        member_type = self.tar.gettarinfo(self.bar).type
        self.assertTrue(member_type == tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        """With dereference set, the second name is a REGTYPE again."""
        self.tar.dereference = True
        member_type = self.tar.gettarinfo(self.bar).type
        self.assertTrue(member_type == tarfile.REGTYPE,
                "dereferencing hardlink failed")
806                "dereferencing hardlink failed")
807
808
class PaxWriteTest(GNUWriteTest):
    """Re-runs the GNUWriteTest cases with PAX_FORMAT, plus pax header tests."""

    def _test(self, name, link=None):
        """Write a member in pax format and check the name or link name."""
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            if link:
                linkname = tar.getmembers()[0].linkname
                self.assertTrue(link == linkname, "PAX longlink creation failed")
            else:
                n = tar.getmembers()[0].name
                self.assertTrue(name == n, "PAX longname creation failed")
        finally:
            # Previously the reader was never closed.
            tar.close()

    def test_pax_global_header(self):
        """A global pax header given at open() time survives a round trip."""
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, \
                pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)

            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertTrue(type(key) is not bytes)
                self.assertTrue(type(val) is not bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")
        finally:
            tar.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
875        self.assertEqual(t.uid, 123)
876
877
class UstarUnicodeTest(unittest.TestCase):
    # Round-trips non-ASCII member names and user/group names through
    # archives written in self.format.  Subclasses only override `format`.
    # Every archive opened here is closed in a finally clause so that no
    # handle on tmpname is left open when an assertion fails.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using `encoding` and check
        # that reading with the same encoding yields the same name.
        name = "\xe4\xf6\xfc"
        tar = tarfile.open(tmpname, "w", format=self.format,
                encoding=encoding, errors="strict")
        try:
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        try:
            self.assertEqual(tar.getmembers()[0].name, name)
        finally:
            tar.close()

    def test_unicode_filename_error(self):
        if self.format == tarfile.PAX_FORMAT:
            # PAX_FORMAT ignores encoding in write mode.
            return

        tar = tarfile.open(tmpname, "w", format=self.format,
                encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            # A non-ASCII name cannot be encoded as ascii/strict.
            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            # The same applies to a non-ASCII user name.
            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            tar.close()

    def test_unicode_argument(self):
        tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
        try:
            for t in tar:
                self.assertTrue(type(t.name) is str)
                self.assertTrue(type(t.linkname) is str)
                self.assertTrue(type(t.uname) is str)
                self.assertTrue(type(t.gname) is str)
        finally:
            tar.close()

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        tar = tarfile.open(tmpname, mode="w", format=self.format,
                encoding="iso8859-1")
        try:
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")
        finally:
            tar.close()

        if self.format != tarfile.PAX_FORMAT:
            # Read with an encoding that cannot represent the names: each
            # character is expected to come back as U+FFFD.
            tar = tarfile.open(tmpname, encoding="ascii")
            try:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\ufffd\ufffd\ufffd")
                self.assertEqual(t.gname, "\ufffd\ufffd\ufffd")
            finally:
                tar.close()
944
945
class GNUUnicodeTest(UstarUnicodeTest):
    # Same unicode tests as the base class, run with GNU_FORMAT.
    format = tarfile.GNU_FORMAT
949
950
class PAXUnicodeTest(UstarUnicodeTest):
    # Same unicode tests as the base class, run with PAX_FORMAT.
    format = tarfile.PAX_FORMAT
954
955
class AppendTest(unittest.TestCase):
    # Test append mode (cp. patch #1652681).
    #
    # All archives and files opened by the helpers are closed again before
    # they return, so that setUp() of the next test can remove the file.

    def setUp(self):
        # Start every test without an archive at self.tarname.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os.remove(self.tarname)

    def _add_testfile(self, fileobj=None):
        # Append an empty member "bar" to the archive (or to fileobj).
        tar = tarfile.open(self.tarname, "a", fileobj=fileobj)
        try:
            tar.addfile(tarfile.TarInfo("bar"))
        finally:
            tar.close()

    def _create_testtar(self, mode="w:"):
        # Create the archive with a single member "foo" whose data is
        # copied from "ustar/regtype" of the reference archive.
        src = tarfile.open(tarname, encoding="iso8859-1")
        try:
            t = src.getmember("ustar/regtype")
            t.name = "foo"
            f = src.extractfile(t)
            tar = tarfile.open(self.tarname, mode)
            try:
                tar.addfile(t, f)
            finally:
                tar.close()
        finally:
            src.close()

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly `names` (default: only
        # "bar").  None is used as the default to avoid a shared mutable
        # default argument.
        if names is None:
            names = ["bar"]
        tar = tarfile.open(self.tarname, fileobj=fileobj)
        try:
            self.assertEqual(tar.getnames(), names)
        finally:
            tar.close()

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        # Appending to an existing zero-length file.
        open(self.tarname, "w").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO()
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as src:
            data = src.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    def test_append_gz(self):
        # Appending to a gzip-compressed archive must be refused.
        if gzip is None:
            return
        self._create_testtar("w:gz")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")

    def test_append_bz2(self):
        # Appending to a bzip2-compressed archive must be refused.
        if bz2 is None:
            return
        self._create_testtar("w:bz2")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")
1021
1022
class LimitsTest(unittest.TestCase):
    # Feeds TarInfo.tobuf() names, linknames and uids of various sizes and
    # checks, per archive format, which of them are accepted and which are
    # refused with a ValueError.

    def test_ustar_limits(self):
        fmt = tarfile.USTAR_FORMAT
        digits = "0123456789"
        deep_path = "123/" * 126 + "longname"

        def refused(member):
            self.assertRaises(ValueError, member.tobuf, fmt)

        # 100 char name
        tarfile.TarInfo(digits * 10).tobuf(fmt)

        # 101 char name that cannot be stored
        refused(tarfile.TarInfo(digits * 10 + "0"))

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(fmt)

        # 256 char name that cannot be stored
        refused(tarfile.TarInfo("1234567/" * 31 + "longname"))

        # 512 char name
        refused(tarfile.TarInfo(deep_path))

        # 512 char linkname
        linked = tarfile.TarInfo("longlink")
        linked.linkname = deep_path
        refused(linked)

        # uid > 8 digits
        owned = tarfile.TarInfo("name")
        owned.uid = 0o10000000
        refused(owned)

    def test_gnu_limits(self):
        fmt = tarfile.GNU_FORMAT
        deep_path = "123/" * 126 + "longname"

        # 512 char name and linkname are both accepted.
        tarfile.TarInfo(deep_path).tobuf(fmt)

        linked = tarfile.TarInfo("longlink")
        linked.linkname = deep_path
        linked.tobuf(fmt)

        # uid >= 256 ** 7
        owned = tarfile.TarInfo("name")
        owned.uid = 0o4000000000000000000
        self.assertRaises(ValueError, owned.tobuf, fmt)

    def test_pax_limits(self):
        fmt = tarfile.PAX_FORMAT
        deep_path = "123/" * 126 + "longname"

        # None of the values below is expected to raise.
        tarfile.TarInfo(deep_path).tobuf(fmt)

        linked = tarfile.TarInfo("longlink")
        linked.linkname = deep_path
        linked.tobuf(fmt)

        owned = tarfile.TarInfo("name")
        owned.uid = 0o4000000000000000000
        owned.tobuf(fmt)
1080
1081
class MiscTest(unittest.TestCase):
    # Checks the low-level header field helpers of the tarfile module.

    def test_char_fields(self):
        # stn() pads with NULs or truncates to the requested length;
        # nts() stops at the first NUL.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo")

    def test_number_fields(self):
        # A small number is stored as NUL-terminated octal digits.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        # The base-256 encoding expected for a large number is GNU-specific,
        # so the format is passed explicitly instead of relying on the
        # module's DEFAULT_FORMAT happening to be GNU_FORMAT.
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                b"\x80\x00\x00\x00\xff\xff\xff\xff")
1093
1094
# gzip variants of the generic test cases: the read tests use the
# gzip-compressed copy of the test archive (gzipname), the write tests
# write with gzip compression.

class GzipMiscReadTest(MiscReadTest):
    mode = "r:gz"
    tarname = gzipname


class GzipUstarReadTest(UstarReadTest):
    mode = "r:gz"
    tarname = gzipname


class GzipStreamReadTest(StreamReadTest):
    mode = "r|gz"
    tarname = gzipname


class GzipWriteTest(WriteTest):
    mode = "w:gz"


class GzipStreamWriteTest(StreamWriteTest):
    mode = "w|gz"
1108
1109
# bzip2 variants of the generic test cases: the read tests use the
# bzip2-compressed copy of the test archive (bz2name), the write tests
# write with bzip2 compression.

class Bz2MiscReadTest(MiscReadTest):
    mode = "r:bz2"
    tarname = bz2name


class Bz2UstarReadTest(UstarReadTest):
    mode = "r:bz2"
    tarname = bz2name


class Bz2StreamReadTest(StreamReadTest):
    mode = "r|bz2"
    tarname = bz2name


class Bz2WriteTest(WriteTest):
    mode = "w:bz2"


class Bz2StreamWriteTest(StreamWriteTest):
    mode = "w|bz2"
1123
class Bz2PartialReadTest(unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bzip2-compressed archive with `mode`
        # and fail if tarfile.open() keeps reading after end-of-data.

        class MyBytesIO(io.BytesIO):
            # hit_eof is set by a read() that starts at the end of the
            # data; another read() after that is treated as a sign of an
            # endless loop.
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super(MyBytesIO, self).read(n)

            def seek(self, *args):
                # After repositioning, a further read() is legitimate
                # (e.g. when open() rewinds to probe another format), so
                # the end-of-data marker is cleared.
                self.hit_eof = False
                return super(MyBytesIO, self).seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tar = tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                # A truncated archive may be rejected; only an endless
                # loop is of interest here.
                continue
            tar.close()

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
1146
1147
def test_main():
    # Entry point of the test module: creates TEMPDIR and the compressed
    # copies of the test archive, runs all applicable test classes and
    # removes TEMPDIR afterwards.
    if not os.path.exists(TEMPDIR):
        os.mkdir(TEMPDIR)

    # The fixture setup is inside the try block as well, so that TEMPDIR is
    # removed even if creating the compressed archives fails.
    try:
        tests = [
            UstarReadTest,
            MiscReadTest,
            StreamReadTest,
            DetectReadTest,
            MemberReadTest,
            GNUReadTest,
            PaxReadTest,
            WriteTest,
            StreamWriteTest,
            GNUWriteTest,
            PaxWriteTest,
            UstarUnicodeTest,
            GNUUnicodeTest,
            PAXUnicodeTest,
            AppendTest,
            LimitsTest,
            MiscTest,
        ]

        if hasattr(os, "link"):
            tests.append(HardlinkTest)

        with open(tarname, "rb") as fobj:
            data = fobj.read()

        if gzip:
            # Create testtar.tar.gz and add gzip-specific tests.
            tar = gzip.open(gzipname, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                GzipMiscReadTest,
                GzipUstarReadTest,
                GzipStreamReadTest,
                GzipWriteTest,
                GzipStreamWriteTest,
            ]

        if bz2:
            # Create testtar.tar.bz2 and add bz2-specific tests.
            tar = bz2.BZ2File(bz2name, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                Bz2MiscReadTest,
                Bz2UstarReadTest,
                Bz2StreamReadTest,
                Bz2WriteTest,
                Bz2StreamWriteTest,
                Bz2PartialReadTest,
            ]

        support.run_unittest(*tests)
    finally:
        if os.path.exists(TEMPDIR):
            shutil.rmtree(TEMPDIR)
1213
1214if __name__ == "__main__":
1215    test_main()
1216