test_tarfile.py revision 577473fe687b38c8f01b0c372d6d2563680045b3
1import sys
2import os
3import io
4import shutil
5import tempfile
6import io
7from hashlib import md5
8import errno
9
10import unittest
11import tarfile
12
13from test import support
14
15# Check for our compression modules.
16try:
17    import gzip
18    gzip.GzipFile
19except (ImportError, AttributeError):
20    gzip = None
21try:
22    import bz2
23except ImportError:
24    bz2 = None
25
def md5sum(data):
    """Return the hexadecimal MD5 digest of *data*."""
    digest = md5(data)
    return digest.hexdigest()
28
# Absolute path of the scratch directory the tests extract into and
# create temporary files in.
TEMPDIR = os.path.abspath(support.TESTFN)
# The reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")
# Paths inside TEMPDIR for gzip / bzip2 variants of the reference archive.
# NOTE(review): presumably created by setup code outside this view - confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
# Scratch archive (and scratch file) that individual tests write and re-read.
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected MD5 hex digests, compared against md5sum() of extracted member
# data (e.g. "ustar/regtype" and "ustar/sparse" respectively).
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
37
38
class ReadTest(unittest.TestCase):
    # Common fixture for the read tests: every test method gets a freshly
    # opened archive in self.tar, which is closed again afterwards.
    # Subclasses may override `tarname` and `mode`.

    tarname = tarname
    mode = "r:"

    def setUp(self):
        # Open the archive named by the class attributes for this test.
        archive = tarfile.open(self.tarname, mode=self.mode,
                               encoding="iso8859-1")
        self.tar = archive

    def tearDown(self):
        # Release the archive that setUp() opened.
        archive = self.tar
        archive.close()
49
50
class UstarReadTest(ReadTest):
    # Tests for the file objects returned by TarFile.extractfile(), using
    # the "ustar/regtype" member of the reference archive.

    def test_fileobj_regular_file(self):
        # The extracted data must match the recorded size and MD5 digest.
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)), (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the archive member must yield the same lines as
        # reading the member after extracting it to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Use a context manager so the on-disk file is always closed.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = io.TextIOWrapper(self.tar.extractfile(tarinfo))
        lines2 = fobj2.readlines()

        self.assertEqual(lines1, lines2,
                "fileobj.readlines() failed")
        self.assertEqual(len(lines2), 114,
                "fileobj.readlines() failed")
        self.assertEqual(lines2[83],
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating over the archive member must yield the same lines as
        # reading the extracted file.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Plain "r" already uses universal newlines in Python 3; the "U"
        # flag is deprecated and rejected by newer interpreters.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        lines2 = list(io.TextIOWrapper(fobj2))
        self.assertEqual(lines1, lines2,
                     "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell()/read()/readline()/readlines() on the
        # archive member and compare against the extracted file's bytes.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Consume the whole member first so the seeks start from the end.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")
        finally:
            fobj.close()
136
137
class CommonReadTest(ReadTest):
    # Read tests shared by the seekable and the stream read test classes
    # (both derive from this class and only differ in `mode`).

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        tarfile.open(tmpname, self.mode.replace("r", "w")).close()
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.getnames()
                self.assertListEqual(tar.getmembers(), [])
            finally:
                # Always release the archive, even if an assertion fails.
                tar.close()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        open(tmpname, "wb").close()
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # Pick the opener that matches the compression of the current mode.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            fobj = _open(tmpname, "wb")
            try:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())
            finally:
                fobj.close()

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                        "ignore_zeros=True should have skipped the %r-blocks" % char)
            finally:
                tar.close()
182
183
class MiscReadTest(CommonReadTest):
    # Miscellaneous read tests that need a seekable archive.

    def test_no_name_argument(self):
        # Without a name argument, TarFile.name is taken from the fileobj.
        with open(self.tarname, "rb") as fobj:
            tar = tarfile.open(fileobj=fobj, mode=self.mode)
            try:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))
            finally:
                tar.close()

    def test_no_name_attribute(self):
        # A fileobj without a name attribute results in TarFile.name None.
        with open(self.tarname, "rb") as source:
            data = source.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_empty_name_attribute(self):
        # A fileobj with an empty name attribute results in TarFile.name None.
        with open(self.tarname, "rb") as source:
            data = source.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        tar = tarfile.open(self.tarname, mode=self.mode)
        try:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            data = tar.extractfile(t).read()
        finally:
            tar.close()

        # Open the testtar and seek to the offset of the second member.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open
        fobj = _open(self.tarname, "rb")
        try:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            # (Call the module-level opener rather than going through the
            # already closed TarFile instance.)
            tar = tarfile.open(self.tarname, mode="r:", fileobj=fobj)
            try:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")
            finally:
                tar.close()
        finally:
            fobj.close()

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            return
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the same mtime; ustar members additionally
        # carry the user name "tarfile".
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        # The last member of the archive must be reachable.
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
        try:
            tar.extract("ustar/regtype", TEMPDIR)
            # Only a missing link target (ENOENT) is reported as a failure
            # here; other errors surface when the file is read below.
            try:
                tar.extract("ustar/lnktype", TEMPDIR)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    self.fail("hardlink not extracted properly")

            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as fobj:
                data = fobj.read()
            self.assertEqual(md5sum(data), md5_regtype)

            try:
                tar.extract("ustar/symtype", TEMPDIR)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    self.fail("symlink not extracted properly")

            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as fobj:
                data = fobj.read()
            self.assertEqual(md5sum(data), md5_regtype)
        finally:
            tar.close()

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(TEMPDIR, directories)
            for tarinfo in directories:
                path = os.path.join(TEMPDIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777)
                self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
        finally:
            tar.close()

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Create the instance without running __init__ so that it is
            # still inspectable after __init__ raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os.remove(empty)
329
330
class StreamReadTest(CommonReadTest):
    # Read tests for archives opened in stream mode ("r|"), i.e. without
    # random access to the underlying file.

    mode="r|"

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)), (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # After all members were read, going back to the first one must
        # raise a StreamError.
        tarinfos = self.tar.getmembers()
        f = self.tar.extractfile(tarinfos[0]) # read the first member
        self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk a seekable and a stream view of the same archive in lockstep
        # and compare the extracted data member by member.
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertTrue(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertTrue(v2 is not None, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(), "stream extraction failed")
        finally:
            # Close the reference archive even when an assertion failed.
            tar1.close()
370
371
class DetectReadTest(unittest.TestCase):
    # Checks which open modes accept which kind of archive (plain, gzip,
    # bzip2), both when opening by name and through a file object.

    def _testfunc_file(self, name, mode):
        # Open `name` by path with `mode`; a ReadError fails the test.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as e:
            self.fail("could not open %r with mode %r: %s" % (name, mode, e))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Open `name` through a binary file object with `mode`; a ReadError
        # fails the test.
        with open(name, "rb") as fobj:
            try:
                tar = tarfile.open(name, mode, fileobj=fobj)
            except tarfile.ReadError as e:
                self.fail("could not open %r with mode %r: %s" % (name, mode, e))
            else:
                tar.close()

    def _test_modes(self, testfunc):
        # Run `testfunc(name, mode)` for every combination that must work
        # and check that mismatching compression modes raise ReadError.
        testfunc(tarname, "r")
        testfunc(tarname, "r:")
        testfunc(tarname, "r:*")
        testfunc(tarname, "r|")
        testfunc(tarname, "r|*")

        # The compression specific checks only run if the module imported.
        if gzip:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")

            testfunc(gzipname, "r")
            testfunc(gzipname, "r:*")
            testfunc(gzipname, "r:gz")
            testfunc(gzipname, "r|*")
            testfunc(gzipname, "r|gz")

        if bz2:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")

            testfunc(bz2name, "r")
            testfunc(bz2name, "r:*")
            testfunc(bz2name, "r:bz2")
            testfunc(bz2name, "r|*")
            testfunc(bz2name, "r|bz2")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
422
423
class MemberReadTest(ReadTest):
    # Looks up individual members of the reference archive and checks
    # their header fields (and, where given, the digest of their data).

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Check `tarinfo` against the expected field values in `kwargs`.
        # If `chksum` is given, the MD5 digest of the member's data must
        # match it as well. mtime, uid and gid (and uname/gname for
        # non-V7 members) are always checked against fixed values.
        if chksum is not None:
            self.assertEqual(md5sum(self.tar.extractfile(tarinfo).read()), chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.items():
            self.assertEqual(getattr(tarinfo, k), v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Close the archive opened by setUp() before replacing it, so it
        # is not leaked; tearDown() closes the replacement.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
498
499
class LongnameTest(ReadTest):
    # Shared tests for members with long names / long link names.
    # Subclasses must provide `subdir` (the archive directory holding the
    # members) and `longnametype` (the expected extended header type).

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        # The long link member must point at the long name member.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        longlink = self.subdir + "/" + "123/" * 125 + "longlink"
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(tarinfo.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # An archive that consists of only the first three blocks starting
        # at the long name member's offset must be rejected with ReadError.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        fobj = io.BytesIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        offset = self.tar.getmember(longname).offset
        # Read the raw header block and make sure the file gets closed.
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            header = fobj.read(512)
        tarinfo = tarfile.TarInfo.frombuf(header, "iso8859-1", "strict")
        self.assertEqual(tarinfo.type, self.longnametype)
536        self.assertEqual(tarinfo.type, self.longnametype)
537
538
class GNUReadTest(LongnameTest):
    # Runs the long name tests against the "gnu" directory of the archive
    # and adds a check for GNU sparse members.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    def test_sparse_file(self):
        # The "ustar/sparse" and "gnu/sparse" members must extract to
        # identical data.
        archive = self.tar
        ustar_member = archive.getmember("ustar/sparse")
        ustar_fobj = archive.extractfile(ustar_member)
        gnu_member = archive.getmember("gnu/sparse")
        gnu_fobj = archive.extractfile(gnu_member)
        ustar_data = ustar_fobj.read()
        gnu_data = gnu_fobj.read()
        self.assertEqual(ustar_data, gnu_data, "sparse file extraction failed")
550                "sparse file extraction failed")
551
552
class PaxReadTest(LongnameTest):
    # Runs the long name tests against the "pax" directory of the archive
    # and adds checks for values taken from pax headers.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # Check uname/gname and the VENDOR.umlauts pax header of three
        # consecutive pax members.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype1")
            self.assertEqual(tarinfo.uname, "foo")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), umlauts)

            tarinfo = tar.getmember("pax/regtype2")
            self.assertEqual(tarinfo.uname, "")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), umlauts)

            tarinfo = tar.getmember("pax/regtype3")
            self.assertEqual(tarinfo.uname, "tarfile")
            self.assertEqual(tarinfo.gname, "tarfile")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), umlauts)
        finally:
            # The archive is opened locally, so it must be closed locally.
            tar.close()

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
        finally:
            tar.close()
587
588
class WriteTestBase(unittest.TestCase):
    # Home for write tests that have to pass in every write mode.
    # The mode under test is read from `self.mode`, which this class does
    # not define itself.

    def test_fileobj_no_close(self):
        # Closing the TarFile must leave a caller-supplied file object open.
        buffer = io.BytesIO()
        archive = tarfile.open(fileobj=buffer, mode=self.mode)
        member = tarfile.TarInfo("foo")
        archive.addfile(member)
        archive.close()
        still_open = buffer.closed is False
        self.assertTrue(still_open, "external fileobjs must never closed")
598        self.assertTrue(fobj.closed is False, "external fileobjs must never closed")
599
600
class WriteTest(WriteTestBase):
    # Write tests for seekable archives (mode "w:"); subclasses may
    # override `mode`.

    mode = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            tar.close()

    def test_tar_size(self):
        # Test for bug #1013882.
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        finally:
            tar.close()
        self.assertTrue(os.path.getsize(tmpname) > 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            # An empty file first ...
            open(path, "wb").close()
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            # ... then the same path with three bytes of content.
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)
        finally:
            tar.close()

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.rmdir(path)

    def test_link_size(self):
        # Only meaningful where the platform offers os.link().
        if hasattr(os, "link"):
            link = os.path.join(TEMPDIR, "link")
            target = os.path.join(TEMPDIR, "link_target")
            open(target, "wb").close()
            os.link(target, link)
            try:
                tar = tarfile.open(tmpname, self.mode)
                try:
                    tarinfo = tar.gettarinfo(link)
                    self.assertEqual(tarinfo.size, 0)
                finally:
                    tar.close()
            finally:
                os.remove(target)
                os.remove(link)

    def test_symlink_size(self):
        # Only meaningful where the platform offers os.symlink().
        if hasattr(os, "symlink"):
            path = os.path.join(TEMPDIR, "symlink")
            os.symlink("link_target", path)
            try:
                tar = tarfile.open(tmpname, self.mode)
                try:
                    tarinfo = tar.gettarinfo(path)
                    self.assertEqual(tarinfo.size, 0)
                finally:
                    tar.close()
            finally:
                os.remove(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)

        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertEqual(tar.name, dstname, "archive name must be absolute")

            tar.add(dstname)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")

            # Repeat from inside TEMPDIR; always restore the working
            # directory, even if add() raises.
            cwd = os.getcwd()
            os.chdir(TEMPDIR)
            try:
                tar.add(dstname)
            finally:
                os.chdir(cwd)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")
        finally:
            tar.close()

    def test_exclude(self):
        # Every regular file is excluded, so only the directory entry
        # itself must end up in the archive.
        # NOTE(review): the `exclude` argument of TarFile.add() was
        # deprecated and later removed from tarfile - confirm the target
        # Python version still accepts it.
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            def exclude(name):
                return os.path.isfile(name)

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", exclude=exclude)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)

    def test_filter(self):
        # The filter drops "bar" and rewrites uid/uname of everything else.
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            def tarinfo_filter(tarinfo):
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", filter=tarinfo_filter)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        # `cmp_path`, if given, is the expected stored name; `dir` selects
        # a directory instead of a regular file as the member.
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            open(foo, "w").close()
        else:
            os.mkdir(foo)

        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(foo, arcname=path)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                t = tar.next()
            finally:
                tar.close()
        finally:
            # Remove the scratch entry even if archiving failed, so the
            # next call can create it again.
            if not dir:
                os.remove(foo)
            else:
                os.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        cwd = os.getcwd()
        os.chdir(TEMPDIR)
        try:
            open("foo", "w").close()

            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(".")
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for t in tar:
                    # assertTrue replaces the deprecated assert_ alias.
                    self.assertTrue(t.name == "." or t.name.startswith("./"))
            finally:
                tar.close()
        finally:
            os.chdir(cwd)
819            os.chdir(cwd)
820
821
class StreamWriteTest(WriteTestBase):
    # Write tests for archives created in stream mode ("w|"); subclasses
    # may override `mode` with a compressed stream mode.

    mode = "w|"

    def test_stream_padding(self):
        # Test for bug #1543303.
        # Write an empty archive, read back the (decompressed) bytes and
        # check the amount of zero padding.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            fobj = gzip.GzipFile(tmpname)
            try:
                data = fobj.read()
            finally:
                fobj.close()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertEqual(len(dec.unused_data), 0,
                    "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                         "incorrect zero padding")
847                         "incorrect zero padding")
848
849
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Length of s plus one extra byte (presumably the terminating
        # NUL), rounded up to a whole number of 512-byte blocks.
        blocks, remainder = divmod(len(s) + 1, 512)
        if remainder:
            blocks += 1
        return blocks * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member with the given
        # name and (optional) link name.

        # Initial tar header
        count = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += 512
            count += self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += 512
            count += self._length(link)
        return count

    def _test(self, name, link=None):
        # Write one member in GNU format, compare the resulting offset
        # with the calculated size, then read the member back and
        # compare name and link name.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
        finally:
            # Close the archive even when the size check fails.
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertFalse(member is None, "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                             "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                             "unable to read longname member")
        finally:
            tar.close()

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
926
927
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create "foo", hard-link it as "bar" and add "foo" to a fresh
        # archive that is kept open for the test method.
        self.foo, self.bar = [os.path.join(TEMPDIR, base)
                              for base in ("foo", "bar")]

        with open(self.foo, "wb") as out:
            out.write(b"foo")

        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.tar.add(self.foo)

    def tearDown(self):
        self.tar.close()
        for path in (self.foo, self.bar):
            os.remove(path)

    def _check_type(self, path, expected, msg):
        # Ask the open archive for the TarInfo of path and check its type.
        info = self.tar.gettarinfo(path)
        self.assertTrue(info.type == expected, msg)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        self._check_type(self.foo, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" is a hard link to "foo", which setUp() already added.
        self._check_type(self.bar, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference switched on the link is expected to be
        # reported as a regular file.
        self.tar.dereference = True
        self._check_type(self.bar, tarfile.REGTYPE,
                "dereferencing hardlink failed")
966
967
class PaxWriteTest(GNUWriteTest):
    # Runs the long name / long link cases inherited from GNUWriteTest
    # against PAX_FORMAT archives and adds tests for pax headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            # Only the long field under test is compared.
            if link:
                l = tar.getmembers()[0].linkname
                self.assertTrue(link == l, "PAX longlink creation failed")
            else:
                n = tar.getmembers()[0].name
                self.assertTrue(name == n, "PAX longname creation failed")
        finally:
            tar.close()

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)

            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertTrue(type(key) is not bytes)
                self.assertTrue(type(val) is not bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")
        finally:
            tar.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
1035
1036
class UstarUnicodeTest(unittest.TestCase):
    # Tests for non-ASCII names and user/group names.  Subclasses in
    # this file rerun them with a different `format`.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using the given encoding
        # and check that reading it back yields the same name.
        name = "\xe4\xf6\xfc"
        tar = tarfile.open(tmpname, "w", format=self.format,
                encoding=encoding, errors="strict")
        try:
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        try:
            self.assertEqual(tar.getmembers()[0].name, name)
        finally:
            tar.close()

    def test_unicode_filename_error(self):
        if self.format == tarfile.PAX_FORMAT:
            # PAX_FORMAT ignores encoding in write mode.
            return

        tar = tarfile.open(tmpname, "w", format=self.format,
                encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            # Neither the name nor the user name can be encoded as ASCII.
            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            tar.close()

    def test_unicode_argument(self):
        tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
        try:
            for t in tar:
                self.assertTrue(type(t.name) is str)
                self.assertTrue(type(t.linkname) is str)
                self.assertTrue(type(t.uname) is str)
                self.assertTrue(type(t.gname) is str)
        finally:
            tar.close()

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        tar = tarfile.open(tmpname, mode="w", format=self.format,
                encoding="iso8859-1")
        try:
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")
        finally:
            tar.close()

        if self.format != tarfile.PAX_FORMAT:
            # Read with the ascii encoding, each of the three characters
            # is expected to come back as U+FFFD.
            tar = tarfile.open(tmpname, encoding="ascii")
            try:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\ufffd\ufffd\ufffd")
                self.assertEqual(t.gname, "\ufffd\ufffd\ufffd")
            finally:
                tar.close()
1103
1104
class GNUUnicodeTest(UstarUnicodeTest):
    # Reruns the tests inherited from UstarUnicodeTest with GNU_FORMAT.

    format = tarfile.GNU_FORMAT
1108
1109
class PAXUnicodeTest(UstarUnicodeTest):
    # Reruns the tests inherited from UstarUnicodeTest with PAX_FORMAT.

    format = tarfile.PAX_FORMAT
1113
1114
class AppendTest(unittest.TestCase):
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Start every test without an existing archive.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os.remove(self.tarname)

    def _add_testfile(self, fileobj=None):
        # Append an empty member named "bar" in mode "a".
        tar = tarfile.open(self.tarname, "a", fileobj=fileobj)
        try:
            tar.addfile(tarfile.TarInfo("bar"))
        finally:
            tar.close()

    def _create_testtar(self, mode="w:"):
        # Create the archive with a single member "foo", copied from
        # "ustar/regtype" of the reference archive.
        src = tarfile.open(tarname, encoding="iso8859-1")
        try:
            t = src.getmember("ustar/regtype")
            t.name = "foo"
            f = src.extractfile(t)
            tar = tarfile.open(self.tarname, mode)
            try:
                tar.addfile(t, f)
            finally:
                tar.close()
        finally:
            src.close()

    # The default list is only compared against, never modified.
    def _test(self, names=["bar"], fileobj=None):
        # Check that the archive contains exactly the given member names.
        tar = tarfile.open(self.tarname, fileobj=fileobj)
        try:
            self.assertEqual(tar.getnames(), names)
        finally:
            tar.close()

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    def test_append_gz(self):
        if gzip is None:
            return
        self._create_testtar("w:gz")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")

    def test_append_bz2(self):
        if bz2 is None:
            return
        self._create_testtar("w:bz2")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Close the file explicitly so that the data is on disk before
        # tarfile tries to open it.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
1203
1204
class LimitsTest(unittest.TestCase):
    # Checks which names, link names and uids TarInfo.tobuf() accepts
    # and which it rejects with ValueError, once per archive format.

    def test_ustar_limits(self):
        fmt = tarfile.USTAR_FORMAT

        # A name of exactly 100 characters is accepted.
        tarfile.TarInfo("0123456789" * 10).tobuf(fmt)

        # One character more (101) cannot be stored.
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        self.assertRaises(ValueError, info.tobuf, fmt)

        # 256 characters made of short "123/" components are accepted.
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(fmt)

        # 256 characters made of "1234567/" components cannot be stored.
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        self.assertRaises(ValueError, info.tobuf, fmt)

        # 512 character name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        self.assertRaises(ValueError, info.tobuf, fmt)

        # 512 character link name
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        self.assertRaises(ValueError, info.tobuf, fmt)

        # uid that needs 8 octal digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        self.assertRaises(ValueError, info.tobuf, fmt)

    def test_gnu_limits(self):
        fmt = tarfile.GNU_FORMAT

        # 512 character names and link names are both accepted.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        self.assertRaises(ValueError, info.tobuf, fmt)

    def test_pax_limits(self):
        fmt = tarfile.PAX_FORMAT

        # None of the values rejected above is a problem here.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(fmt)
1262
1263
class MiscTest(unittest.TestCase):
    # Tests for the low-level header field helpers stn(), nts() and itn().

    def test_char_fields(self):
        # stn() pads with NULs up to, or truncates at, the given length;
        # nts() returns the text before the first NUL.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo")

    def test_number_fields(self):
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        # The expected value below is the GNU encoding, so request that
        # format explicitly instead of relying on tarfile.DEFAULT_FORMAT.
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
1275
1276
# gzip variants: each class reruns the tests it inherits, reading
# from testtar.tar.gz or writing with a gzip mode.
class GzipMiscReadTest(MiscReadTest):
    tarname = gzipname
    mode = "r:gz"


class GzipUstarReadTest(UstarReadTest):
    tarname = gzipname
    mode = "r:gz"


class GzipStreamReadTest(StreamReadTest):
    tarname = gzipname
    mode = "r|gz"


class GzipWriteTest(WriteTest):
    mode = "w:gz"


class GzipStreamWriteTest(StreamWriteTest):
    mode = "w|gz"
1290
1291
# bz2 variants: each class reruns the tests it inherits, reading
# from testtar.tar.bz2 or writing with a bz2 mode.
class Bz2MiscReadTest(MiscReadTest):
    tarname = bz2name
    mode = "r:bz2"


class Bz2UstarReadTest(UstarReadTest):
    tarname = bz2name
    mode = "r:bz2"


class Bz2StreamReadTest(StreamReadTest):
    tarname = bz2name
    mode = "r|bz2"


class Bz2WriteTest(WriteTest):
    mode = "w:bz2"


class Bz2StreamWriteTest(StreamWriteTest):
    mode = "w|bz2"
1305
class Bz2PartialReadTest(unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        class EOFGuardIO(io.BytesIO):
            # Fails with AssertionError when read() is called again
            # after a read() issued at the end of the data, unless a
            # seek() happened in between.
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                at_end = self.tell() == len(self.getvalue())
                self.hit_eof = at_end
                return io.BytesIO.read(self, n)

            def seek(self, *args):
                self.hit_eof = False
                return io.BytesIO.seek(self, *args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        # Try every prefix, from no data at all up to the whole stream.
        for end in range(len(compressed) + 1):
            try:
                tarfile.open(fileobj=EOFGuardIO(compressed[:end]), mode=mode)
            except tarfile.ReadError:
                # we have no interest in ReadErrors
                pass

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
1334
1335
def test_main():
    # Create TEMPDIR and the compressed copies of the reference archive,
    # run all test cases that apply, and remove TEMPDIR again.

    # A directory left behind by an aborted run would make
    # os.makedirs() fail.
    if os.path.isdir(TEMPDIR):
        shutil.rmtree(TEMPDIR)
    os.makedirs(TEMPDIR)

    # Everything below runs inside the try block so that TEMPDIR is
    # removed even if creating the compressed archives fails.
    try:
        tests = [
            UstarReadTest,
            MiscReadTest,
            StreamReadTest,
            DetectReadTest,
            MemberReadTest,
            GNUReadTest,
            PaxReadTest,
            WriteTest,
            StreamWriteTest,
            GNUWriteTest,
            PaxWriteTest,
            UstarUnicodeTest,
            GNUUnicodeTest,
            PAXUnicodeTest,
            AppendTest,
            LimitsTest,
            MiscTest,
        ]

        if hasattr(os, "link"):
            tests.append(HardlinkTest)

        with open(tarname, "rb") as fobj:
            data = fobj.read()

        if gzip:
            # Create testtar.tar.gz and add gzip-specific tests.
            tar = gzip.open(gzipname, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                GzipMiscReadTest,
                GzipUstarReadTest,
                GzipStreamReadTest,
                GzipWriteTest,
                GzipStreamWriteTest,
            ]

        if bz2:
            # Create testtar.tar.bz2 and add bz2-specific tests.
            tar = bz2.BZ2File(bz2name, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                Bz2MiscReadTest,
                Bz2UstarReadTest,
                Bz2StreamReadTest,
                Bz2WriteTest,
                Bz2StreamWriteTest,
                Bz2PartialReadTest,
            ]

        support.run_unittest(*tests)
    finally:
        if os.path.exists(TEMPDIR):
            shutil.rmtree(TEMPDIR)
1400
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1403