test_tarfile.py revision 53ad0cd2842b7327bde4ca04ee11c544e522ff43
1import sys
2import os
3import io
4import shutil
5from hashlib import md5
6
7import unittest
8import tarfile
9
10from test import support
11
12# Check for our compression modules.
13try:
14    import gzip
15except ImportError:
16    gzip = None
17try:
18    import bz2
19except ImportError:
20    bz2 = None
21try:
22    import lzma
23except ImportError:
24    lzma = None
25
def md5sum(data):
    # Return the MD5 digest of the bytes-like *data* as a hex string.
    digest = md5(data)
    return digest.hexdigest()
28
# Scratch directory used as the extraction target and as the home of the
# temporary archives below.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
# The uncompressed reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")
# Paths of the compressed variants of the reference archive inside TEMPDIR.
# NOTE(review): presumably created by setup code outside this view -- confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
# Scratch archive that individual tests create and overwrite.
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected md5sum() of the regular-file payload ("ustar/regtype" and the
# members sharing its data) and of the sparse members' payload.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
38
39
class TarTest:
    # Archive settings for the uncompressed test archive.  The compression
    # mixins defined after this class override the same four attributes.
    suffix = ''
    tarname = tarname
    taropen = tarfile.TarFile.taropen
    open = io.FileIO

    @property
    def mode(self):
        # Mode string for tarfile.open(): the "prefix" supplied by the
        # concrete test class followed by this class's compression suffix.
        return "%s%s" % (self.prefix, self.suffix)
49
@support.requires_gzip
class GzipTest:
    # Archive settings for the gzip-compressed copy of the test archive.
    suffix = 'gz'
    tarname = gzipname
    taropen = tarfile.TarFile.gzopen
    # The gzip import at the top of the file leaves None when unavailable.
    open = None if gzip is None else gzip.GzipFile
56
@support.requires_bz2
class Bz2Test:
    # Archive settings for the bzip2-compressed copy of the test archive.
    suffix = 'bz2'
    tarname = bz2name
    taropen = tarfile.TarFile.bz2open
    # The bz2 import at the top of the file leaves None when unavailable.
    open = None if bz2 is None else bz2.BZ2File
63
@support.requires_lzma
class LzmaTest:
    # Archive settings for the xz-compressed copy of the test archive.
    suffix = 'xz'
    tarname = xzname
    taropen = tarfile.TarFile.xzopen
    # The lzma import at the top of the file leaves None when unavailable.
    open = None if lzma is None else lzma.LZMAFile
70
71
class ReadTest(TarTest):
    # Fixture base for the read tests: each test runs with self.tar bound
    # to the archive named by self.tarname, opened in self.mode.

    prefix = "r:"

    def setUp(self):
        archive = tarfile.open(self.tarname, mode=self.mode,
                               encoding="iso8859-1")
        self.tar = archive

    def tearDown(self):
        # Release the archive opened by setUp().
        self.tar.close()
82
83
class UstarReadTest(ReadTest, unittest.TestCase):
    # Tests for the file objects returned by TarFile.extractfile() on the
    # "ustar/" members of the archive opened by ReadTest.setUp().

    def test_fileobj_regular_file(self):
        # The extracted payload must match the member's size and checksum.
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(md5sum(data), md5_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the member must agree with readlines() on the
        # same member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the member must yield the same lines as the file
        # extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell()/read()/readline(s)() on the member and
        # compare the results with the data extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        # The context manager closes the member file even when one of the
        # assertions below fails.
        with self.tar.extractfile(tarinfo) as fobj:
            # Consume the whole member first so that the seeks below start
            # from the end of the data; the result itself is not needed.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        # The member must be usable through a text wrapper, including
        # seeking.
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(md5sum(data), md5_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
209
class GzipUstarReadTest(GzipTest, UstarReadTest):
    # The UstarReadTest cases, run with GzipTest's archive settings
    # (tarname = gzipname, suffix = 'gz').
    pass
212
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    # The UstarReadTest cases, run with Bz2Test's archive settings
    # (tarname = bz2name, suffix = 'bz2').
    pass
215
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    # The UstarReadTest cases, run with LzmaTest's archive settings
    # (tarname = xzname, suffix = 'xz').
    pass
218
219
class CommonReadTest(ReadTest):
    # Read tests shared by the seekable ("r:") and stream ("r|") variants.
    # Subclasses provide the mode through their "prefix" attribute.

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # Opening is kept separate from using the archive: when open()
        # itself raises there is no TarFile to close, and trying to close
        # an unbound name would hide the real failure.
        try:
            tar = tarfile.open(tmpname, self.mode)
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        with tar:
            try:
                tar.getnames()
            except tarfile.ReadError:
                self.fail("tarfile.open() failed on empty archive")
            self.assertListEqual(tar.getmembers(), [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)
270
271
class MiscReadTestBase(CommonReadTest):
    # Miscellaneous read tests for seekable archives.  The concrete
    # subclasses combine this base with one of the compression mixins.

    def test_no_name_argument(self):
        # Without a name argument, TarFile.name is derived from fileobj.
        with open(self.tarname, "rb") as fobj:
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        # A fileobj without a name attribute leaves TarFile.name as None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertEqual(tar.name, None)

    def test_empty_name_attribute(self):
        # An empty fileobj.name is treated like a missing one.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertEqual(tar.name, None)

    def test_illegal_mode_arg(self):
        # Each format-specific opener must reject an invalid mode string.
        with open(tmpname, 'wb'):
            pass
        for mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the fixed mtime; "ustar/" members also
        # carry the fixed user name.
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        # The last member of the archive must be reachable.
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render an mtime for the failure message; floats also get
            # their exact hexadecimal representation.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        tar = tarfile.open(tarname, encoding="iso8859-1")
        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = os.path.join(DIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777,
                                     os.stat(path).st_mode & 0o777)
                file_mtime = os.path.getmtime(path)
                errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                    format_mtime(tarinfo.mtime),
                    format_mtime(file_mtime),
                    path)
                self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            tar.close()
            shutil.rmtree(DIR)

    def test_extract_directory(self):
        # Extracting a single directory member restores its mtime and,
        # except on win32, its permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            shutil.rmtree(DIR)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Allocate the instance separately so that it stays reachable
            # after __init__() has raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
461                self.assertEqual(m1.get_info(), m2.get_info())
462
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # The inherited test_fail_comp expects ReadError when the uncompressed
    # archive is opened in self.mode, which does not apply to the
    # uncompressed variant.  Rebinding the name to None replaces it here
    # (presumably so that the test loader no longer collects it).
    test_fail_comp = None
465
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    # The MiscReadTestBase cases, run with GzipTest's archive settings.
    pass
468
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    # The MiscReadTestBase cases, run with Bz2Test's archive settings.
    def test_no_name_argument(self):
        # Overrides the inherited test, which compares against fobj.name;
        # it is skipped for the reason given in the message.
        self.skipTest("BZ2File have no name attribute")
472
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    # The MiscReadTestBase cases, run with LzmaTest's archive settings.
    def test_no_name_argument(self):
        # Overrides the inherited test, which compares against fobj.name;
        # it is skipped for the reason given in the message.
        self.skipTest("LZMAFile have no name attribute")
476
477
class StreamReadTest(CommonReadTest, unittest.TestCase):
    # Read tests for archives opened in stream mode ("r|").

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for tarinfo in self.tar:
            if tarinfo.isreg():
                with self.tar.extractfile(tarinfo) as fobj:
                    try:
                        # Drain the member in 512 byte chunks.
                        while fobj.read(512):
                            pass
                    except tarfile.StreamError:
                        self.fail("simple read-through using "
                                  "TarFile.extractfile() failed")

    def test_fileobj_regular_file(self):
        # get "regtype" (can't use getmember)
        member = self.tar.next()
        with self.tar.extractfile(member) as fobj:
            data = fobj.read()
        self.assertEqual(len(data), member.size,
                "regular file extraction failed")
        self.assertEqual(md5sum(data), md5_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # Reading the first member after getmembers() has walked the whole
        # stream must raise StreamError.
        first = self.tar.getmembers()[0]
        with self.tar.extractfile(first) as f:
            self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk the stream archive in lockstep with a seekable copy of the
        # reference archive and compare the payload of each member.
        reference = tarfile.open(tarname, encoding="iso8859-1")
        try:
            stream = self.tar

            while True:
                ref_info = reference.next()
                stream_info = stream.next()
                if ref_info is None:
                    break
                self.assertIsNotNone(stream_info, "stream.next() failed.")

                if stream_info.islnk() or stream_info.issym():
                    # Link members cannot be extracted from a stream.
                    with self.assertRaises(tarfile.StreamError):
                        stream.extractfile(stream_info)
                    continue

                ref_file = reference.extractfile(ref_info)
                stream_file = stream.extractfile(stream_info)
                if ref_file is not None:
                    self.assertIsNotNone(stream_file,
                            "stream.extractfile() failed")
                    self.assertEqual(ref_file.read(), stream_file.read(),
                            "stream extraction failed")
        finally:
            reference.close()
537            tar1.close()
538
class GzipStreamReadTest(GzipTest, StreamReadTest):
    # The StreamReadTest cases, run with GzipTest's archive settings.
    pass
541
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    # The StreamReadTest cases, run with Bz2Test's archive settings.
    pass
544
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    # The StreamReadTest cases, run with LzmaTest's archive settings.
    pass
547
548
class DetectReadTest(TarTest, unittest.TestCase):
    # Checks which mode strings open the archive named by self.tarname
    # and which ones are rejected with ReadError.

    def _testfunc_file(self, name, mode):
        # Open *name* by path; a ReadError fails the test.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail()
        tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Open *name* through an already opened binary file object; a
        # ReadError fails the test.
        try:
            with open(name, "rb") as f:
                tar = tarfile.open(name, mode, fileobj=f)
        except tarfile.ReadError:
            self.fail()
        tar.close()

    def _test_modes(self, testfunc):
        suffix = self.suffix
        if suffix:
            # A compressed mode must reject the uncompressed archive and
            # an uncompressed mode must reject the compressed one.
            rejected = (
                (tarname, "r:" + suffix),
                (tarname, "r|" + suffix),
                (self.tarname, "r:"),
                (self.tarname, "r|"),
            )
            for name, mode in rejected:
                with self.assertRaises(tarfile.ReadError):
                    tarfile.open(name, mode=mode)
        for mode in ("r", "r:" + suffix, "r:*", "r|" + suffix, "r|*"):
            testfunc(self.tarname, mode)

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
588
class GzipDetectReadTest(GzipTest, DetectReadTest):
    # The DetectReadTest cases, run with GzipTest's archive settings.
    pass
591
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    # The DetectReadTest cases for bzip2, plus a blocksize regression test.

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900kB). If the file was
        # compressed using another blocksize autodetection fails.
        with open(tarname, "rb") as source:
            payload = source.read()

        # Compress with blocksize 100kB, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as target:
            target.write(payload)

        self._testfunc_file(tmpname, "r|*")
606
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    # The DetectReadTest cases, run with LzmaTest's archive settings.
    pass
609
610
class MemberReadTest(ReadTest, unittest.TestCase):
    # Looks up individual members of the test archive and checks their
    # header fields and, where given, the checksum of their payload.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Compare *tarinfo* with the expected field values in *kwargs*,
        # extended by the values shared by all members checked here.
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as f:
                self.assertEqual(md5sum(f.read()), chksum,
                        "wrong md5sum for %s" % tarinfo.name)

        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")
        for field, expected in kwargs.items():
            self.assertEqual(getattr(tarinfo, field), expected,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Start from a freshly opened archive instead of the one created
        # by setUp().
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member = self.tar.getmember("pax/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)
706
707
class LongnameTest:
    # Mixin with tests for members whose names exceed the classic header
    # field.  Users provide "subdir" and "longnametype" as class attributes
    # and self.tar as the opened archive.

    def _long_member_name(self, leaf):
        # Build "<subdir>/123/123/.../<leaf>" with 125 "123/" components.
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            member = self.tar.getmember(self._long_member_name("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        # The long link member must point at the long name member.
        target = self._long_member_name("longname")
        try:
            member = self.tar.getmember(self._long_member_name("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, target, "linkname wrong")

    def test_truncated_longname(self):
        # An archive cut off 3 * 512 bytes after the start of the long
        # name member must be rejected with ReadError.
        member = self.tar.getmember(self._long_member_name("longname"))
        raw = self.tar.fileobj
        raw.seek(member.offset)
        truncated = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=truncated)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        member = self.tar.getmember(self._long_member_name("longname"))
        with open(tarname, "rb") as fobj:
            fobj.seek(member.offset)
            header = fobj.read(512)
            tarinfo = tarfile.TarInfo.frombuf(header, "iso8859-1", "strict")
            self.assertEqual(tarinfo.type, self.longnametype)
747            self.assertEqual(tarinfo.type, self.longnametype)
748
749
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    # Long name tests for the "gnu" members, plus sparse file extraction.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            payload = fobj.read()
        self.assertEqual(md5sum(payload), md5_sparse,
                "wrong md5sum for %s" % name)

        if not self._fs_supports_holes():
            return
        # With holes, fewer bytes are allocated than the file size reports.
        info = os.stat(extracted)
        self.assertLess(info.st_blocks * 512, info.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes in files.
        if not sys.platform.startswith("linux"):
            return False
        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Extend the file to 4096 bytes without writing any data.
            fobj.seek(4096)
            fobj.truncate()
        allocated = os.stat(probe).st_blocks
        os.remove(probe)
        return allocated == 0
802
803
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    # Read-side checks for the members stored under "pax/" in the test
    # archive. subdir and longnametype are presumably consumed by the
    # LongnameTest base class (defined elsewhere in this file).

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # Every listed member must report the given uname/gname and carry
        # the same VENDOR.umlauts pax header.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expected = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member, uname, gname in expected:
                tarinfo = tar.getmember(member)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        timestamp = 1041808783.0
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, timestamp)
            self.assertEqual(type(tarinfo.mtime), float)
            # atime and ctime are only available as raw pax header strings.
            for key in ("atime", "ctime"):
                self.assertEqual(float(tarinfo.pax_headers[key]), timestamp)
846
847
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        # Closing the archive must leave a caller-supplied file object open.
        buffer = io.BytesIO()
        archive = tarfile.open(fileobj=buffer, mode=self.mode)
        member = tarfile.TarInfo("foo")
        archive.addfile(member)
        archive.close()
        self.assertFalse(buffer.closed, "external fileobjs must never closed")
858
859
class WriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for the "w:" mode family. Archives are opened through
    # context managers and every scratch file, directory or change of the
    # working directory is undone in a finally block, so that a failing
    # test cannot leak state into the tests that run after it.

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        path = os.path.join(TEMPDIR, "file")
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                with open(path, "wb") as fobj:
                    fobj.write(b"aaa")
                tar.add(path)
        finally:
            # Do not leave the scratch file behind in TEMPDIR.
            support.unlink(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        path = os.path.join(TEMPDIR, "file")
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                # An empty file ...
                with open(path, "wb"):
                    pass
                self.assertEqual(tar.gettarinfo(path).size, 0)

                # ... and the same file with three bytes of content.
                with open(path, "wb") as fobj:
                    fobj.write(b"aaa")
                self.assertEqual(tar.gettarinfo(path).size, 3)
        finally:
            support.unlink(path)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                self.assertEqual(tar.gettarinfo(path).size, 0)
        finally:
            os.rmdir(path)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        try:
            # Creating the files inside the try block guarantees that the
            # target is removed again even if os.link() fails.
            with open(target, "wb") as fobj:
                fobj.write(b"aaa")
            os.link(target, link)
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                self.assertEqual(tar.gettarinfo(link).size, 0)
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                self.assertEqual(tar.gettarinfo(path).size, 0)
        finally:
            os.remove(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            # Repeat from inside TEMPDIR; always switch back, otherwise a
            # failure here would change the cwd for all following tests.
            cwd = os.getcwd()
            os.chdir(TEMPDIR)
            try:
                tar.add(dstname)
            finally:
                os.chdir(cwd)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

    def test_exclude(self):
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                support.create_empty_file(os.path.join(tempdir, name))

            # Excluding every regular file leaves only the directory itself.
            exclude = os.path.isfile

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                with support.check_warnings(("use the filter argument",
                                             DeprecationWarning)):
                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)

            with tarfile.open(tmpname, "r") as tar:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
        finally:
            shutil.rmtree(tempdir)

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                support.create_empty_file(os.path.join(tempdir, name))

            def tarfilter(tarinfo):
                # Drop "bar", tag every other member with a fixed owner.
                if os.path.basename(tarinfo.name) == "bar":
                    return None
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=tarfilter)

            # Verify that filter is a keyword-only argument. The TypeError
            # is raised while binding the arguments, so it does not matter
            # that the archive has been closed already.
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, tarfilter)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                # The directory itself plus "foo" and "baz".
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            shutil.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        # cmp_path overrides the expected stored name; with dir=True the
        # member is a directory instead of a regular file.
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            support.create_empty_file(foo)
        else:
            os.mkdir(foo)

        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            # Remove the scratch entry even on failure: a leftover "foo"
            # would make the next os.mkdir(foo) fail.
            if not dir:
                os.remove(foo)
            else:
                os.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))


    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file)
                tar.add(target_file)
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive, 'r') as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir)
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            # The archive may not exist yet if an earlier step failed;
            # support.unlink() tolerates that and so cannot mask the
            # original error.
            support.unlink(temparchive)
            shutil.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        # Absolute names must be stored as relative ones.
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        cwd = os.getcwd()
        os.chdir(TEMPDIR)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)
        finally:
            os.chdir(cwd)
1151
class GzipWriteTest(GzipTest, WriteTest):
    # No tests of its own: inherits the WriteTest cases, with GzipTest
    # listed first in the bases so that its attributes take precedence.
    pass
1154
class Bz2WriteTest(Bz2Test, WriteTest):
    # No tests of its own: inherits the WriteTest cases, with Bz2Test
    # listed first in the bases so that its attributes take precedence.
    pass
1157
class LzmaWriteTest(LzmaTest, WriteTest):
    # No tests of its own: inherits the WriteTest cases, with LzmaTest
    # listed first in the bases so that its attributes take precedence.
    pass
1160
1161
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for the stream mode "w|".

    prefix = "w|"
    # Factory for an incremental decompressor; subclasses may override it.
    # When unset, archives are read back through self.open instead.
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()
        make_decompressor = self.decompressor
        if not make_decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            dec = make_decompressor()
            with open(tmpname, "rb") as fobj:
                raw = fobj.read()
            data = dec.decompress(raw)
            self.assertFalse(dec.unused_data, "found trailing data")
        # An empty archive consists of exactly one record of NUL bytes.
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipIf(sys.platform == "win32" or not hasattr(os, "umask"),
                     "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os.remove(tmpname)

        original_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(original_umask)
1199
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    # No tests of its own: inherits the StreamWriteTest cases, with GzipTest
    # listed first in the bases so that its attributes take precedence.
    pass
1202
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Incremental decompressor for the inherited stream tests; None when
    # the bz2 module could not be imported.
    decompressor = None if bz2 is None else bz2.BZ2Decompressor
1205
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Incremental decompressor for the inherited stream tests; None when
    # the lzma module could not be imported.
    decompressor = None if lzma is None else lzma.LZMADecompressor
1208
1209
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Bytes occupied by the whole 512 byte blocks needed to hold
        # len(s) + 1 bytes (presumably the string plus a trailing NUL).
        block_count = len(s) // 512 + 1
        return 512 * block_count

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one empty member.
        # Initial tar header
        total = 512

        oversized = []
        if len(name) > tarfile.LENGTH_NAME:
            oversized.append(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            oversized.append(link)

        for value in oversized:
            # GNU longname/longlink extended header + the long string
            total += 512 + self._length(value)
        return total

    def _test(self, name, link=None):
        # Write a single member (a hard link if `link` is given) in GNU
        # format, check the archive offset and read the member back.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            expected = self._calc_size(name, link)
            self.assertEqual(expected, tar.offset,
                    "GNU longname/longlink creation failed")
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            for attr in ("name", "linkname"):
                self.assertEqual(getattr(tarinfo, attr),
                        getattr(member, attr),
                        "unable to read longname member")

    # The numeric suffix of each test is the length of the long name
    # and/or link name that is written.
    def test_longname_1023(self):
        self._test("longnam/" * 127 + "longnam")

    def test_longname_1024(self):
        self._test("longnam/" * 127 + "longname")

    def test_longname_1025(self):
        self._test("longnam/" * 127 + "longname_")

    def test_longlink_1023(self):
        self._test("name", "longlnk/" * 127 + "longlnk")

    def test_longlink_1024(self):
        self._test("name", "longlnk/" * 127 + "longlink")

    def test_longlink_1025(self):
        self._test("name", "longlnk/" * 127 + "longlink_")

    def test_longnamelink_1023(self):
        self._test("longnam/" * 127 + "longnam",
                   "longlnk/" * 127 + "longlnk")

    def test_longnamelink_1024(self):
        self._test("longnam/" * 127 + "longname",
                   "longlnk/" * 127 + "longlink")

    def test_longnamelink_1025(self):
        self._test("longnam/" * 127 + "longname_",
                   "longlnk/" * 127 + "longlink_")
1290
1291
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Fixture: a regular file "foo", a hard link "bar" pointing to it
        # and an open archive to which "foo" has already been added.
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")
        # Register the removals up front: unlike tearDown(), cleanups also
        # run when setUp() itself fails (e.g. os.link() raising), so no
        # stale files are left behind. Cleanups run in reverse order of
        # registration: close the archive, remove foo, remove bar.
        self.addCleanup(support.unlink, self.bar)
        self.addCleanup(support.unlink, self.foo)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # The second name of an already added file becomes a hard link.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference set, the second name is a regular file again.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1330
1331
class PaxWriteTest(GNUWriteTest):
    # Reruns the long name/link tests inherited from GNUWriteTest against
    # archives written in PAX_FORMAT, plus pax header specific tests.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as tar:
            tar.addfile(tarinfo)

        with tarfile.open(tmpname) as tar:
            member = tar.getmembers()[0]
            # Always verify the name, so that the combined long name and
            # long link tests check both fields instead of the link only.
            self.assertEqual(name, member.name,
                    "PAX longname creation failed")
            if link:
                self.assertEqual(link, member.linkname,
                        "PAX longlink creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers) as tar:
            tar.addfile(tarfile.TarInfo("test"))

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    # Number fields must be convertible by their converter.
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1") as tar:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
1414
1415
class UstarUnicodeTest(unittest.TestCase):
    # Unicode handling of member names and user/group names. All archives
    # are written in the format given by the class attribute `format`.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Round-trip a non-ASCII member name through an archive that is
        # written and read back with the same encoding.
        name = "\xe4\xf6\xfc"
        tar = tarfile.open(tmpname, "w", format=self.format,
                           encoding=encoding, errors="strict")
        try:
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", non-ASCII names must
        # be rejected instead of being stored.
        tar = tarfile.open(tmpname, "w", format=self.format,
                           encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            # Non-ASCII member name.
            tarinfo.name = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                tar.addfile(tarinfo)

            # ASCII member name, but non-ASCII user name.
            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                tar.addfile(tarinfo)
        finally:
            tar.close()

    def test_unicode_argument(self):
        # All text fields of all members must be str objects.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                for attr in ("name", "linkname", "uname", "gname"):
                    self.assertIs(type(getattr(t, attr)), str)

    def test_uname_unicode(self):
        umlauts = "\xe4\xf6\xfc"
        t = tarfile.TarInfo("foo")
        t.uname = umlauts
        t.gname = umlauts

        tar = tarfile.open(tmpname, mode="w", format=self.format,
                           encoding="iso8859-1")
        try:
            tar.addfile(t)
        finally:
            tar.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, umlauts)
            self.assertEqual(t.gname, umlauts)

        if self.format != tarfile.PAX_FORMAT:
            # Read with the wrong encoding, the undecodable bytes are
            # expected to show up as lone surrogates.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
1497
1498
class GNUUnicodeTest(UstarUnicodeTest):
    # Same tests as UstarUnicodeTest, with archives written in GNU_FORMAT.

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # Only the lookup matters, the member itself is not used.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1515
1516
class PAXUnicodeTest(UstarUnicodeTest):
    # Same tests as UstarUnicodeTest, with archives written in PAX_FORMAT.

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # Only the lookup matters, the member itself is not used.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1535
1536
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without an archive at self.tarname.
        archive = self.tarname = tmpname
        if os.path.exists(archive):
            os.remove(archive)

    def _create_testtar(self, mode="w:"):
        # Create self.tarname in the given mode with a single member "foo",
        # copied from the member "ustar/regtype" of the test archive.
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as content, \
                    tarfile.open(self.tarname, mode) as tar:
                tar.addfile(member, content)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must be refused.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
1556
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append mode tests for uncompressed archives.

    # The archives used here are not compressed, so the inherited test
    # is disabled.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member "bar" to self.tarname (or to fileobj).
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly the members in `names`,
        # which defaults to ["bar"]. The default is created per call
        # instead of being a mutable list shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        # 1024 NUL bytes, i.e. nothing but zero blocks.
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        # Append to an in-memory copy of the archive.
        buf = io.BytesIO(data)
        self._add_testfile(buf)
        buf.seek(0)
        self._test(names=["foo", "bar"], fileobj=buf)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Write `data` as the archive and expect appending to be refused.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
1620
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # No tests of its own: runs the AppendTestBase tests with the
    # attributes of GzipTest (e.g. the suffix used for the write mode).
    pass
1623
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # No tests of its own: runs the AppendTestBase tests with the
    # attributes of Bz2Test (e.g. the suffix used for the write mode).
    pass
1626
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # No tests of its own: runs the AppendTestBase tests with the
    # attributes of LzmaTest (e.g. the suffix used for the write mode).
    pass
1629
1630
class LimitsTest(unittest.TestCase):
    # Field size limits applied by TarInfo.tobuf() for each archive format.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            tarinfo.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            tarinfo.tobuf(ustar)

        # 512 char name
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            tarinfo.tobuf(ustar)

        # 512 char linkname
        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            tarinfo.tobuf(ustar)

        # uid that needs 8 octal digits
        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 0o10000000
        with self.assertRaises(ValueError):
            tarinfo.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # Long names and long link names can be stored.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(gnu)

        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(gnu)

        # uid >= 256 ** 7
        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            tarinfo.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # None of the values rejected above raises an error here.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(pax)

        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(pax)

        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 0o4000000000000000000
        tarinfo.tobuf(pax)
1688
1689
class MiscTest(unittest.TestCase):
    """Exercise tarfile's low-level header field helpers.

    stn()/nts() convert character fields and nti()/itn() convert
    number fields.
    """

    def test_char_fields(self):
        # stn() pads the encoded string with NULs up to the requested
        # length, or truncates it when it is too long.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        # nts() stops decoding at the first NUL byte.
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
                         0o10000000)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
                         0xffffffff)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
                         -1)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
                         -100)
        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
                         -0x100000000000000)

    def test_write_number_fields(self):
        # Values that fit into seven octal digits are written as octal
        # text with itn()'s default format.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
        # Base-256 encoding is specific to GNU tar, so GNU_FORMAT is
        # requested explicitly rather than relying on itn()'s default
        # format, which follows tarfile.DEFAULT_FORMAT and need not be
        # GNU_FORMAT.
        self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(-0x100000000000000,
                                     format=tarfile.GNU_FORMAT),
                         b"\xff\x00\x00\x00\x00\x00\x00\x00")

    def test_number_field_limits(self):
        # Out-of-range values must raise ValueError: negative or
        # too-large numbers for ustar, and numbers that do not fit into
        # the given field width for GNU.
        with self.assertRaises(ValueError):
            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)
1741
1742
class ContextManagerTest(unittest.TestCase):
    """Check how TarFile objects behave when used in a with statement."""

    def test_basic(self):
        # The archive is open inside the block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise IOError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(IOError):
            with tar:
                pass

    def test_exception(self):
        # Test if the IOError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise IOError
        self.assertIsInstance(exc.exception, IOError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Swallow only ordinary exceptions, such as the one raised
            # above; KeyboardInterrupt and SystemExit still propagate.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Swallow only ordinary exceptions, such as the one
                # raised above; KeyboardInterrupt and SystemExit still
                # propagate.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
1799
1800
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):
    """Extract link members on platforms that lack os.link."""

    # Regression test for issue #8741. Where symbolic or hard links are
    # not supported, tarfile tries to extract such members as the
    # regular files they point to.
    def _test_link_extraction(self, name):
        # Extract the member into TEMPDIR and compare the checksum of the
        # resulting file with md5_regtype.
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as stream:
            digest = md5sum(stream.read())
        self.assertEqual(digest, md5_regtype)

    # Issues #1578269, #8879 and #17689 give some history on the skip
    # conditions used below.
    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
1833
1834
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: _BZ2Proxy.read() used to loop forever when given an
    # empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bz2-compressed archive and make
        # sure tarfile.open() never keeps reading once EOF was reached.

        class MyBytesIO(io.BytesIO):
            # Set when a read() starts at the end of the buffer; the
            # next read() then fails unless a seek() came in between.
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError(
                        "infinite loop detected in tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)

            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for length in range(len(compressed) + 1):
            source = MyBytesIO(compressed[:length])
            try:
                tarfile.open(fileobj=source, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
1864
1865
def setUpModule():
    """Create a fresh TEMPDIR and fill it with the compressed archives.

    The contents of the uncompressed test archive (tarname) are written
    through each available compression class to the path stored in its
    tarname attribute.
    """
    # An interrupted earlier run may have left TEMPDIR behind as a
    # directory. support.unlink() is meant for files, so a leftover tree
    # is removed with shutil.rmtree(); otherwise os.makedirs() below
    # would fail because the directory already exists.
    if os.path.isdir(TEMPDIR) and not os.path.islink(TEMPDIR):
        shutil.rmtree(TEMPDIR)
    else:
        support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    with open(tarname, "rb") as fobj:
        data = fobj.read()

    # Create compressed tarfiles.
    for c in GzipTest, Bz2Test, LzmaTest:
        # c.open is None when the compression module is unavailable.
        if c.open:
            support.unlink(c.tarname)
            with c.open(c.tarname, "wb") as tar:
                tar.write(data)
1879
def tearDownModule():
    """Remove TEMPDIR and everything below it, if it still exists."""
    if not os.path.exists(TEMPDIR):
        return
    shutil.rmtree(TEMPDIR)
1883
# Hand control to unittest's command-line runner when this file is
# executed as a script rather than imported.
if __name__ == "__main__":
    unittest.main()
1886