test_tarfile.py revision dd071045e776e1c3e8cf6750a2fd1d0958bf19b3
1import sys
2import os
3import io
4import shutil
5import io
6from hashlib import md5
7import errno
8
9import unittest
10import tarfile
11
12from test import support
13
14# Check for our compression modules.
15try:
16    import gzip
17    gzip.GzipFile
18except (ImportError, AttributeError):
19    gzip = None
20try:
21    import bz2
22except ImportError:
23    bz2 = None
24
def md5sum(data):
    """Return the hexadecimal MD5 digest of the bytes object *data*."""
    digest = md5(data)
    return digest.hexdigest()
27
# Scratch directory used by the tests for extracted members and temporary
# archives; its path is derived from support.TESTFN.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
# Path of the reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")
# Paths inside TEMPDIR.  gzipname and bz2name are read by DetectReadTest as
# compressed archives (presumably created by the test driver, which is not
# part of this view -- TODO confirm); tmpname is a scratch archive that
# individual tests create and overwrite.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected md5sum() hex digests of member data: md5_regtype is compared
# against regular-file members such as "ustar/regtype", md5_sparse against
# the sparse members such as "gnu/sparse".
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
36
37
class ReadTest(unittest.TestCase):
    # Base fixture for the read tests: each test method runs with the
    # archive named by `tarname` opened in `mode` and bound to self.tar.

    tarname = tarname
    mode = "r:"

    def setUp(self):
        # Member names are decoded as iso8859-1.
        archive = tarfile.open(self.tarname, mode=self.mode,
                               encoding="iso8859-1")
        self.tar = archive

    def tearDown(self):
        archive = self.tar
        archive.close()
48
49
class UstarReadTest(ReadTest):
    # Exercises the file objects returned by TarFile.extractfile() for
    # members stored under "ustar/" in the archive opened by ReadTest.

    def test_fileobj_regular_file(self):
        # The data read from the member must have the size recorded in its
        # header and the known checksum.
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        try:
            data = fobj.read()
            self.assertEqual((len(data), md5sum(data)),
                             (tarinfo.size, md5_regtype),
                             "regular file extraction failed")
        finally:
            fobj.close()

    def test_fileobj_readlines(self):
        # readlines() on the archived member must agree with the lines of
        # the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        fobj = self.tar.extractfile(tarinfo)
        try:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                             "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                             "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest running scripting language.\n",
                    "fileobj.readlines() failed")
        finally:
            fobj.close()

    def test_fileobj_iter(self):
        # Iterating over the archived member must yield the same lines as
        # the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Plain "r" is used instead of "rU": text mode already applies
        # universal newlines, and the "U" flag is rejected by newer Pythons.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        try:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                             "fileobj.__iter__() failed")
        finally:
            fobj2.close()

    def test_fileobj_seek(self):
        # Compare seek()/tell()/read()/readline() on the archived member
        # with the raw bytes of the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as extracted:
            data = extracted.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Consume the whole member first so that the first seek() has
            # to move back from the end of the data.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")
        finally:
            # Close the member file object even when an assertion fails.
            fobj.close()

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        # Both file objects are closed even if opening the second one or
        # the comparison fails.
        a = self.tar.extractfile(lnktype)
        try:
            b = self.tar.extractfile(regtype)
            try:
                self.assertEqual(a.name, b.name)
            finally:
                b.close()
        finally:
            a.close()

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")
168
169
class CommonReadTest(ReadTest):
    # Read tests that are independent of the members of the reference
    # archive; they build their own small archives in tmpname.

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # Pre-bind the name so the cleanup below cannot raise a NameError
        # (and hide the real failure) when tarfile.open() itself raises.
        tar = None
        try:
            tar = tarfile.open(tmpname, self.mode)
            tar.getnames()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        else:
            self.assertListEqual(tar.getmembers(), [])
        finally:
            if tar is not None:
                tar.close()

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # Pick the writer that matches the compression named in self.mode.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with _open(tmpname, "wb") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" % char)
220
221
class MiscReadTest(CommonReadTest):
    # Assorted read tests: TarFile.name handling, reading through file
    # objects, member metadata and extraction to TEMPDIR.

    def test_no_name_argument(self):
        # Without a name argument the TarFile takes its name from the
        # file object.
        with open(self.tarname, "rb") as fobj:
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        # A file object without a name attribute leaves TarFile.name unset.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertEqual(tar.name, None)

    def test_empty_name_attribute(self):
        # An empty name attribute on the file object is treated like a
        # missing one.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertEqual(tar.name, None)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            f = tar.extractfile(t)
            try:
                data = f.read()
            finally:
                f.close()

        # Open the testtar and seek to the offset of the second member.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open
        with _open(self.tarname, "rb") as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            return
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the same mtime; members below "ustar/"
        # additionally carry the user name "tarfile".
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if tarinfo.name.startswith("ustar/"):
                self.assertEqual(tarinfo.uname, "tarfile",
                        "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        links = (
            ("ustar/lnktype", "hardlink not extracted properly"),
            ("ustar/symtype", "symlink not extracted properly"),
        )
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            # The link target has to exist before the links are extracted.
            tar.extract("ustar/regtype", TEMPDIR)
            for member, failure in links:
                try:
                    tar.extract(member, TEMPDIR)
                except EnvironmentError as e:
                    # Only ENOENT is reported as a failed link extraction;
                    # any other error is left to surface when the link is
                    # opened below.
                    if e.errno == errno.ENOENT:
                        self.fail(failure)

                with open(os.path.join(TEMPDIR, member), "rb") as f:
                    data = f.read()
                self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render a timestamp for the failure message; floats also get
            # their exact hexadecimal representation.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            shutil.rmtree(DIR)

    def test_extract_directory(self):
        # Extracting a single directory member restores its mtime and,
        # except on win32, its 0o755 permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            shutil.rmtree(DIR)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Create the instance without running __init__ so that it is
            # still reachable after __init__ has raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)
416
417
class StreamReadTest(CommonReadTest):
    # Runs the common read tests, plus stream-specific ones, with the
    # archive opened in stream mode.

    mode = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for tarinfo in self.tar:
            if not tarinfo.isreg():
                continue
            fobj = self.tar.extractfile(tarinfo)
            try:
                while True:
                    try:
                        buf = fobj.read(512)
                    except tarfile.StreamError:
                        self.fail("simple read-through using TarFile.extractfile() failed")
                    if not buf:
                        break
            finally:
                # Close the member file object even if the read failed.
                fobj.close()

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        fobj = self.tar.extractfile(tarinfo)
        try:
            data = fobj.read()
            self.assertEqual((len(data), md5sum(data)),
                             (tarinfo.size, md5_regtype),
                             "regular file extraction failed")
        finally:
            fobj.close()

    def test_provoke_stream_error(self):
        # getmembers() reads through the whole stream, so going back to
        # read the first member must raise StreamError.
        tarinfos = self.tar.getmembers()
        f = self.tar.extractfile(tarinfos[0]) # read the first member
        self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk a regularly opened archive and the stream in lockstep and
        # compare the data of corresponding members.
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertTrue(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                try:
                    self.assertTrue(v2 is not None, "stream.extractfile() failed")
                    try:
                        self.assertEqual(v1.read(), v2.read(), "stream extraction failed")
                    finally:
                        v2.close()
                finally:
                    v1.close()
        finally:
            tar1.close()
474
475
class DetectReadTest(unittest.TestCase):
    # Checks which read modes accept or reject the plain archive and its
    # compressed variants, opening them both by file name and through a
    # file object.

    def _testfunc_file(self, name, mode):
        # Opening *name* by path with *mode* must not raise ReadError.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r) failed: %s" % (name, mode, e))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Opening *name* through a file object with *mode* must not raise
        # ReadError.  The TarFile is closed before its file object is.
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as e:
                self.fail("tarfile.open(%r, %r, fileobj=...) failed: %s"
                          % (name, mode, e))
            else:
                tar.close()

    def _check_compressed(self, testfunc, compname, suffix):
        # *compname* is the compressed archive and *suffix* the compression
        # name used in mode strings ("gz" or "bz2").
        # The plain archive must be rejected when a compression is forced ...
        for sep in (":", "|"):
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname,
                              mode="r" + sep + suffix)
        # ... and the compressed archive when the mode allows none.
        for sep in (":", "|"):
            self.assertRaises(tarfile.ReadError, tarfile.open, compname,
                              mode="r" + sep)

        # Transparent and matching explicit modes must both work.
        for mode in ("r", "r:*", "r:" + suffix, "r|*", "r|" + suffix):
            testfunc(compname, mode)

    def _test_modes(self, testfunc):
        # Run *testfunc* for every mode that must accept the plain archive,
        # then repeat the checks for each available compression module.
        for mode in ("r", "r:", "r:*", "r|", "r|*"):
            testfunc(tarname, mode)

        if gzip:
            self._check_compressed(testfunc, gzipname, "gz")

        if bz2:
            self._check_compressed(testfunc, bz2name, "bz2")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
531
532
class MemberReadTest(ReadTest):
    # Looks up individual members of the reference archive and checks
    # their header fields and, where given, the checksum of their data.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Check *tarinfo* against the expected values.
        #   chksum -- expected md5sum() of the member's data, or None to
        #             skip the data check.
        #   kwargs -- expected TarInfo attribute values; the attributes
        #             shared by all members are added below.
        if chksum is not None:
            fobj = self.tar.extractfile(tarinfo)
            try:
                data = fobj.read()
            finally:
                # Do not leave the member file object open.
                fobj.close()
            self.assertEqual(md5sum(data), chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.items():
            self.assertEqual(getattr(tarinfo, k), v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_gnusparse(self):
        tarinfo = self.tar.getmember("gnu/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_gnusparse_00(self):
        tarinfo = self.tar.getmember("gnu/sparse-0.0")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_gnusparse_01(self):
        tarinfo = self.tar.getmember("gnu/sparse-0.1")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_gnusparse_10(self):
        tarinfo = self.tar.getmember("gnu/sparse-1.0")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Replace the archive opened by setUp() with a freshly opened one
        # before looking up the member.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
624
625
class LongnameTest(ReadTest):
    # Tests for members whose names exceed the classic header field.
    # The methods read self.subdir and self.longnametype, which this class
    # does not define itself; subclasses provide them.

    def _deep_path(self, leaf):
        # Name of the deeply nested member *leaf* below self.subdir.
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            tarinfo = self.tar.getmember(self._deep_path("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        # The link member must point at the long-named member.
        target = self._deep_path("longname")
        try:
            tarinfo = self.tar.getmember(self._deep_path("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertTrue(tarinfo.linkname == target, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three 512-byte blocks starting at the
        # member's offset; opening that copy must raise ReadError.
        tarinfo = self.tar.getmember(self._deep_path("longname"))
        self.tar.fileobj.seek(tarinfo.offset)
        truncated = io.BytesIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=truncated)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._deep_path("longname")).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(start)
            header = fobj.read(512)
            tarinfo = tarfile.TarInfo.frombuf(header, "iso8859-1", "strict")
            self.assertEqual(tarinfo.type, self.longnametype)
663
664
class GNUReadTest(LongnameTest):
    # Runs the long-name tests on the "gnu" members and adds extraction
    # tests for GNU sparse members.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR)
        filename = os.path.join(TEMPDIR, name)
        with open(filename, "rb") as fobj:
            data = fobj.read()
        self.assertEqual(md5sum(data), md5_sparse,
                "wrong md5sum for %s" % name)

        if self._fs_supports_holes():
            # With holes, the file occupies fewer bytes on disk than its
            # logical size.
            s = os.stat(filename)
            self.assertTrue(s.st_blocks * 512 < s.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes in files.
        #
        # sys.platform is "linux2" only on older Pythons and "linux" on
        # newer ones, so match the prefix instead of the exact string.
        if sys.platform.startswith("linux"):
            # Linux evidently has 512 byte st_blocks units.
            name = os.path.join(TEMPDIR, "sparse-test")
            with open(name, "wb") as fobj:
                # Create a 4096-byte file that consists of a hole only.
                fobj.seek(4096)
                fobj.truncate()
            try:
                s = os.stat(name)
            finally:
                os.remove(name)
            return s.st_blocks == 0
        else:
            return False
717
718
class PaxReadTest(LongnameTest):
    # Runs the long-name tests on the "pax" members and checks values
    # taken from pax headers.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # (member name, expected uname, expected gname); every member must
        # also expose the same VENDOR.umlauts pax header.
        expected = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member, uname, gname in expected:
                tarinfo = tar.getmember(member)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            headers = tarinfo.pax_headers
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(headers["atime"]), 1041808783.0)
            self.assertEqual(float(headers["ctime"]), 1041808783.0)
758
759
class WriteTestBase(unittest.TestCase):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.
    # The tests read self.mode, which subclasses have to define.

    def test_fileobj_no_close(self):
        # Closing the TarFile must leave a caller-supplied file object open.
        fobj = io.BytesIO()
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            tar.addfile(tarfile.TarInfo("foo"))
        finally:
            # Close the archive even if adding the member failed.
            tar.close()
        self.assertTrue(fobj.closed is False, "external fileobjs must never be closed")
770
771
class WriteTest(WriteTestBase):
    # Write tests run with the plain "w:" mode; subclasses override `mode`
    # to repeat them for other write modes.

    mode = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            tar.close()

    def test_tar_size(self):
        # Test for bug #1013882.
        path = os.path.join(TEMPDIR, "file")
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                with open(path, "wb") as fobj:
                    fobj.write(b"aaa")
                tar.add(path)
            finally:
                tar.close()
            self.assertGreater(os.path.getsize(tmpname), 0,
                    "tarfile is empty")
        finally:
            # Do not leave the scratch file behind in TEMPDIR.
            support.unlink(path)

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        path = os.path.join(TEMPDIR, "file")
        tar = tarfile.open(tmpname, self.mode)
        try:
            # An empty file must be reported with size 0 ...
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            # ... and a three byte file with size 3.
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)
        finally:
            tar.close()
            # Do not leave the scratch file behind in TEMPDIR.
            support.unlink(path)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.rmdir(path)

    # Report a skip instead of silently passing when os.link is missing.
    @unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write(b"aaa")
        try:
            os.link(target, link)
            tar = tarfile.open(tmpname, self.mode)
            try:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            # support.unlink() tolerates a missing file, so a failed
            # os.link() call is not masked by an error from the cleanup.
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.remove(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertEqual(tar.name, dstname, "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")

            cwd = os.getcwd()
            os.chdir(TEMPDIR)
            try:
                tar.add(dstname)
            finally:
                # Restore the working directory even if add() raises, so a
                # failure here cannot affect the tests that run afterwards.
                os.chdir(cwd)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")
        finally:
            tar.close()

    def test_exclude(self):
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            # Exclude every regular file; only the directory entry is
            # expected to end up in the archive.
            exclude = os.path.isfile

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                with support.check_warnings(("use the filter argument",
                                             DeprecationWarning)):
                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            def drop_bar_set_owner(tarinfo):
                # Returning None drops the member; every other member gets
                # its uid and uname rewritten.
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", filter=drop_bar_set_owner)
            finally:
                tar.close()

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, drop_bar_set_owner)

            tar = tarfile.open(tmpname, "r")
            try:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            open(foo, "w").close()
        else:
            os.mkdir(foo)

        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(foo, arcname=path)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                t = tar.next()
            finally:
                tar.close()
        finally:
            # Always remove the placeholder: a leftover "foo" would make a
            # later os.mkdir(foo) call fail.
            if not dir:
                os.remove(foo)
            else:
                os.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        cwd = os.getcwd()
        os.chdir(TEMPDIR)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(".")
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for t in tar:
                    self.assertTrue(t.name == "." or t.name.startswith("./"))
            finally:
                tar.close()
        finally:
            os.chdir(cwd)
1028
1029
class StreamWriteTest(WriteTestBase):
    # Write tests run with the "w|" stream mode; subclasses override `mode`
    # to repeat them for other stream modes.

    mode = "w|"

    def test_stream_padding(self):
        # Test for bug #1543303.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        # Get the uncompressed archive data, whatever the write mode was.
        if self.mode.endswith("gz"):
            with gzip.GzipFile(tmpname) as fobj:
                data = fobj.read()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertEqual(len(dec.unused_data), 0, "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        # The empty archive is expected to hold exactly RECORDSIZE NUL bytes.
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                         "incorrect zero padding")

    # Report a skip instead of silently passing where umask is unusable.
    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os.remove(tmpname)

        original_umask = os.umask(0o022)
        try:
            tar = tarfile.open(tmpname, self.mode)
            tar.close()
            mode = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(mode, 0o644, "wrong file permissions")
        finally:
            # Always restore the process-wide umask.
            os.umask(original_umask)
1073
1074
class GNUWriteTest(unittest.TestCase):
    # Checks that GNU longname and longlink extended headers are
    # created correctly (cp. bug #812325).

    # Shared 1016 character prefixes of the long names and link targets.
    _name_prefix = "longnam/" * 127
    _link_prefix = "longlnk/" * 127

    def _length(self, s):
        # len(s) plus one extra byte, rounded up to a multiple of 512.
        return -(-(len(s) + 1) // 512) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after one member has been added: the
        # regular header block plus, for every field that exceeds its
        # header limit, one extended header block and the padded field.
        oversized = []
        if len(name) > tarfile.LENGTH_NAME:
            oversized.append(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            oversized.append(link)
        return 512 + sum(512 + self._length(field) for field in oversized)

    def _test(self, name, link=None):
        # Write one member in GNU format, compare the archive offset with
        # the computed size, then read the member back.
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(written)

            expected = self._calc_size(name, link)
            actual = tar.offset
            self.assertTrue(expected == actual,
                    "GNU longname/longlink creation failed")
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            stored = tar.next()
            failure = "unable to read longname member"
            self.assertIsNotNone(stored, failure)
            self.assertEqual(written.name, stored.name, failure)
            self.assertEqual(written.linkname, stored.linkname, failure)
        finally:
            tar.close()

    def test_longname_1023(self):
        self._test(self._name_prefix + "longnam")

    def test_longname_1024(self):
        self._test(self._name_prefix + "longname")

    def test_longname_1025(self):
        self._test(self._name_prefix + "longname_")

    def test_longlink_1023(self):
        self._test("name", self._link_prefix + "longlnk")

    def test_longlink_1024(self):
        self._test("name", self._link_prefix + "longlink")

    def test_longlink_1025(self):
        self._test("name", self._link_prefix + "longlink_")

    def test_longnamelink_1023(self):
        self._test(self._name_prefix + "longnam",
                   self._link_prefix + "longlnk")

    def test_longnamelink_1024(self):
        self._test(self._name_prefix + "longname",
                   self._link_prefix + "longlink")

    def test_longnamelink_1025(self):
        self._test(self._name_prefix + "longname_",
                   self._link_prefix + "longlink_")
1157
1158
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # Register the cleanups up front: tearDown() is not run when setUp()
        # itself fails (e.g. when os.link() raises), whereas registered
        # cleanups are. support.unlink() tolerates a missing file.
        self.addCleanup(support.unlink, self.foo)
        self.addCleanup(support.unlink, self.bar)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        # Cleanups run in reverse order, so the archive is closed before
        # the two files are removed.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # The second name of an already added file is stored as a hardlink.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference set, the hardlink is stored as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1196
1197
class PaxWriteTest(GNUWriteTest):
    # Runs the long name/link cases inherited from GNUWriteTest against
    # archives written in PAX format and adds pax header round trips.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(written)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            first = tar.getmembers()[0]
            # Only the link target is compared when a link was given.
            if not link:
                self.assertTrue(name == first.name,
                        "PAX longname creation failed")
            else:
                self.assertTrue(link == first.linkname,
                        "PAX longlink creation failed")
        finally:
            tar.close()

    def test_pax_global_header(self):
        headers = {
            "foo": "bar",
            "uid": "0",
            "mtime": "1.23",
            "test": "\xe4\xf6\xfc",
            "\xe4\xf6\xfc": "test",
        }

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(tar.pax_headers, headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                # Number fields must be accepted by their converter.
                try:
                    tarfile.PAX_NUMBER_FIELDS[key](val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")
        finally:
            tar.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        headers = {"path": "foo", "uid": "123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
        try:
            written = tarfile.TarInfo()
            written.name = "\xe4\xf6\xfc" # non-ASCII
            written.uid = 8**8 # too large
            written.pax_headers = headers
            tar.addfile(written)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            stored = tar.getmembers()[0]
            self.assertEqual(stored.pax_headers, headers)
            self.assertEqual(stored.name, "foo")
            self.assertEqual(stored.uid, 123)
        finally:
            tar.close()
1279
1280
class UstarUnicodeTest(unittest.TestCase):
    # Round trip tests for non-ASCII member and owner names. Subclasses
    # select another archive format through the class attribute `format`.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using the given encoding and
        # check that reading with the same encoding restores the name.
        tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
        try:
            name = "\xe4\xf6\xfc"
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        try:
            self.assertEqual(tar.getmembers()[0].name, name)
        finally:
            tar.close()

    def test_unicode_filename_error(self):
        if self.format == tarfile.PAX_FORMAT:
            # Report the case as skipped rather than as a silent pass.
            self.skipTest("PAX_FORMAT ignores encoding in write mode")

        tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            # A non-ASCII name cannot be encoded with "ascii"/"strict" ...
            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            # ... and neither can a non-ASCII user name.
            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            tar.close()

    def test_unicode_argument(self):
        # All text fields of the members must be returned as str.
        tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
        try:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)
        finally:
            tar.close()

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1")
        try:
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

            if self.format != tarfile.PAX_FORMAT:
                # Reading the same archive as ASCII must yield the
                # undecodable bytes as lone surrogates.
                tar.close()
                tar = tarfile.open(tmpname, encoding="ascii")
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
        finally:
            tar.close()
1362
1363
class GNUUnicodeTest(UstarUnicodeTest):
    # Repeats the UstarUnicodeTest cases with GNU_FORMAT archives.

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar:
                # The member only has to be found; its contents are not
                # inspected.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1378
1379
class PAXUnicodeTest(UstarUnicodeTest):
    # Repeats the UstarUnicodeTest cases with PAX_FORMAT archives.

    format = tarfile.PAX_FORMAT

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar:
                # The member only has to be found; its contents are not
                # inspected.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1393
1394
class AppendTest(unittest.TestCase):
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without an archive at self.tarname.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os.remove(self.tarname)

    def _add_testfile(self, fileobj=None):
        # Append an empty member called "bar" to the archive.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _create_testtar(self, mode="w:"):
        # Create the archive with a single member "foo", copied from
        # "ustar/regtype" of the reference archive.
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            t = src.getmember("ustar/regtype")
            t.name = "foo"
            f = src.extractfile(t)
            try:
                with tarfile.open(self.tarname, mode) as tar:
                    tar.addfile(t, f)
            finally:
                f.close()

    def _test(self, names=None, fileobj=None):
        # Check the member names of the archive; names defaults to ["bar"].
        # None is used as the default to avoid a shared mutable default.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    def test_append_gz(self):
        if gzip is None:
            # Report a skip instead of a silent pass.
            self.skipTest("gzip module not available")
        self._create_testtar("w:gz")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")

    def test_append_bz2(self):
        if bz2 is None:
            # Report a skip instead of a silent pass.
            self.skipTest("bz2 module not available")
        self._create_testtar("w:bz2")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
1486
1487
class LimitsTest(unittest.TestCase):
    # Checks which name, linkname and uid values tobuf() accepts for each
    # archive format and which ones it rejects with ValueError.

    def test_ustar_limits(self):
        fmt = tarfile.USTAR_FORMAT

        # 100 char name
        info = tarfile.TarInfo("0123456789" * 10)
        info.tobuf(fmt)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 256 char name with a slash at pos 156
        info = tarfile.TarInfo("123/" * 62 + "longname")
        info.tobuf(fmt)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_gnu_limits(self):
        fmt = tarfile.GNU_FORMAT

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        info.tobuf(fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_pax_limits(self):
        fmt = tarfile.PAX_FORMAT

        # None of the values below may be rejected for PAX_FORMAT.
        info = tarfile.TarInfo("123/" * 126 + "longname")
        info.tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(fmt)
1545
1546
class MiscTest(unittest.TestCase):
    # Tests for the header field conversion helpers of the tarfile module.

    def test_char_fields(self):
        # stn() pads with NUL bytes or truncates to the requested length;
        # nts() returns the text in front of the first NUL byte.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo")

    def test_number_fields(self):
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        # 0xffffffff does not fit into the octal field, and itn() only
        # emits the base-256 form for GNU_FORMAT. Request that format
        # explicitly instead of relying on the module's default format.
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
1558
1559
class ContextManagerTest(unittest.TestCase):
    # Tests for using TarFile objects in with statements.

    def test_basic(self):
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise IOError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(IOError):
            with tar:
                pass

    def test_exception(self):
        # Test if the IOError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise IOError
        self.assertIsInstance(exc.exception, IOError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised above is expected here; do not
            # swallow KeyboardInterrupt or SystemExit.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the exception raised above is expected here; do not
                # swallow KeyboardInterrupt or SystemExit.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
1616
1617
class LinkEmulationTest(ReadTest):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members as
    # the regular files they point to.
    def _test_link_extraction(self, name):
        self.tar.extract(name, TEMPDIR)
        # Read through a context manager so that the file handle is closed
        # right away instead of being left to the garbage collector.
        with open(os.path.join(TEMPDIR, name), "rb") as fobj:
            data = fobj.read()
        self.assertEqual(md5sum(data), md5_regtype)

    # When 8879 gets fixed, this will need to change. Currently on Windows
    # we have os.path.islink but no os.link, so these tests fail without the
    # following skip until link is completed.
    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
1650
1651
class GzipMiscReadTest(MiscReadTest):
    # Runs the MiscReadTest cases on the archive at gzipname, opened
    # with mode "r:gz".
    mode = "r:gz"
    tarname = gzipname
class GzipUstarReadTest(UstarReadTest):
    # Runs the UstarReadTest cases on the archive at gzipname, opened
    # with mode "r:gz".
    mode = "r:gz"
    tarname = gzipname
1658class GzipStreamReadTest(StreamReadTest):
1659    tarname = gzipname
1660    mode = "r|gz"
1661class GzipWriteTest(WriteTest):
1662    mode = "w:gz"
1663class GzipStreamWriteTest(StreamWriteTest):
1664    mode = "w|gz"
1665
1666
# bz2 flavours of the generic test cases. Each subclass only overrides the
# class attributes of its base: the "mode" string and, for the read tests,
# "tarname" (pointed at the bz2-compressed copy of the reference archive).
# In tarfile mode strings ":bz2" selects regular bzip2 access and "|bz2"
# selects stream access.

class Bz2MiscReadTest(MiscReadTest):
    mode = "r:bz2"
    tarname = bz2name


class Bz2UstarReadTest(UstarReadTest):
    mode = "r:bz2"
    tarname = bz2name


class Bz2StreamReadTest(StreamReadTest):
    mode = "r|bz2"
    tarname = bz2name


class Bz2WriteTest(WriteTest):
    mode = "w:bz2"


class Bz2StreamWriteTest(StreamWriteTest):
    mode = "w|bz2"
1680
class Bz2PartialReadTest(unittest.TestCase):
    """Regression test for issue #5068.

    The _BZ2Proxy.read() method loops forever on an empty or partial
    bzipped file.
    """

    def _test_partial_input(self, mode):
        """Open every prefix of a tiny bz2-compressed archive with *mode*.

        The data is served by a BytesIO subclass that raises AssertionError
        when it is read again after a read that already started at the end
        of the data, unless a seek happened in between. A reader that keeps
        polling an exhausted file therefore fails fast instead of hanging.
        tarfile.ReadError is expected for truncated input and is ignored.
        """
        class MyBytesIO(io.BytesIO):
            # True once a read() has been issued with the position already
            # at the end of the data; reset by any seek().
            hit_eof = False

            def read(self, n=-1):
                if self.hit_eof:
                    raise AssertionError(
                        "infinite loop detected in tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)

            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        # range(len(data) + 1) covers the empty file, every truncation and
        # finally the complete archive.
        for x in range(len(data) + 1):
            try:
                tar = tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
            else:
                # The open succeeded: release the archive explicitly
                # rather than leaving it open until garbage collection.
                tar.close()

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
1709
1710
def test_main():
    """Build the compressed fixtures, run all tarfile tests and clean up.

    The scratch directory TEMPDIR is created first. The reference archive
    (tarname) is then re-compressed to gzipname and bz2name for whichever of
    the gzip and bz2 modules could be imported, and the matching test cases
    are added to the run. TEMPDIR is removed again afterwards, whether the
    tests ran to completion or an error occurred while preparing them.
    """
    support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    # Everything that follows the directory creation is guarded, so that a
    # failure while building the fixtures does not leave TEMPDIR behind.
    try:
        tests = [
            UstarReadTest,
            MiscReadTest,
            StreamReadTest,
            DetectReadTest,
            MemberReadTest,
            GNUReadTest,
            PaxReadTest,
            WriteTest,
            StreamWriteTest,
            GNUWriteTest,
            PaxWriteTest,
            UstarUnicodeTest,
            GNUUnicodeTest,
            PAXUnicodeTest,
            AppendTest,
            LimitsTest,
            MiscTest,
            ContextManagerTest,
        ]

        # Exactly one of the two link test cases is run, depending on
        # whether the platform provides os.link.
        if hasattr(os, "link"):
            tests.append(HardlinkTest)
        else:
            tests.append(LinkEmulationTest)

        with open(tarname, "rb") as fobj:
            data = fobj.read()

        if gzip:
            # Create testtar.tar.gz and add gzip-specific tests.
            support.unlink(gzipname)
            with gzip.open(gzipname, "wb") as tar:
                tar.write(data)

            tests += [
                GzipMiscReadTest,
                GzipUstarReadTest,
                GzipStreamReadTest,
                GzipWriteTest,
                GzipStreamWriteTest,
            ]

        if bz2:
            # Create testtar.tar.bz2 and add bz2-specific tests.
            support.unlink(bz2name)
            with bz2.BZ2File(bz2name, "wb") as tar:
                tar.write(data)

            tests += [
                Bz2MiscReadTest,
                Bz2UstarReadTest,
                Bz2StreamReadTest,
                Bz2WriteTest,
                Bz2StreamWriteTest,
                Bz2PartialReadTest,
            ]

        support.run_unittest(*tests)
    finally:
        if os.path.exists(TEMPDIR):
            shutil.rmtree(TEMPDIR)
1781
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
1784