test_tarfile.py revision ad3e27ae4c2c21d422603eab36c6b48e6e09f732
172d2dab6058467036df73a5f668036a519043e5bChandler Carruthimport sys
272d2dab6058467036df73a5f668036a519043e5bChandler Carruthimport os
3651f13cea278ec967336033dd032faef0e9fc2ecStephen Hinesimport io
4faaec2234f21bf175dd05c723072202cdf39cb2fAnders Carlssonfrom hashlib import md5
5faaec2234f21bf175dd05c723072202cdf39cb2fAnders Carlssonfrom contextlib import contextmanager
6faaec2234f21bf175dd05c723072202cdf39cb2fAnders Carlsson
7faaec2234f21bf175dd05c723072202cdf39cb2fAnders Carlssonimport unittest
8import unittest.mock
9import tarfile
10
11from test import support
12from test.support import script_helper
13
14# Check for our compression modules.
15try:
16    import gzip
17except ImportError:
18    gzip = None
19try:
20    import bz2
21except ImportError:
22    bz2 = None
23try:
24    import lzma
25except ImportError:
26    lzma = None
27
def md5sum(data):
    # Return the hexadecimal MD5 digest of the bytes-like object *data*.
    digest = md5(data)
    return digest.hexdigest()
30
# Scratch locations derived from support.TESTFN; the tests below extract
# members into TEMPDIR and write temporary archives there.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
# The reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")
# Paths inside TEMPDIR used by the gzip, bz2 and xz test variants.
gzipname, bz2name, xzname = (
    os.path.join(TEMPDIR, "testtar.tar." + ext) for ext in ("gz", "bz2", "xz")
)
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected MD5 hex digests that the tests compare against md5sum() of the
# data read from regular-file members and sparse members respectively.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
42
43
class TarTest:
    # Settings for the uncompressed archive: its path, the (empty) mode
    # suffix, the class used to open the raw file and the matching
    # TarFile opener.  Subclasses are expected to supply `prefix`.
    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        # Full tarfile mode string: the subclass prefix followed by the
        # compression suffix.
        head, tail = self.prefix, self.suffix
        return head + tail
53
@support.requires_gzip
class GzipTest:
    # Settings for the gzip variant.  `open` is None when the gzip module
    # could not be imported.
    tarname = gzipname
    suffix = 'gz'
    open = getattr(gzip, "GzipFile", None)
    taropen = tarfile.TarFile.gzopen
60
@support.requires_bz2
class Bz2Test:
    # Settings for the bz2 variant.  `open` is None when the bz2 module
    # could not be imported.
    tarname = bz2name
    suffix = 'bz2'
    open = getattr(bz2, "BZ2File", None)
    taropen = tarfile.TarFile.bz2open
67
@support.requires_lzma
class LzmaTest:
    # Settings for the xz variant.  `open` is None when the lzma module
    # could not be imported.
    tarname = xzname
    suffix = 'xz'
    open = getattr(lzma, "LZMAFile", None)
    taropen = tarfile.TarFile.xzopen
74
75
class ReadTest(TarTest):
    # Base for read tests: opens self.tarname as self.tar before each test
    # and closes it afterwards.

    prefix = "r:"

    def setUp(self):
        open_kwargs = {"mode": self.mode, "encoding": "iso8859-1"}
        self.tar = tarfile.open(self.tarname, **open_kwargs)

    def tearDown(self):
        archive = self.tar
        archive.close()
86
87
class UstarReadTest(ReadTest, unittest.TestCase):
    # Exercises the file objects returned by TarFile.extractfile() for
    # members stored under "ustar/" in the test archive.

    def test_fileobj_regular_file(self):
        # The extracted data must match the recorded size and MD5 digest.
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(md5sum(data), md5_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # Compare readlines() on the extracted-to-disk copy with readlines()
        # on the in-archive file object.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the in-archive file object must yield the same lines as
        # the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Check seek()/tell()/read()/readline()/readlines() on the
        # in-archive file object against the bytes of the on-disk copy.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        # The context manager closes the file object even when one of the
        # assertions below fails.
        with self.tar.extractfile(tarinfo) as fobj:
            # Read everything first so that seek(0) really has to rewind.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        # Wrap the extracted file object in a TextIOWrapper, verify the
        # decoded content and make sure seeking works in text mode.
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(md5sum(data), md5_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
213
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """UstarReadTest run with the GzipTest archive settings."""
216
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """UstarReadTest run with the Bz2Test archive settings."""
219
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """UstarReadTest run with the LzmaTest archive settings."""
222
223
class ListTest(ReadTest, unittest.TestCase):

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, self.mode)

    def _list_output(self, **list_kwargs):
        # Call self.tar.list(**list_kwargs) while sys.stdout is replaced by
        # an ASCII text wrapper and return the bytes that were written.
        capture = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', capture):
            self.tar.list(**list_kwargs)
        return capture.detach().getvalue()

    def test_list(self):
        out = self._list_output(verbose=False)
        plain_names = (
            b'ustar/conttype',
            b'ustar/regtype',
            b'ustar/lnktype',
            b'ustar' + (b'/12345' * 40) + b'67/longname',
            b'./ustar/linktest2/symtype',
            b'./ustar/linktest2/lnktype',
            # Make sure it puts trailing slash for directory
            b'ustar/dirtype/',
            b'ustar/dirtype-with-size/',
        )
        for expected in plain_names:
            self.assertIn(expected, out)
        # Make sure it is able to print unencodable characters
        def conv(b):
            s = b.decode(self.tar.encoding, 'surrogateescape')
            return s.encode('ascii', 'backslashreplace')
        unencodable_names = (
            b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-hpux-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-old-v7-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'pax/bad-pax-\xe4\xf6\xfc',
            b'pax/hdrcharset-\xe4\xf6\xfc',
        )
        for raw in unencodable_names:
            self.assertIn(conv(raw), out)
        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        two_plain_lines = (br'ustar/conttype ?\r?\n'
                           br'ustar/regtype ?\r?\n')
        self.assertRegex(out, two_plain_lines)
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._list_output(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        verbose_line = (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                        br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                        br'ustar/\w+type ?\r?\n')
        self.assertRegex(out, verbose_line * 2)
        # Make sure it prints the source of link with verbose flag
        self.assertIn(b'ustar/symtype -> regtype', out)
        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype link to '
                      b'./ustar/linktest1/regtype', out)
        deep_path = b'/123' * 125
        for fmt in (b'gnu', b'pax'):
            self.assertIn(fmt + deep_path + b'/longlink link to ' + fmt +
                          deep_path + b'/longname', out)

    def test_list_members(self):
        def reg_members(archive):
            # Lazily yield only the members whose name contains 'reg'.
            for info in archive.getmembers():
                if 'reg' in info.name:
                    yield info
        out = self._list_output(verbose=False, members=reg_members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
302
303
class GzipListTest(GzipTest, ListTest):
    """ListTest run with the GzipTest archive settings."""
306
307
class Bz2ListTest(Bz2Test, ListTest):
    """ListTest run with the Bz2Test archive settings."""
310
311
class LzmaListTest(LzmaTest, ListTest):
    """ListTest run with the LzmaTest archive settings."""
314
315
class CommonReadTest(ReadTest):
    # Read tests shared by the seekable ("r:") and stream ("r|") variants.

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # Opening inside the try/with means a ReadError from open() is
        # reported through self.fail() instead of being masked by closing a
        # variable that was never bound.
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.getnames()
                self.assertListEqual(tar.getmembers(), [])
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)
366
367
class MiscReadTestBase(CommonReadTest):
    # Miscellaneous read tests; concrete subclasses combine this base with
    # a compression mixin and unittest.TestCase.

    def requires_name_attribute(self):
        # Hook called by the tests that need a file object with a usable
        # name attribute; subclasses may override it to skip those tests.
        pass

    def test_no_name_argument(self):
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Use a context manager so the TarFile is closed, as the sibling
        # name-attribute tests do.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_illegal_mode_arg(self):
        # Each invalid mode string must be rejected with a ValueError.
        with open(tmpname, 'wb'):
            pass
        with self.assertRaisesRegex(ValueError, 'mode must be '):
            self.taropen(tmpname, 'q')
        with self.assertRaisesRegex(ValueError, 'mode must be '):
            self.taropen(tmpname, 'rw')
        with self.assertRaisesRegex(ValueError, 'mode must be '):
            self.taropen(tmpname, '')

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member must carry the expected mtime; members under
        # "ustar/" must additionally carry the expected user name.
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render an mtime for the failure message, with the exact hex
            # form for floats.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            support.rmtree(DIR)

    def test_extract_directory(self):
        # Extracting a single directory member must restore its mtime and,
        # except on win32, its permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            support.rmtree(DIR)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Create the instance without running __init__ so that it is
            # still reachable after __init__ raises.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
582
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # Uncompressed variant: the inherited test_fail_comp is replaced by
    # None here.
    test_fail_comp = None
585
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase run with the GzipTest archive settings."""
588
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    # MiscReadTestBase with the Bz2Test settings; tests that call
    # requires_name_attribute() are skipped.
    def requires_name_attribute(self):
        reason = "BZ2File have no name attribute"
        self.skipTest(reason)
592
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    # MiscReadTestBase with the LzmaTest settings; tests that call
    # requires_name_attribute() are skipped.
    def requires_name_attribute(self):
        reason = "LZMAFile have no name attribute"
        self.skipTest(reason)
596
597
class StreamReadTest(CommonReadTest, unittest.TestCase):
    # Common read tests plus stream-specific ones, with the archive opened
    # using the "r|" mode prefix.

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        regular_members = (m for m in self.tar if m.isreg())
        for member in regular_members:
            with self.tar.extractfile(member) as fobj:
                chunk = True
                while chunk:
                    try:
                        chunk = fobj.read(512)
                    except tarfile.StreamError:
                        self.fail("simple read-through using "
                                  "TarFile.extractfile() failed")

    def test_fileobj_regular_file(self):
        first = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(first) as fobj:
            payload = fobj.read()
        failure = "regular file extraction failed"
        self.assertEqual(len(payload), first.size, failure)
        self.assertEqual(md5sum(payload), md5_regtype, failure)

    def test_provoke_stream_error(self):
        # getmembers() reads through the whole stream, so reading the first
        # member afterwards has to raise StreamError.
        members = self.tar.getmembers()
        with self.tar.extractfile(members[0]) as f: # read the first member
            with self.assertRaises(tarfile.StreamError):
                f.read()

    def test_compare_members(self):
        # Walk a seekable and a stream view of the archive in lockstep and
        # compare the data of each member.
        reference = tarfile.open(tarname, encoding="iso8859-1")
        try:
            stream = self.tar

            while True:
                ref_info = reference.next()
                stream_info = stream.next()
                if ref_info is None:
                    break
                self.assertIsNotNone(stream_info, "stream.next() failed.")

                if stream_info.islnk() or stream_info.issym():
                    self.assertRaises(tarfile.StreamError,
                                      stream.extractfile, stream_info)
                    continue

                ref_file = reference.extractfile(ref_info)
                stream_file = stream.extractfile(stream_info)
                if ref_file is None:
                    continue
                self.assertIsNotNone(stream_file,
                        "stream.extractfile() failed")
                self.assertEqual(ref_file.read(), stream_file.read(),
                        "stream extraction failed")
        finally:
            reference.close()
658
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """StreamReadTest run with the GzipTest archive settings."""
661
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """StreamReadTest run with the Bz2Test archive settings."""
664
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """StreamReadTest run with the LzmaTest archive settings."""
667
668
class DetectReadTest(TarTest, unittest.TestCase):
    # Checks which mode strings tarfile.open() accepts for self.tarname,
    # including the auto-detecting "r", "r:*" and "r|*" modes.

    def _testfunc_file(self, name, mode):
        # Open *name* by path with *mode*; a ReadError fails the test with
        # a message naming the archive and mode.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s" %
                      (name, mode, e))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same as _testfunc_file(), but hand tarfile.open() an already
        # opened binary file object.
        try:
            with open(name, "rb") as f:
                tar = tarfile.open(name, mode, fileobj=f)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r, fileobj=...) raised "
                      "ReadError: %s" % (name, mode, e))
        else:
            tar.close()

    def _test_modes(self, testfunc):
        if self.suffix:
            # A compression-specific mode must reject the uncompressed
            # archive, and the plain modes must reject self.tarname.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
707        self._test_modes(self._testfunc_fileobj)
708
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """DetectReadTest run with the GzipTest archive settings."""
711
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900kB). If the file was
        # compressed using another blocksize autodetection fails.
        with open(tarname, "rb") as source:
            raw = source.read()

        # Compress with blocksize 100kB, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as target:
            target.write(raw)

        self._testfunc_file(tmpname, "r|*")
725        self._testfunc_file(tmpname, "r|*")
726
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """DetectReadTest run with the LzmaTest archive settings."""
729
730
class MemberReadTest(ReadTest, unittest.TestCase):
    # Looks up individual members of the test archive by name and checks
    # their header fields and, where a digest is given, their data.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Compare the member's data digest with *chksum* (when given) and
        # every keyword argument with the attribute of the same name.
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as f:
                digest = md5sum(f.read())
                self.assertEqual(digest, chksum,
                        "wrong md5sum for %s" % tarinfo.name)

        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")
        for field, expected in kwargs.items():
            self.assertEqual(getattr(tarinfo, field), expected,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        member = self.tar.getmember("ustar/regtype")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        member = self.tar.getmember("ustar/conttype")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        member = self.tar.getmember("ustar/dirtype")
        self._test_member(member, size=0)

    def test_find_dirtype_with_size(self):
        member = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(member, size=255)

    def test_find_lnktype(self):
        member = self.tar.getmember("ustar/lnktype")
        self._test_member(member, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        member = self.tar.getmember("ustar/symtype")
        self._test_member(member, size=0, linkname="regtype")

    def test_find_blktype(self):
        member = self.tar.getmember("ustar/blktype")
        self._test_member(member, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        member = self.tar.getmember("ustar/chrtype")
        self._test_member(member, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        member = self.tar.getmember("ustar/fifotype")
        self._test_member(member, size=0)

    def test_find_sparse(self):
        member = self.tar.getmember("ustar/sparse")
        self._test_member(member, size=86016, chksum=md5_sparse)

    def test_find_gnusparse(self):
        member = self.tar.getmember("gnu/sparse")
        self._test_member(member, size=86016, chksum=md5_sparse)

    def test_find_gnusparse_00(self):
        member = self.tar.getmember("gnu/sparse-0.0")
        self._test_member(member, size=86016, chksum=md5_sparse)

    def test_find_gnusparse_01(self):
        member = self.tar.getmember("gnu/sparse-0.1")
        self._test_member(member, size=86016, chksum=md5_sparse)

    def test_find_gnusparse_10(self):
        member = self.tar.getmember("gnu/sparse-1.0")
        self._test_member(member, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        longname = "ustar/" + "12345/" * 39 + "1234567/longname"
        all_names = self.tar.getnames()
        self.assertIn(longname, all_names)

    def test_find_regtype_oldv7(self):
        member = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive before looking up the pax member.
        self.tar.close()
        reopen_kwargs = {"mode": self.mode, "encoding": "iso8859-1"}
        self.tar = tarfile.open(self.tarname, **reopen_kwargs)
        member = self.tar.getmember("pax/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)
825        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
826
827
class LongnameTest:
    # Mixin with tests for members whose names or link names are very long.
    # The tests read self.tar, self.subdir and self.longnametype, which the
    # class this is mixed into has to provide.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = "{}/{}longname".format(self.subdir, "123/" * 125)
        try:
            member = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        # The long link member must point at the long name member.
        stem = "{}/{}".format(self.subdir, "123/" * 125)
        longname = stem + "longname"
        longlink = stem + "longlink"
        try:
            member = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # Opening only the first three 512-byte blocks, starting at the
        # member's offset, must raise ReadError.
        longname = "{}/{}longname".format(self.subdir, "123/" * 125)
        member = self.tar.getmember(longname)
        self.tar.fileobj.seek(member.offset)
        truncated = io.BytesIO(self.tar.fileobj.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=truncated)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = "{}/{}longname".format(self.subdir, "123/" * 125)
        offset = self.tar.getmember(longname).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            header = fobj.read(512)
        tarinfo = tarfile.TarInfo.frombuf(header, "iso8859-1", "strict")
        self.assertEqual(tarinfo.type, self.longnametype)
868
869
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    # Read tests for the "gnu" members of the test archive: the long name
    # tests from LongnameTest plus extraction of GNU sparse members.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes in files.
        if not sys.platform.startswith("linux"):
            return False
        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            fobj.seek(4096)
            fobj.truncate()
        blocks = os.stat(probe).st_blocks
        support.unlink(probe)
        return blocks == 0

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = md5sum(fobj.read())
        self.assertEqual(digest, md5_sparse,
                "wrong md5sum for %s" % name)

        if not self._fs_supports_holes():
            return
        # A file with holes occupies fewer bytes on disk than its size.
        st = os.stat(extracted)
        self.assertLess(st.st_blocks * 512, st.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")
922
923
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    # Read tests for the "pax" members of the test archive: the long name
    # tests from LongnameTest plus checks of values taken from pax headers.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # Expected (member name, uname, gname) triples.  Every listed
        # member must also carry the same VENDOR.umlauts header value.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member_name, uname, gname in expectations:
                tarinfo = tar.getmember(member_name)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            for key in ("atime", "ctime"):
                self.assertEqual(float(tarinfo.pax_headers[key]),
                                 1041808783.0)
966
967
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        # A file object handed in by the caller must stay open, both when
        # the TarFile is closed and when it is garbage collected later.
        sink = io.BytesIO()
        tar = tarfile.open(fileobj=sink, mode=self.mode)
        tar.addfile(tarfile.TarInfo("foo"))
        tar.close()
        self.assertFalse(sink.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Dropping the TarFile and collecting it must not close the file
        # object or add any further output to it.
        written = sink.getvalue()
        del tar
        support.gc_collect()
        self.assertFalse(sink.closed)
        self.assertEqual(written, sink.getvalue())
984
985
class WriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for archives opened with the "w:" mode prefix.  The
    # compression suffix comes from the TarTest variant that is mixed in.
    # Helper files are created below TEMPDIR; every test that changes the
    # working directory or creates a scratch file or directory for one
    # test only restores that state even when the test fails.

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        path = os.path.join(TEMPDIR, "file")
        with open(path, "wb") as fobj:
            fobj.write(b"aaa")
        with tarfile.open(tmpname, self.mode) as tar:
            tar.add(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        path = os.path.join(TEMPDIR, "file")
        with tarfile.open(tmpname, self.mode) as tar:
            # An empty file must be reported with size 0 ...
            with open(path, "wb"):
                pass
            self.assertEqual(tar.gettarinfo(path).size, 0)

            # ... and a three byte file with size 3.
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            self.assertEqual(tar.gettarinfo(path).size, 3)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                self.assertEqual(tar.gettarinfo(path).size, 0)
        finally:
            support.rmdir(path)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        # Creating the files happens inside the try block so that the
        # target is removed again even if os.link() fails.
        try:
            with open(target, "wb") as fobj:
                fobj.write(b"aaa")
            os.link(target, link)
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                self.assertEqual(tar.gettarinfo(path).size, 0)
        finally:
            support.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            # Repeat from inside TEMPDIR.  The working directory is
            # restored even if add() raises, so later tests are unaffected.
            cwd = os.getcwd()
            os.chdir(TEMPDIR)
            try:
                tar.add(dstname)
            finally:
                os.chdir(cwd)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

    def test_exclude(self):
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                support.create_empty_file(os.path.join(tempdir, name))

            exclude = os.path.isfile

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                with support.check_warnings(("use the filter argument",
                                             DeprecationWarning)):
                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)

            # Every file was excluded, so only the directory is left.
            with tarfile.open(tmpname, "r") as tar:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
        finally:
            support.rmtree(tempdir)

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                support.create_empty_file(os.path.join(tempdir, name))

            def filter_func(tarinfo):
                # Drop "bar" and rewrite the owner of everything else.
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=filter_func)

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, filter_func)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            support.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        """Store an empty member under the archive name path and check it.

        A scratch file (or a directory if dir is true) named "foo" is
        created in TEMPDIR, added under the name path and read back.  The
        stored name must equal cmp_path or, if that is not given, path
        with os.sep replaced by "/".
        """
        foo = os.path.join(TEMPDIR, "foo")
        if dir:
            os.mkdir(foo)
        else:
            support.create_empty_file(foo)

        # Remove the scratch entry even on failure; a leftover "foo" would
        # break every later call of this helper.
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            if dir:
                support.rmdir(foo)
            else:
                support.unlink(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file)
                tar.add(target_file)
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive, 'r') as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir)
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            support.unlink(temparchive)
            support.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        cwd = os.getcwd()
        os.chdir(TEMPDIR)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)
        finally:
            os.chdir(cwd)

    def test_open_nonwritable_fileobj(self):
        # An exception raised by the first write to the file object must
        # propagate out of tarfile.open() and leave the object open.
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1293
class GzipWriteTest(GzipTest, WriteTest):
    """Run the WriteTest tests with the settings from GzipTest."""
1296
class Bz2WriteTest(Bz2Test, WriteTest):
    """Run the WriteTest tests with the settings from Bz2Test."""
1299
class LzmaWriteTest(LzmaTest, WriteTest):
    """Run the WriteTest tests with the settings from LzmaTest."""
1302
1303
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for archives opened with the "w|" mode prefix.
    # Subclasses may set decompressor to a callable returning an object
    # with decompress() and unused_data; it is then used to decode the
    # archive in test_stream_padding.

    prefix = "w|"
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()
        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                raw = fobj.read()
            data = dec.decompress(raw)
            self.assertFalse(dec.unused_data, "found trailing data")
        # An empty archive must consist of exactly RECORDSIZE NUL bytes.
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)
1341
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    """Run the StreamWriteTest tests with the settings from GzipTest."""
1344
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # bz2 is None when the module could not be imported; in that case
    # no decompressor is available.
    decompressor = None if bz2 is None else bz2.BZ2Decompressor
1347
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # lzma is None when the module could not be imported; in that case
    # no decompressor is available.
    decompressor = None if lzma is None else lzma.LZMADecompressor
1350
1351
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Size of s in whole 512-byte blocks, with room for at least one
        # more byte than len(s).
        return (len(s) // 512 + 1) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member: the member's
        # own header plus, for each over-long field, an extended header
        # block and the blocks holding the long value.
        total = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            total += 512 + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            total += 512 + self._length(link)
        return total

    def _test(self, name, link=None):
        # Write one member in GNU format, compare the resulting archive
        # offset with the computed size, then read the member back.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            expected = self._calc_size(name, link)
            actual = tar.offset
            self.assertEqual(expected, actual,
                    "GNU longname/longlink creation failed")
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
1432
1433
class CreateTest(TarTest, unittest.TestCase):
    # Tests for the "x:" mode prefix: opening must fail with
    # FileExistsError when the archive already exists.  A three byte
    # payload file is shared by all tests of the class, and tmpname is
    # removed before every test.

    prefix = "x:"

    file_path = os.path.join(TEMPDIR, "spameggs42")

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def setUp(self):
        support.unlink(tmpname)

    def _check_single_member(self):
        # The archive at tmpname must hold exactly one member, and its
        # name must contain the payload file's name.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_single_member()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second exclusive open must fail and leave the archive intact.
        with self.assertRaises(FileExistsError):
            tarfile.open(tmpname, self.mode)

        self._check_single_member()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_single_member()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        # A second exclusive open must fail and leave the archive intact.
        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._check_single_member()
1494
1495
class GzipCreateTest(GzipTest, CreateTest):
    """Run the CreateTest tests with the settings from GzipTest."""
1498
1499
class Bz2CreateTest(Bz2Test, CreateTest):
    """Run the CreateTest tests with the settings from Bz2Test."""
1502
1503
class LzmaCreateTest(LzmaTest, CreateTest):
    """Run the CreateTest tests with the settings from LzmaTest."""
1506
1507
class CreateWithXModeTest(CreateTest):
    # Repeat the CreateTest tests that go through tarfile.open() with the
    # bare "x" prefix instead of "x:".

    prefix = "x"

    # The taropen based tests do not use the prefix, so they are disabled
    # here by replacing them with None.
    test_create_taropen = test_create_existing_taropen = None
1514
1515
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.
    #
    # setUp creates a file "foo", a hardlink "bar" to it, and a write mode
    # archive to which "foo" has already been added.

    def setUp(self):
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # Each cleanup is registered right next to the step that creates
        # the resource.  unittest runs registered cleanups even when setUp
        # fails halfway (for example if os.link() raises), whereas a
        # tearDown method would be skipped and leave files behind.
        self.addCleanup(support.unlink, self.foo)
        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        self.addCleanup(support.unlink, self.bar)
        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        # Registered last, so the archive is closed before the files
        # are removed.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # The second name of an already added file becomes a hardlink.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference set, the second name is a regular file again.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1554
1555
class PaxWriteTest(GNUWriteTest):
    # Runs the long name and long link tests inherited from GNUWriteTest
    # against pax format archives, and adds tests for pax headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            first = tar.getmembers()[0]
            if link:
                self.assertEqual(link, first.linkname,
                                 "PAX longlink creation failed")
            else:
                self.assertEqual(name, first.name,
                                 "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                # Number fields must convert with their registered type.
                convert = tarfile.PAX_NUMBER_FIELDS[key]
                try:
                    convert(val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                           encoding="iso8859-1")
        try:
            member = tarfile.TarInfo()
            member.name = "\xe4\xf6\xfc" # non-ASCII
            member.uid = 8**8 # too large
            member.pax_headers = pax_headers
            tar.addfile(member)
        finally:
            tar.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            stored = tar.getmembers()[0]
            self.assertEqual(stored.pax_headers, pax_headers)
            self.assertEqual(stored.name, "foo")
            self.assertEqual(stored.uid, 123)
1638
1639
class UstarUnicodeTest(unittest.TestCase):
    # Round-trips non-ASCII member names and owner names through archives
    # written in self.format.  Subclasses override ``format``.
    #
    # All archives are handled with ``with`` blocks rather than
    # try/finally + close(): if a write fails, __exit__() does not try to
    # write end-of-archive blocks, so the original error is not masked.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write one member with a non-ASCII name using *encoding*, then
        # read the archive back with the same encoding and compare.
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", non-ASCII text in the
        # name or uname field must make addfile() raise UnicodeError.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as tar:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # Every text attribute of every member must be a str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # For non-pax formats, re-reading the same archive as ASCII
            # is expected to yield surrogate-escaped owner names.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
1721
1722
class GNUUnicodeTest(UstarUnicodeTest):
    # Runs the UstarUnicodeTest tests against GNU_FORMAT and adds a
    # GNU-specific regression test.

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = [
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        ]
        for codec, member_name in cases:
            archive = tarfile.open(tarname, encoding=codec,
                                   errors="surrogateescape")
            with archive:
                # The member only has to be found; its value is unused.
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1739
1740
class PAXUnicodeTest(UstarUnicodeTest):
    # Runs the UstarUnicodeTest tests against PAX_FORMAT and adds a
    # pax-specific test.

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = [
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        ]
        for codec, member_name in cases:
            archive = tarfile.open(tarname, encoding=codec,
                                   errors="surrogateescape")
            with archive:
                # The member only has to be found; its value is unused.
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1759
1760
class AppendTestBase:
    # Shared fixture for the append mode tests (cp. patch #1652681).

    def setUp(self):
        # Each test starts without a leftover archive at tmpname.
        self.tarname = tmpname
        if not os.path.exists(self.tarname):
            return
        support.unlink(self.tarname)

    def _create_testtar(self, mode="w:"):
        # Copy the member "ustar/regtype" from the reference archive into
        # a new archive at self.tarname, stored under the name "foo".
        with tarfile.open(tarname, encoding="iso8859-1") as source:
            member = source.getmember("ustar/regtype")
            member.name = "foo"
            with source.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as target:
                target.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must raise
        # ReadError.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
1780
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append mode tests for uncompressed archives.

    # The compressed-archive test of the base class does not apply here.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member named "bar" to the archive.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly *names* (default:
        # ["bar"]).  A None sentinel is used instead of a mutable list
        # default, which would be shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        # Continue with an in-memory copy of the archive just created.
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Write *data* verbatim as the archive file and expect appending
        # to it to raise ReadError.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
1844
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase tests with the GzipTest settings."""
1847
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase tests with the Bz2Test settings."""
1850
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase tests with the LzmaTest settings."""
1853
1854
class LimitsTest(unittest.TestCase):
    # Checks, through TarInfo.tobuf(), which field values each archive
    # format accepts and which it rejects with ValueError.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # Long names and long link names are accepted.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(gnu)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(gnu)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # None of the values below may raise.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(pax)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(pax)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(pax)
1912
1913
class MiscTest(unittest.TestCase):
    # Tests for the low-level header field helpers of the tarfile module:
    # stn()/nts() for character fields and itn()/nti() for number fields.

    def test_char_fields(self):
        # stn() pads with NULs or truncates to the field length; nts()
        # stops at the first NUL.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
                         0o10000000)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
                         0xffffffff)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
                         -1)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
                         -100)
        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
                         -0x100000000000000)

    def test_write_number_fields(self):
        # Values that fit into seven octal digits are written as octal
        # text whatever the format is.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
        # The base-256 encoding expected below is GNU specific, so the
        # format is passed explicitly instead of relying on the module's
        # default format being GNU_FORMAT.
        gnu = tarfile.GNU_FORMAT
        self.assertEqual(tarfile.itn(0o10000000, format=gnu),
                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
        self.assertEqual(tarfile.itn(0xffffffff, format=gnu),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-1, format=gnu),
                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-100, format=gnu),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(-0x100000000000000, format=gnu),
                         b"\xff\x00\x00\x00\x00\x00\x00\x00")

    def test_number_field_limits(self):
        # Values outside the range of the field must raise ValueError.
        with self.assertRaises(ValueError):
            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)
1965
1966
class CommandLineTest(unittest.TestCase):
    # Tests for the command-line interface, run as "python -m tarfile".

    def tarfilecmd(self, *args, **kwargs):
        # Run "python -m tarfile *args" expecting success and return its
        # stdout with the platform line separator replaced by b'\n'.
        # Keyword arguments are passed on to assert_python_ok().
        rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args,
                                                      **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        # Run "python -m tarfile *args" expecting failure; returns what
        # assert_python_failure() returns (unpacked by callers as
        # rc, out, err).
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def make_simple_tarfile(self, tar_name):
        # Create an uncompressed archive at *tar_name* holding two test
        # data files, and register its removal as a cleanup.
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        self.addCleanup(support.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for tardata in files:
                tf.add(tardata, arcname=os.path.basename(tardata))

    def test_test_command(self):
        for tar_name in testtarnames:
            for opt in '-t', '--test':
                out = self.tarfilecmd(opt, tar_name)
                self.assertEqual(out, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-t', tar_name)
                self.assertIn(b'is a tar archive.\n', out)

    def test_test_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # A file holding only the first 511 bytes of a test archive must
        # be rejected as well.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as f:
                    data = f.read()
                try:
                    with open(tmpname, 'wb') as f:
                        f.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    support.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            # The expected output is what TarFile.list() prints in-process.
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=False)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-l', '--list':
                out = self.tarfilecmd(opt, tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=True)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-l', tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-c', '--create':
            try:
                out = self.tarfilecmd(opt, tmpname, *files)
                self.assertEqual(out, b'')
                # The created file must be readable as an archive.
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_verbose(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-v', '--verbose':
            try:
                out = self.tarfilecmd(opt, '-c', tmpname, *files)
                self.assertIn(b' file created.', out)
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', dotlessname, *files)
            self.assertEqual(out, b'')
            with tarfile.open(dotlessname) as tar:
                tar.getmembers()
        finally:
            support.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        tar_name = os.path.join(TEMPDIR, ".testtar")
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', tar_name, *files)
            self.assertEqual(out, b'')
            with tarfile.open(tar_name) as tar:
                tar.getmembers()
        finally:
            support.unlink(tar_name)

    def test_create_command_compressed(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            # Skip compression types whose module is not available.
            if not filetype.open:
                continue
            # Bind the name before entering the try block, so that the
            # finally clause always refers to this iteration's file.
            tar_name = tmpname + '.' + filetype.suffix
            try:
                self.tarfilecmd('-c', tar_name, *files)
                with filetype.taropen(tar_name) as tar:
                    tar.getmembers()
            finally:
                support.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-e', '--extract':
            try:
                with support.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, tmpname)
                self.assertEqual(out, b'')
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-v', '--verbose':
            try:
                with support.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, '-e', tmpname)
                self.assertIn(b' file is extracted.', out)
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with support.temp_cwd(tarextdir):
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(out, b'')
        finally:
            support.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with support.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2145
2146
class ContextManagerTest(unittest.TestCase):
    # Tests for using TarFile objects as context managers.

    class _Deliberate(Exception):
        # Raised on purpose inside a with-block.  Expecting exactly this
        # type (instead of a bare "except: pass") lets unrelated errors,
        # e.g. from tarfile.open() itself, still fail the test.
        pass

    def test_basic(self):
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        with self.assertRaises(self._Deliberate):
            with tarfile.open(tmpname, "w") as tar:
                raise self._Deliberate
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            with self.assertRaises(self._Deliberate):
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise self._Deliberate
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2203
2204
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members
    # as the regular files they point to.

    # See issues #1578269, #8879, and #17689 for some history on these skips
    _skip_if_islink = unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    _skip_if_symlink = unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")

    def _test_link_extraction(self, name):
        # Extract the link member *name* and check that the resulting
        # file has the checksum stored in md5_regtype.
        self.tar.extract(name, TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as f:
            digest = md5sum(f.read())
        self.assertEqual(digest, md5_regtype)

    @_skip_if_islink
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @_skip_if_islink
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @_skip_if_symlink
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @_skip_if_symlink
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2237
2238
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bz2-compressed archive with *mode*
        # and fail if the file object is read again after reporting EOF.
        class MyBytesIO(io.BytesIO):
            # Set by a read() that starts at the end of the data; cleared
            # by seek().
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)
            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tar = tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                continue # we have no interest in ReadErrors
            # Close archives that could be opened instead of leaving them
            # to the garbage collector.
            tar.close()

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2268
2269
def root_is_uid_gid_0():
    """Return True if both uid 0 and gid 0 are named 'root'.

    Return False if the pwd or grp module cannot be imported, or if the
    account databases have no entry for id 0.
    """
    try:
        import grp
        import pwd
    except ImportError:
        return False
    try:
        return (pwd.getpwuid(0)[0] == 'root' and
                grp.getgrgid(0)[0] == 'root')
    except KeyError:
        # The lookups raise KeyError when there is no entry for the id.
        # This function is called while a test class is being defined, so
        # an exception here would break importing the module.
        return False
2280
2281
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests below patch three os functions:
    #  os.chown:   so the calls made during extraction can be inspected
    #  os.chmod:   so no mode is really changed; otherwise the extracted
    #              files/directories could not be deleted
    #  os.geteuid: so we can pretend to be root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Build an archive at tmpname that holds a file, a directory and a
        # file inside that directory, each with its own uid/gid.
        payload = io.BytesIO(b"content")

        # (name, uid, gid, type, file object)
        entries = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for name, uid, gid, kind, fileobj in entries:
                info = tarfile.TarInfo(name)
                info.uid, info.gid = uid, gid
                info.uname = info.gname = 'root'
                info.type = kind
                archive.addfile(info, fileobj)

        # hand back the full pathname of the archive
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Context manager: creates the test archive, opens it for reading
        # and yields (tarfile, filename_1, dirname_1, filename_2).
        mock_geteuid.return_value = 0  # lie and say we're root

        # the names stored in the archive
        filename_1 = 'numeric-owner-testfile'
        dirname_1 = 'dir'
        filename_2 = os.path.join(dirname_1, filename_1)

        tar_filename = NumericOwnerTest._make_test_archive(
            filename_1, dirname_1, filename_2)

        with tarfile.open(tar_filename) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                                filename_2):
            for member in (filename_1, filename_2):
                tarfl.extract(member, TEMPDIR, numeric_owner=True)

        # chown must have received the numeric ids stored in the archive,
        # with the member names converted to filesystem paths
        call = unittest.mock.call
        expected = [call(os.path.join(TEMPDIR, filename_1), 99, 98),
                    call(os.path.join(TEMPDIR, filename_2), 88, 87)]
        mock_chown.assert_has_calls(expected, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
                                                filename_2):
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # same check as above, this time including the directory
        call = unittest.mock.call
        expected = [call(os.path.join(TEMPDIR, filename_1), 99, 98),
                    call(os.path.join(TEMPDIR, dirname_1), 77, 76),
                    call(os.path.join(TEMPDIR, filename_2), 88, 87)]
        mock_chown.assert_has_calls(expected, any_order=True)

    # this test requires that uid=0 and gid=0 really be named 'root'. that's
    #  because the uname and gname in the test file are 'root', and extract()
    #  will look them up using pwd and grp to find their uid and gid, which we
    #  test here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        extracted_path = os.path.join(TEMPDIR, filename_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing the fourth argument positionally must raise TypeError.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            with self.assertRaises(TypeError):
                tarfl.extract(filename_1, TEMPDIR, False, True)
2400
2401
def setUpModule():
    # Create the scratch directory and the compressed copies of the
    # reference archive; their names are collected in testtarnames.
    support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    global testtarnames
    testtarnames = [tarname]
    with open(tarname, "rb") as reference:
        raw = reference.read()

    # Create compressed tarfiles.
    for flavour in (GzipTest, Bz2Test, LzmaTest):
        # ``open`` is None when the compression module is unavailable.
        if not flavour.open:
            continue
        support.unlink(flavour.tarname)
        testtarnames.append(flavour.tarname)
        with flavour.open(flavour.tarname, "wb") as compressed:
            compressed.write(raw)
2418
def tearDownModule():
    # Remove the scratch directory, if it is still there.
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2422
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2425