test_tarfile.py revision 57f75799bf4b24b09cbee9a2de4b01b2c73757d4
1import sys
2import os
3import io
4import shutil
5import io
6from hashlib import md5
7import errno
8
9import unittest
10import tarfile
11
12from test import support
13
# Probe for the optional compression modules.  Each name is bound to None
# when its module cannot be used, so tests can simply check the name.
try:
    import gzip
except ImportError:
    gzip = None
else:
    # A gzip module lacking GzipFile is treated like a missing module.
    if not hasattr(gzip, "GzipFile"):
        gzip = None
try:
    import bz2
except ImportError:
    bz2 = None
24
def md5sum(data):
    """Return the hexadecimal MD5 digest of the bytes in *data*."""
    digest = md5()
    digest.update(data)
    return digest.hexdigest()
27
# Scratch directory used for extracted members and temporary archives.
TEMPDIR = "%s-tardir" % os.path.abspath(support.TESTFN)
# The reference archive located through test.support, followed by the
# paths inside TEMPDIR used for the gzip/bz2 archives and throwaway tars.
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected MD5 hex digests of the regular-file and sparse-file payloads
# that the read tests compare against.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
36
37
class ReadTest(unittest.TestCase):
    """Base fixture that opens the test archive around every test.

    The archive path and open mode are class attributes so that
    subclasses can substitute their own values.
    """

    tarname = tarname
    mode = "r:"

    def setUp(self):
        # A fresh TarFile per test, opened with the iso8859-1 encoding.
        self.tar = tarfile.open(
            self.tarname,
            mode=self.mode,
            encoding="iso8859-1",
        )

    def tearDown(self):
        # Close whichever TarFile is currently bound to self.tar.
        self.tar.close()
48
49
class UstarReadTest(ReadTest):
    """Tests for the file objects extractfile() returns for ustar members."""

    def test_fileobj_regular_file(self):
        # The extracted payload must have the recorded size and checksum.
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)),
                         (tarinfo.size, md5_regtype),
                         "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the archive member must agree with readlines()
        # on the same member after it has been extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = io.TextIOWrapper(self.tar.extractfile(tarinfo))

        lines2 = fobj2.readlines()
        self.assertEqual(lines1, lines2,
                "fileobj.readlines() failed")
        self.assertEqual(len(lines2), 114,
                "fileobj.readlines() failed")
        self.assertEqual(lines2[83],
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the archive member must yield the same lines as the
        # extracted file on disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Plain "r" already uses universal newlines in Python 3; the old
        # "rU" spelling is deprecated and rejected by newer interpreters.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        lines2 = list(io.TextIOWrapper(fobj2))
        self.assertEqual(lines1, lines2,
                     "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell()/read()/readline() on an archive member,
        # using the extracted file on disk as the reference data.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)

        # Consume the whole member first so that seek(0) has to rewind.
        fobj.read()
        fobj.seek(0)
        self.assertEqual(0, fobj.tell(),
                     "seek() to file's start failed")
        fobj.seek(2048, 0)
        self.assertEqual(2048, fobj.tell(),
                     "seek() to absolute position failed")
        fobj.seek(-1024, 1)
        self.assertEqual(1024, fobj.tell(),
                     "seek() to negative relative position failed")
        fobj.seek(1024, 1)
        self.assertEqual(2048, fobj.tell(),
                     "seek() to positive relative position failed")
        s = fobj.read(10)
        self.assertEqual(s, data[2048:2058],
                     "read() after seek failed")
        fobj.seek(0, 2)
        self.assertEqual(tarinfo.size, fobj.tell(),
                     "seek() to file's end failed")
        self.assertEqual(fobj.read(), b"",
                     "read() at file's end did not return empty string")
        fobj.seek(-tarinfo.size, 2)
        self.assertEqual(0, fobj.tell(),
                     "relative seek() to file's end failed")
        fobj.seek(512)
        s1 = fobj.readlines()
        fobj.seek(512)
        s2 = fobj.readlines()
        self.assertEqual(s1, s2,
                     "readlines() after seek failed")
        fobj.seek(0)
        self.assertEqual(len(fobj.readline()), fobj.tell(),
                     "tell() after readline() failed")
        fobj.seek(512)
        self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                     "tell() after seek() and readline() failed")
        fobj.seek(0)
        line = fobj.readline()
        self.assertEqual(fobj.read(), data[len(line):],
                     "read() after readline() failed")
        fobj.close()

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        a = self.tar.extractfile(lnktype)
        b = self.tar.extractfile(regtype)
        self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")
154
155
class CommonReadTest(ReadTest):
    """Read tests that only depend on the open mode in ``self.mode``."""

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # Keep `tar` bound even when open() fails, so that the cleanup
        # below cannot hide the real failure behind an UnboundLocalError.
        tar = None
        try:
            tar = tarfile.open(tmpname, self.mode)
            tar.getnames()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        else:
            self.assertListEqual(tar.getmembers(), [])
        finally:
            if tar is not None:
                tar.close()

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # Pick the opener matching the compression named by the mode so
        # the junk blocks below are written in that container format.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with _open(tmpname, "wb") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" % char)
            finally:
                tar.close()
206
207
class MiscReadTest(CommonReadTest):
    """Assorted read tests: name handling, offsets, links and extraction."""

    def test_no_name_argument(self):
        # Without a name argument the archive name is taken from the
        # file object and made absolute.
        with open(self.tarname, "rb") as fobj:
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        # A file object without a name attribute yields tar.name None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertEqual(tar.name, None)

    def test_empty_name_attribute(self):
        # An empty name attribute is treated like a missing one.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertEqual(tar.name, None)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            data = tar.extractfile(t).read()

        # Open the testtar and seek to the offset of the second member.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open
        fobj = _open(self.tarname, "rb")
        try:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            tar = tarfile.open(self.tarname, mode="r:", fileobj=fobj)
            try:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")
            finally:
                tar.close()
        finally:
            fobj.close()

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            return
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the same mtime; ustar members additionally
        # carry the "tarfile" user name.
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        # The last member of the archive must be reachable.
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")

        try:
            tar.extract("ustar/regtype", TEMPDIR)
            try:
                tar.extract("ustar/lnktype", TEMPDIR)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    self.fail("hardlink not extracted properly")

            # Read through a context manager so the handle is closed
            # deterministically instead of being left to the collector.
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            try:
                tar.extract("ustar/symtype", TEMPDIR)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    self.fail("symlink not extracted properly")

            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)
        finally:
            tar.close()

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(TEMPDIR, directories)
            for tarinfo in directories:
                path = os.path.join(TEMPDIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777)
                self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
        finally:
            tar.close()

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Allocate the instance separately so that it stays reachable
            # for inspection after __init__ has raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)
370
371
class StreamReadTest(CommonReadTest):
    """Run the common read tests with the stream mode "r|"."""

    mode = "r|"

    def test_fileobj_regular_file(self):
        # get "regtype" (can't use getmember)
        member = self.tar.next()
        payload = self.tar.extractfile(member).read()
        actual = (len(payload), md5sum(payload))
        expected = (member.size, md5_regtype)
        self.assertTrue(actual == expected,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # After all members have been listed, reading the first member
        # again is expected to raise StreamError.
        members = self.tar.getmembers()
        first = self.tar.extractfile(members[0])  # read the first member
        self.assertRaises(tarfile.StreamError, first.read)

    def test_compare_members(self):
        # Walk a seekable view and the stream view of the archive in
        # lockstep and compare what each of them extracts.
        with tarfile.open(tarname, encoding="iso8859-1") as seekable:
            stream = self.tar

            while True:
                ref = seekable.next()
                cur = stream.next()
                if ref is None:
                    break
                self.assertTrue(cur is not None, "stream.next() failed.")

                if cur.islnk() or cur.issym():
                    # Links cannot be followed when reading a stream.
                    self.assertRaises(tarfile.StreamError, stream.extractfile, cur)
                    continue

                ref_file = seekable.extractfile(ref)
                cur_file = stream.extractfile(cur)
                if ref_file is None:
                    continue
                self.assertTrue(cur_file is not None, "stream.extractfile() failed")
                self.assertEqual(ref_file.read(), cur_file.read(), "stream extraction failed")
412
413
class DetectReadTest(unittest.TestCase):
    """Check which open modes accept the plain, gzip and bz2 archives."""

    def _testfunc_file(self, name, mode):
        # Opening by file name must not raise ReadError.
        try:
            archive = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail()
        archive.close()

    def _testfunc_fileobj(self, name, mode):
        # Opening through an already opened binary file object must not
        # raise ReadError either.
        try:
            with open(name, "rb") as raw:
                archive = tarfile.open(name, mode, fileobj=raw)
        except tarfile.ReadError:
            self.fail()
        archive.close()

    def _check_compressed(self, testfunc, name, comp):
        # Mismatched pairings must be rejected: the plain archive with a
        # compressed mode, and the compressed archive with a plain mode.
        mismatches = (
            (tarname, "r:" + comp),
            (tarname, "r|" + comp),
            (name, "r:"),
            (name, "r|"),
        )
        for wrong_name, wrong_mode in mismatches:
            self.assertRaises(tarfile.ReadError, tarfile.open, wrong_name, mode=wrong_mode)

        # Transparent and explicit modes must both accept the archive.
        for good_mode in ("r", "r:*", "r:" + comp, "r|*", "r|" + comp):
            testfunc(name, good_mode)

    def _test_modes(self, testfunc):
        for plain_mode in ("r", "r:", "r:*", "r|", "r|*"):
            testfunc(tarname, plain_mode)

        # The compressed variants are only checked when the matching
        # module could be imported.
        if gzip:
            self._check_compressed(testfunc, gzipname, "gz")

        if bz2:
            self._check_compressed(testfunc, bz2name, "bz2")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
469
470
class MemberReadTest(ReadTest):
    """Look up individual members and verify their header fields."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Check the payload checksum (when one is given) and then compare
        # header fields: the ones passed as keyword arguments plus the
        # mtime/owner values every member is compared against.
        if chksum is not None:
            payload = self.tar.extractfile(tarinfo).read()
            self.assertTrue(md5sum(payload) == chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        expected = dict(kwargs, mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            expected.update(uname="tarfile", gname="tarfile")
        for field, value in expected.items():
            self.assertTrue(getattr(tarinfo, field) == value,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        member = self.tar.getmember("ustar/regtype")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        member = self.tar.getmember("ustar/conttype")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        member = self.tar.getmember("ustar/dirtype")
        self._test_member(member, size=0)

    def test_find_dirtype_with_size(self):
        member = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(member, size=255)

    def test_find_lnktype(self):
        member = self.tar.getmember("ustar/lnktype")
        self._test_member(member, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        member = self.tar.getmember("ustar/symtype")
        self._test_member(member, size=0, linkname="regtype")

    def test_find_blktype(self):
        member = self.tar.getmember("ustar/blktype")
        self._test_member(member, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        member = self.tar.getmember("ustar/chrtype")
        self._test_member(member, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        member = self.tar.getmember("ustar/fifotype")
        self._test_member(member, size=0)

    def test_find_sparse(self):
        member = self.tar.getmember("ustar/sparse")
        self._test_member(member, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        longname = "".join(["ustar/", "12345/" * 39, "1234567/longname"])
        self.assertIn(longname, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        member = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Replace the archive opened by setUp() with a freshly opened one
        # before looking the member up.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        member = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)
546
547
class LongnameTest(ReadTest):
    """Shared tests for members with very long names and link targets.

    The tests read ``self.subdir`` (archive directory that holds the
    long-named members) and ``self.longnametype`` (header type expected at
    such a member's start offset); this class does not define them itself.
    """

    def _longpath(self, basename):
        # Build the deeply nested member path that ends in *basename*.
        return self.subdir + "/" + "123/" * 125 + basename

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self._longpath("longname")
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        # The long link member must point at the long name member.
        longname = self._longpath("longname")
        longlink = self._longpath("longlink")
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(tarinfo.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # An archive cut off after the first three blocks of the long
        # name member must be rejected with ReadError.
        longname = self._longpath("longname")
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        fobj = io.BytesIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self._longpath("longname")
        offset = self.tar.getmember(longname).offset
        # Use a context manager so the raw archive handle is always closed.
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            header = fobj.read(512)
        tarinfo = tarfile.TarInfo.frombuf(header, "iso8859-1", "strict")
        self.assertEqual(tarinfo.type, self.longnametype)
585
586
class GNUReadTest(LongnameTest):
    """Long name tests for the "gnu" directory, plus a sparse file check."""

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    def test_sparse_file(self):
        # The ustar and the gnu sparse member must extract to equal data.
        # Both file objects are created before either of them is read.
        fobjs = [self.tar.extractfile(self.tar.getmember(member_name))
                 for member_name in ("ustar/sparse", "gnu/sparse")]
        ustar_data, gnu_data = [fobj.read() for fobj in fobjs]
        self.assertEqual(ustar_data, gnu_data,
                "sparse file extraction failed")
599
600
class PaxReadTest(LongnameTest):
    """Long name tests for the "pax" directory, plus pax header checks."""

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # Each entry: member name, expected uname, expected gname.  All
        # three members must report the same VENDOR.umlauts pax header.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as archive:
            for member_name, uname, gname in expectations:
                member = archive.getmember(member_name)
                self.assertEqual(member.uname, uname)
                self.assertEqual(member.gname, gname)
                self.assertEqual(member.pax_headers.get("VENDOR.umlauts"), umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        stamp = 1041808783.0
        with tarfile.open(tarname, encoding="iso8859-1") as archive:
            member = archive.getmember("pax/regtype4")
            self.assertEqual(member.size, 7011)
            self.assertEqual(member.uid, 123)
            self.assertEqual(member.gid, 123)
            self.assertEqual(member.mtime, stamp)
            self.assertEqual(type(member.mtime), float)
            headers = member.pax_headers
            self.assertEqual(float(headers["atime"]), stamp)
            self.assertEqual(float(headers["ctime"]), stamp)
640
641
class WriteTestBase(unittest.TestCase):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.  The open mode is read from
    # self.mode, which this class does not define itself.

    def test_fileobj_no_close(self):
        # Closing the archive must leave a caller-supplied file object open.
        sink = io.BytesIO()
        archive = tarfile.open(fileobj=sink, mode=self.mode)
        archive.addfile(tarfile.TarInfo("foo"))
        archive.close()
        still_open = sink.closed is False
        self.assertTrue(still_open, "external fileobjs must never closed")
652
653
654class WriteTest(WriteTestBase):
655
656    mode = "w:"
657
658    def test_100_char_name(self):
659        # The name field in a tar header stores strings of at most 100 chars.
660        # If a string is shorter than 100 chars it has to be padded with '\0',
661        # which implies that a string of exactly 100 chars is stored without
662        # a trailing '\0'.
663        name = "0123456789" * 10
664        tar = tarfile.open(tmpname, self.mode)
665        try:
666            t = tarfile.TarInfo(name)
667            tar.addfile(t)
668        finally:
669            tar.close()
670
671        tar = tarfile.open(tmpname)
672        try:
673            self.assertTrue(tar.getnames()[0] == name,
674                    "failed to store 100 char filename")
675        finally:
676            tar.close()
677
678    def test_tar_size(self):
679        # Test for bug #1013882.
680        tar = tarfile.open(tmpname, self.mode)
681        try:
682            path = os.path.join(TEMPDIR, "file")
683            with open(path, "wb") as fobj:
684                fobj.write(b"aaa")
685            tar.add(path)
686        finally:
687            tar.close()
688        self.assertTrue(os.path.getsize(tmpname) > 0,
689                "tarfile is empty")
690
691    # The test_*_size tests test for bug #1167128.
692    def test_file_size(self):
693        tar = tarfile.open(tmpname, self.mode)
694        try:
695            path = os.path.join(TEMPDIR, "file")
696            with open(path, "wb"):
697                pass
698            tarinfo = tar.gettarinfo(path)
699            self.assertEqual(tarinfo.size, 0)
700
701            with open(path, "wb") as fobj:
702                fobj.write(b"aaa")
703            tarinfo = tar.gettarinfo(path)
704            self.assertEqual(tarinfo.size, 3)
705        finally:
706            tar.close()
707
708    def test_directory_size(self):
709        path = os.path.join(TEMPDIR, "directory")
710        os.mkdir(path)
711        try:
712            tar = tarfile.open(tmpname, self.mode)
713            try:
714                tarinfo = tar.gettarinfo(path)
715                self.assertEqual(tarinfo.size, 0)
716            finally:
717                tar.close()
718        finally:
719            os.rmdir(path)
720
721    def test_link_size(self):
722        if hasattr(os, "link"):
723            link = os.path.join(TEMPDIR, "link")
724            target = os.path.join(TEMPDIR, "link_target")
725            with open(target, "wb") as fobj:
726                fobj.write(b"aaa")
727            os.link(target, link)
728            try:
729                tar = tarfile.open(tmpname, self.mode)
730                try:
731                    # Record the link target in the inodes list.
732                    tar.gettarinfo(target)
733                    tarinfo = tar.gettarinfo(link)
734                    self.assertEqual(tarinfo.size, 0)
735                finally:
736                    tar.close()
737            finally:
738                os.remove(target)
739                os.remove(link)
740
741    @support.skip_unless_symlink
742    def test_symlink_size(self):
743        path = os.path.join(TEMPDIR, "symlink")
744        os.symlink("link_target", path)
745        try:
746            tar = tarfile.open(tmpname, self.mode)
747            try:
748                tarinfo = tar.gettarinfo(path)
749                self.assertEqual(tarinfo.size, 0)
750            finally:
751                tar.close()
752        finally:
753            os.remove(path)
754
755    def test_add_self(self):
756        # Test for #1257255.
757        dstname = os.path.abspath(tmpname)
758        tar = tarfile.open(tmpname, self.mode)
759        try:
760            self.assertTrue(tar.name == dstname, "archive name must be absolute")
761            tar.add(dstname)
762            self.assertTrue(tar.getnames() == [], "added the archive to itself")
763
764            cwd = os.getcwd()
765            os.chdir(TEMPDIR)
766            tar.add(dstname)
767            os.chdir(cwd)
768            self.assertTrue(tar.getnames() == [], "added the archive to itself")
769        finally:
770            tar.close()
771
772    def test_exclude(self):
773        tempdir = os.path.join(TEMPDIR, "exclude")
774        os.mkdir(tempdir)
775        try:
776            for name in ("foo", "bar", "baz"):
777                name = os.path.join(tempdir, name)
778                open(name, "wb").close()
779
780            exclude = os.path.isfile
781
782            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
783            try:
784                with support.check_warnings(("use the filter argument",
785                                             DeprecationWarning)):
786                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)
787            finally:
788                tar.close()
789
790            tar = tarfile.open(tmpname, "r")
791            try:
792                self.assertEqual(len(tar.getmembers()), 1)
793                self.assertEqual(tar.getnames()[0], "empty_dir")
794            finally:
795                tar.close()
796        finally:
797            shutil.rmtree(tempdir)
798
799    def test_filter(self):
800        tempdir = os.path.join(TEMPDIR, "filter")
801        os.mkdir(tempdir)
802        try:
803            for name in ("foo", "bar", "baz"):
804                name = os.path.join(tempdir, name)
805                open(name, "wb").close()
806
807            def filter(tarinfo):
808                if os.path.basename(tarinfo.name) == "bar":
809                    return
810                tarinfo.uid = 123
811                tarinfo.uname = "foo"
812                return tarinfo
813
814            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
815            try:
816                tar.add(tempdir, arcname="empty_dir", filter=filter)
817            finally:
818                tar.close()
819
820            tar = tarfile.open(tmpname, "r")
821            try:
822                for tarinfo in tar:
823                    self.assertEqual(tarinfo.uid, 123)
824                    self.assertEqual(tarinfo.uname, "foo")
825                self.assertEqual(len(tar.getmembers()), 3)
826            finally:
827                tar.close()
828        finally:
829            shutil.rmtree(tempdir)
830
831    # Guarantee that stored pathnames are not modified. Don't
832    # remove ./ or ../ or double slashes. Still make absolute
833    # pathnames relative.
834    # For details see bug #6054.
835    def _test_pathname(self, path, cmp_path=None, dir=False):
836        # Create a tarfile with an empty member named path
837        # and compare the stored name with the original.
838        foo = os.path.join(TEMPDIR, "foo")
839        if not dir:
840            open(foo, "w").close()
841        else:
842            os.mkdir(foo)
843
844        tar = tarfile.open(tmpname, self.mode)
845        try:
846            tar.add(foo, arcname=path)
847        finally:
848            tar.close()
849
850        tar = tarfile.open(tmpname, "r")
851        try:
852            t = tar.next()
853        finally:
854            tar.close()
855
856        if not dir:
857            os.remove(foo)
858        else:
859            os.rmdir(foo)
860
861        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
862
863    def test_pathnames(self):
864        self._test_pathname("foo")
865        self._test_pathname(os.path.join("foo", ".", "bar"))
866        self._test_pathname(os.path.join("foo", "..", "bar"))
867        self._test_pathname(os.path.join(".", "foo"))
868        self._test_pathname(os.path.join(".", "foo", "."))
869        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
870        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
871        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
872        self._test_pathname(os.path.join("..", "foo"))
873        self._test_pathname(os.path.join("..", "foo", ".."))
874        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
875        self._test_pathname(os.path.join("..", "foo", "..", "bar"))
876
877        self._test_pathname("foo" + os.sep + os.sep + "bar")
878        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
879
880    def test_abs_pathnames(self):
881        if sys.platform == "win32":
882            self._test_pathname("C:\\foo", "foo")
883        else:
884            self._test_pathname("/foo", "foo")
885            self._test_pathname("///foo", "foo")
886
887    def test_cwd(self):
888        # Test adding the current working directory.
889        cwd = os.getcwd()
890        os.chdir(TEMPDIR)
891        try:
892            tar = tarfile.open(tmpname, self.mode)
893            try:
894                tar.add(".")
895            finally:
896                tar.close()
897
898            tar = tarfile.open(tmpname, "r")
899            try:
900                for t in tar:
901                    self.assert_(t.name == "." or t.name.startswith("./"))
902            finally:
903                tar.close()
904        finally:
905            os.chdir(cwd)
906
907
class StreamWriteTest(WriteTestBase):
    # Tests for archives written in stream mode. Subclasses override
    # `mode` to repeat the tests with compressed streams.

    mode = "w|"

    def test_stream_padding(self):
        # Test for bug #1543303.
        # An empty archive written as a stream must contain exactly
        # tarfile.RECORDSIZE NUL bytes once it has been decompressed.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            with gzip.GzipFile(tmpname) as fobj:
                data = fobj.read()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            # Nothing may follow the end of the compressed stream.
            self.assertEqual(len(dec.unused_data), 0,
                             "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                         "incorrect zero padding")

    # Report the test as skipped instead of silently passing where the
    # permission check cannot be performed.
    @unittest.skipIf(sys.platform == "win32" or not hasattr(os, "umask"),
                     "requires os.umask and POSIX permission bits")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os.remove(tmpname)

        # With umask 022 a newly created archive must get mode 0644.
        original_umask = os.umask(0o022)
        try:
            tar = tarfile.open(tmpname, self.mode)
            tar.close()
            mode = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(mode, 0o644, "wrong file permissions")
        finally:
            os.umask(original_umask)
951
952
class GNUWriteTest(unittest.TestCase):
    # Checks that GNU longname and longlink extended headers are
    # created correctly (cp. bug #812325).

    # 1016-character stems; every test appends a tail so that the
    # total length becomes 1023, 1024 or 1025 characters.
    _NAME_STEM = "longnam/" * 127
    _LINK_STEM = "longlnk/" * 127

    def _length(self, s):
        # Length of s plus one extra byte, rounded up to a multiple
        # of 512.
        padded = len(s) + 1
        return (padded + 511) // 512 * 512

    def _calc_size(self, name, link=None):
        # Archive offset expected after adding one member with the
        # given name and (optional) link target.
        extra = 0
        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            extra += 512 + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            extra += 512 + self._length(link)
        # The initial tar header is always present.
        return 512 + extra

    def _test(self, name, link=None):
        # Write a single member in GNU format, compare the resulting
        # offset with the computed one and read the member back.
        expected = tarfile.TarInfo(name)
        if link:
            expected.linkname = link
            expected.type = tarfile.LNKTYPE

        writer = tarfile.open(tmpname, "w")
        try:
            writer.format = tarfile.GNU_FORMAT
            writer.addfile(expected)

            predicted = self._calc_size(name, link)
            actual = writer.offset
            self.assertTrue(predicted == actual,
                            "GNU longname/longlink creation failed")
        finally:
            writer.close()

        reader = tarfile.open(tmpname)
        try:
            member = reader.next()
            failure = "unable to read longname member"
            self.assertIsNotNone(member, failure)
            self.assertEqual(expected.name, member.name, failure)
            self.assertEqual(expected.linkname, member.linkname, failure)
        finally:
            reader.close()

    def test_longname_1023(self):
        self._test(self._NAME_STEM + "longnam")

    def test_longname_1024(self):
        self._test(self._NAME_STEM + "longname")

    def test_longname_1025(self):
        self._test(self._NAME_STEM + "longname_")

    def test_longlink_1023(self):
        self._test("name", self._LINK_STEM + "longlnk")

    def test_longlink_1024(self):
        self._test("name", self._LINK_STEM + "longlink")

    def test_longlink_1025(self):
        self._test("name", self._LINK_STEM + "longlink_")

    def test_longnamelink_1023(self):
        self._test(self._NAME_STEM + "longnam",
                   self._LINK_STEM + "longlnk")

    def test_longnamelink_1024(self):
        self._test(self._NAME_STEM + "longname",
                   self._LINK_STEM + "longlink")

    def test_longnamelink_1025(self):
        self._test(self._NAME_STEM + "longname_",
                   self._LINK_STEM + "longlink_")
1035
1036
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create a file "foo", a hard link "bar" to it and an archive
        # that already contains "foo".
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # Cleanups are registered before each resource is created:
        # tearDown() would not run if setUp() failed halfway, whereas
        # registered cleanups always do.
        self.addCleanup(support.unlink, self.foo)
        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        self.addCleanup(support.unlink, self.bar)
        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        # Registered last, so the archive is closed before the files
        # are removed (cleanups run in reverse order).
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "add file as regular failed")

    def test_add_hardlink(self):
        # A second name for an already added file becomes a hardlink.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                         "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is stored as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "dereferencing hardlink failed")
1074
1075
class PaxWriteTest(GNUWriteTest):
    # Repeats the long name/link cases inherited from GNUWriteTest with
    # PAX_FORMAT and adds tests for pax global and extended headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        entry = tarfile.TarInfo(name)
        if link:
            entry.linkname = link
            entry.type = tarfile.LNKTYPE

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            writer.addfile(entry)
        finally:
            writer.close()

        reader = tarfile.open(tmpname)
        try:
            first = reader.getmembers()[0]
            if link:
                self.assertTrue(link == first.linkname,
                                "PAX longlink creation failed")
            else:
                self.assertTrue(name == first.name,
                                "PAX longname creation failed")
        finally:
            reader.close()

    def test_pax_global_header(self):
        global_headers = {
            "foo": "bar",
            "uid": "0",
            "mtime": "1.23",
            "test": "\xe4\xf6\xfc",
            "\xe4\xf6\xfc": "test",
        }

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                              pax_headers=global_headers)
        try:
            writer.addfile(tarfile.TarInfo("test"))
        finally:
            writer.close()

        # Test if the global header was written correctly.
        reader = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(reader.pax_headers, global_headers)
            self.assertEqual(reader.getmembers()[0].pax_headers,
                             global_headers)
            # Test if all the fields are strings.
            for key, val in reader.pax_headers.items():
                self.assertFalse(type(key) is bytes)
                self.assertFalse(type(val) is bytes)
                # Number fields must be convertible with their
                # registered conversion function.
                convert = tarfile.PAX_NUMBER_FIELDS.get(key)
                if convert is None:
                    continue
                try:
                    convert(val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")
        finally:
            reader.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        overrides = {"path": "foo", "uid": "123"}

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                              encoding="iso8859-1")
        try:
            entry = tarfile.TarInfo()
            entry.name = "\xe4\xf6\xfc" # non-ASCII
            entry.uid = 8**8 # too large
            entry.pax_headers = overrides
            writer.addfile(entry)
        finally:
            writer.close()

        reader = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            stored = reader.getmembers()[0]
            self.assertEqual(stored.pax_headers, overrides)
            self.assertEqual(stored.name, "foo")
            self.assertEqual(stored.uid, 123)
        finally:
            reader.close()
1157
1158
class UstarUnicodeTest(unittest.TestCase):
    # Round-trips non-ASCII member and owner names through archives
    # written in the format named by the `format` class attribute.
    # Subclasses override `format` to repeat the tests for other formats.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using `encoding` and
        # check that the same name is read back with that encoding.
        tar = tarfile.open(tmpname, "w", format=self.format,
                           encoding=encoding, errors="strict")
        try:
            name = "\xe4\xf6\xfc"
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        try:
            self.assertEqual(tar.getmembers()[0].name, name)
        finally:
            tar.close()

    def test_unicode_filename_error(self):
        if self.format == tarfile.PAX_FORMAT:
            # PAX_FORMAT ignores encoding in write mode.
            # Report a skip rather than a silent pass.
            self.skipTest("PAX_FORMAT ignores encoding in write mode")

        tar = tarfile.open(tmpname, "w", format=self.format,
                           encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            # A name that cannot be encoded as ASCII must be rejected ...
            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            # ... and so must a user name that cannot be encoded.
            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            tar.close()

    def test_unicode_argument(self):
        # All text fields of the members read from the reference
        # archive must be str objects.
        tar = tarfile.open(tarname, "r", encoding="iso8859-1",
                           errors="strict")
        try:
            for t in tar:
                self.assertTrue(type(t.name) is str)
                self.assertTrue(type(t.linkname) is str)
                self.assertTrue(type(t.uname) is str)
                self.assertTrue(type(t.gname) is str)
        finally:
            tar.close()

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        tar = tarfile.open(tmpname, mode="w", format=self.format,
                           encoding="iso8859-1")
        try:
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

            if self.format != tarfile.PAX_FORMAT:
                # Reopen with an encoding that cannot decode the stored
                # bytes: they must come back as surrogate escapes.
                tar.close()
                tar = tarfile.open(tmpname, encoding="ascii")
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
        finally:
            tar.close()
1240
1241
class GNUUnicodeTest(UstarUnicodeTest):
    # Runs the inherited unicode tests with GNU_FORMAT and adds a
    # GNU tar specific regression test.

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for codec, member_name in cases:
            archive = tarfile.open(tarname, encoding=codec,
                                   errors="surrogateescape")
            with archive:
                # The member must be found under the name expected for
                # this encoding.
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1256
1257
class PAXUnicodeTest(UstarUnicodeTest):
    # Runs the inherited unicode tests with PAX_FORMAT and adds a
    # test for binary pax headers.

    format = tarfile.PAX_FORMAT

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for codec, member_name in cases:
            archive = tarfile.open(tarname, encoding=codec,
                                   errors="surrogateescape")
            with archive:
                # The member must be found under the name expected for
                # this encoding.
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1271
1272
class AppendTest(unittest.TestCase):
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without an existing archive file.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os.remove(self.tarname)

    def _add_testfile(self, fileobj=None):
        # Append an empty member "bar", either to the archive file or
        # to the given file object.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _create_testtar(self, mode="w:"):
        # Create the archive with one member "foo" whose data is taken
        # from "ustar/regtype" of the reference archive.
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            t = src.getmember("ustar/regtype")
            t.name = "foo"
            f = src.extractfile(t)
            with tarfile.open(self.tarname, mode) as tar:
                tar.addfile(t, f)

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly the given member
        # names. `names` defaults to ["bar"]; None is used as the
        # default value to avoid a shared mutable default argument.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    def test_append_gz(self):
        # Appending to a gzip-compressed archive must be refused.
        if gzip is None:
            self.skipTest("gzip module is not available")
        self._create_testtar("w:gz")
        self.assertRaises(tarfile.ReadError, tarfile.open,
                          self.tarname, "a")

    def test_append_bz2(self):
        # Appending to a bzip2-compressed archive must be refused.
        if bz2 is None:
            self.skipTest("bz2 module is not available")
        self._create_testtar("w:bz2")
        self.assertRaises(tarfile.ReadError, tarfile.open,
                          self.tarname, "a")

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
1361
1362
class LimitsTest(unittest.TestCase):
    # Checks which names, link names and uids TarInfo.tobuf() accepts
    # for each archive format.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # Long names and long link names can be stored.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(gnu)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(gnu)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # None of the values rejected above may raise here.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(pax)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(pax)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(pax)
1420
1421
class MiscTest(unittest.TestCase):
    # Tests for the module-level helpers that convert header fields.

    def test_char_fields(self):
        # stn() pads with NULs up to, or truncates to, the field length;
        # nts() decodes up to the first NUL byte.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_number_fields(self):
        # A small value is stored as NUL-terminated octal digits.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        # A value too large for the octal field needs the GNU base-256
        # encoding. The format is passed explicitly so that the check
        # does not depend on tarfile.DEFAULT_FORMAT.
        self.assertEqual(
            tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
            b"\x80\x00\x00\x00\xff\xff\xff\xff")
1433
1434
class ContextManagerTest(unittest.TestCase):
    # Tests for using TarFile objects in `with` statements.

    def test_basic(self):
        # The archive is open inside the block and closed afterwards.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise IOError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(IOError):
            with tar:
                pass

    def test_exception(self):
        # Test if the IOError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise IOError
        self.assertIsInstance(exc.exception, IOError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Swallow only ordinary exceptions; KeyboardInterrupt and
            # SystemExit must not be hidden by the test.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # See test_no_eof: do not hide KeyboardInterrupt or
                # SystemExit.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
1491
1492
class LinkEmulationTest(ReadTest):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members as
    # the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract the link member `name` and check that the resulting
        # file has the contents of the regular file "ustar/regtype".
        self.tar.extract(name, TEMPDIR)
        # Read through a context manager so the file is closed right
        # away instead of being left to the garbage collector.
        with open(os.path.join(TEMPDIR, name), "rb") as fobj:
            data = fobj.read()
        self.assertEqual(md5sum(data), md5_regtype)

    # When 8879 gets fixed, this will need to change. Currently on Windows
    # we have os.path.islink but no os.link, so these tests fail without the
    # following skip until link is completed.
    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
1525
1526
# MiscReadTest with tarname/mode overridden to read the gzip copy of
# the test archive (created by test_main) in "r:gz" mode.
class GzipMiscReadTest(MiscReadTest):
    tarname = gzipname
    mode = "r:gz"
# UstarReadTest with tarname/mode overridden to read the gzip copy of
# the test archive in "r:gz" mode.
class GzipUstarReadTest(UstarReadTest):
    tarname = gzipname
    mode = "r:gz"
# StreamReadTest with tarname/mode overridden to read the gzip copy of
# the test archive in stream mode ("r|gz").
class GzipStreamReadTest(StreamReadTest):
    tarname = gzipname
    mode = "r|gz"
# WriteTest with archives written in "w:gz" mode.
class GzipWriteTest(WriteTest):
    mode = "w:gz"
# StreamWriteTest with archives written in stream mode "w|gz".
class GzipStreamWriteTest(StreamWriteTest):
    mode = "w|gz"
1540
1541
# MiscReadTest with tarname/mode overridden to read the bzip2 copy of
# the test archive (created by test_main) in "r:bz2" mode.
class Bz2MiscReadTest(MiscReadTest):
    tarname = bz2name
    mode = "r:bz2"
# UstarReadTest with tarname/mode overridden to read the bzip2 copy of
# the test archive in "r:bz2" mode.
class Bz2UstarReadTest(UstarReadTest):
    tarname = bz2name
    mode = "r:bz2"
# StreamReadTest with tarname/mode overridden to read the bzip2 copy of
# the test archive in stream mode ("r|bz2").
class Bz2StreamReadTest(StreamReadTest):
    tarname = bz2name
    mode = "r|bz2"
# WriteTest with archives written in "w:bz2" mode.
class Bz2WriteTest(WriteTest):
    mode = "w:bz2"
# StreamWriteTest with archives written in stream mode "w|bz2".
class Bz2StreamWriteTest(StreamWriteTest):
    mode = "w|bz2"
1555
class Bz2PartialReadTest(unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bzip2-compressed archive with the
        # given mode; none of the attempts may keep reading after EOF.
        class EofGuardedBytesIO(io.BytesIO):
            # Set by a read() issued at end of data, cleared by seek().
            # Another read() while it is set counts as an endless loop.
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return io.BytesIO.read(self, n)

            def seek(self, *args):
                self.hit_eof = False
                return io.BytesIO.seek(self, *args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for cut in range(len(compressed) + 1):
            truncated = EofGuardedBytesIO(compressed[:cut])
            try:
                tarfile.open(fileobj=truncated, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
1584
1585
def test_main():
    # Create the scratch directory and the compressed copies of the
    # reference archive, run all applicable test classes and remove the
    # scratch directory again.
    support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    tests = [
        UstarReadTest,
        MiscReadTest,
        StreamReadTest,
        DetectReadTest,
        MemberReadTest,
        GNUReadTest,
        PaxReadTest,
        WriteTest,
        StreamWriteTest,
        GNUWriteTest,
        PaxWriteTest,
        UstarUnicodeTest,
        GNUUnicodeTest,
        PAXUnicodeTest,
        AppendTest,
        LimitsTest,
        MiscTest,
        ContextManagerTest,
    ]

    # Hard links are tested where os.link exists; otherwise the link
    # emulation tests are run instead.
    tests.append(HardlinkTest if hasattr(os, "link") else LinkEmulationTest)

    with open(tarname, "rb") as fobj:
        data = fobj.read()

    if gzip:
        # Create testtar.tar.gz and add gzip-specific tests.
        support.unlink(gzipname)
        with gzip.open(gzipname, "wb") as tar:
            tar.write(data)

        tests.extend([
            GzipMiscReadTest,
            GzipUstarReadTest,
            GzipStreamReadTest,
            GzipWriteTest,
            GzipStreamWriteTest,
        ])

    if bz2:
        # Create testtar.tar.bz2 and add bz2-specific tests.
        support.unlink(bz2name)
        with bz2.BZ2File(bz2name, "wb") as tar:
            tar.write(data)

        tests.extend([
            Bz2MiscReadTest,
            Bz2UstarReadTest,
            Bz2StreamReadTest,
            Bz2WriteTest,
            Bz2StreamWriteTest,
            Bz2PartialReadTest,
        ])

    try:
        support.run_unittest(*tests)
    finally:
        # Remove the scratch directory whether or not the tests passed.
        if os.path.exists(TEMPDIR):
            shutil.rmtree(TEMPDIR)
1656
# Run the whole test suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1659