1import sys
2import os
3import io
4from hashlib import md5
5from contextlib import contextmanager
6from random import Random
7
8import unittest
9import unittest.mock
10import tarfile
11
12from test import support
13from test.support import script_helper
14
15# Check for our compression modules.
16try:
17    import gzip
18except ImportError:
19    gzip = None
20try:
21    import bz2
22except ImportError:
23    bz2 = None
24try:
25    import lzma
26except ImportError:
27    lzma = None
28
def md5sum(data):
    """Return the hexadecimal MD5 digest of the bytes-like object *data*."""
    digest = md5(data)
    return digest.hexdigest()
31
# Scratch directory used by the tests (derived from support.TESTFN) and a
# sibling path reserved for extraction tests.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
# Reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")
# Paths inside TEMPDIR used by the compression mixins and by tests that
# write temporary archives.
# NOTE(review): the compressed files are presumably created by module-level
# setup code outside this chunk -- confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected md5sum() hex digests of member data in the reference archive.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
43
44
class TarTest:
    """Mixin describing how to open the uncompressed test archive.

    Classes using this mixin supply ``prefix``; together with ``suffix``
    it forms the mode string handed to tarfile.
    """

    # Archive path and the compression part of the mode string.
    tarname = tarname
    suffix = ''
    # Callables that open the raw file and the file as a tar archive.
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        # Complete mode string: the prefix followed by the suffix.
        prefix, suffix = self.prefix, self.suffix
        return prefix + suffix
54
@support.requires_gzip
class GzipTest:
    """Mixin selecting the gzip archive path, mode suffix and openers."""

    tarname = gzipname
    suffix = 'gz'
    # gzip is None when the module could not be imported.
    open = None if gzip is None else gzip.GzipFile
    taropen = tarfile.TarFile.gzopen
61
@support.requires_bz2
class Bz2Test:
    """Mixin selecting the bz2 archive path, mode suffix and openers."""

    tarname = bz2name
    suffix = 'bz2'
    # bz2 is None when the module could not be imported.
    open = None if bz2 is None else bz2.BZ2File
    taropen = tarfile.TarFile.bz2open
68
@support.requires_lzma
class LzmaTest:
    """Mixin selecting the xz archive path, mode suffix and openers."""

    tarname = xzname
    suffix = 'xz'
    # lzma is None when the module could not be imported.
    open = None if lzma is None else lzma.LZMAFile
    taropen = tarfile.TarFile.xzopen
75
76
class ReadTest(TarTest):
    """Base for read tests: opens the archive in setUp, closes it after."""

    prefix = "r:"

    def setUp(self):
        # Open self.tarname for reading, decoding headers as iso8859-1.
        self.tar = tarfile.open(name=self.tarname, mode=self.mode,
                                encoding="iso8859-1")

    def tearDown(self):
        # Release the archive opened by setUp().
        self.tar.close()
87
88
class UstarReadTest(ReadTest, unittest.TestCase):
    """Tests for the file objects extractfile() returns for ustar members."""

    def test_fileobj_regular_file(self):
        # The extracted data must match the member's size and checksum.
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(md5sum(data), md5_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the extracted file object must agree with
        # readlines() on the same member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the extracted file object must yield the same lines
        # as reading the member after extracting it to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell() in all whence modes and their
        # interaction with read(), readline() and readlines().
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        # The context manager closes the extracted file object even when
        # one of the assertions below fails.
        with self.tar.extractfile(tarinfo) as fobj:
            # Read to the end first so that seek(0) has to move backwards.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        # The extracted file object must be usable through TextIOWrapper,
        # including seeking.
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(md5sum(data), md5_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
214
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """UstarReadTest run with the gzip settings from GzipTest."""
217
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """UstarReadTest run with the bz2 settings from Bz2Test."""
220
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """UstarReadTest run with the xz settings from LzmaTest."""
223
224
class ListTest(ReadTest, unittest.TestCase):
    """Tests for the text that TarFile.list() prints to sys.stdout."""

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def _list_output(self, **kwargs):
        # Call self.tar.list(**kwargs) with sys.stdout replaced by an
        # ASCII text stream and return the bytes that were written.
        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', tio):
            self.tar.list(**kwargs)
        return tio.detach().getvalue()

    def test_list(self):
        out = self._list_output(verbose=False)
        self.assertIn(b'ustar/conttype', out)
        self.assertIn(b'ustar/regtype', out)
        self.assertIn(b'ustar/lnktype', out)
        self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out)
        self.assertIn(b'./ustar/linktest2/symtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype', out)
        # Make sure it puts trailing slash for directory
        self.assertIn(b'ustar/dirtype/', out)
        self.assertIn(b'ustar/dirtype-with-size/', out)
        # Make sure it is able to print unencodable characters
        def conv(b):
            s = b.decode(self.tar.encoding, 'surrogateescape')
            return s.encode('ascii', 'backslashreplace')
        self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
        self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-'
                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
        self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-'
                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
        self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out)
        self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out)
        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        self.assertNotIn(b'link to', out)
        self.assertNotIn(b'->', out)

    def test_list_verbose(self):
        out = self._list_output(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                               br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                               br'ustar/\w+type ?\r?\n') * 2)
        # Make sure it prints the source of link with verbose flag
        self.assertIn(b'ustar/symtype -> regtype', out)
        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype link to '
                      b'./ustar/linktest1/regtype', out)
        self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' +
                      (b'/123' * 125) + b'/longname', out)
        self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' +
                      (b'/123' * 125) + b'/longname', out)

    def test_list_members(self):
        # Only the members handed to list() may show up in the output.
        def members(tar):
            for tarinfo in tar.getmembers():
                if 'reg' in tarinfo.name:
                    yield tarinfo
        out = self._list_output(verbose=False, members=members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
303
304
class GzipListTest(GzipTest, ListTest):
    """ListTest run with the gzip settings from GzipTest."""
307
308
class Bz2ListTest(Bz2Test, ListTest):
    """ListTest run with the bz2 settings from Bz2Test."""
311
312
class LzmaListTest(LzmaTest, ListTest):
    """ListTest run with the xz settings from LzmaTest."""
315
316
class CommonReadTest(ReadTest):
    """Read tests shared by the seekable and the stream test classes."""

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # The context manager only closes an archive that was actually
        # opened, so a failing open() reports the intended failure.
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.getnames()
                self.assertListEqual(tar.getmembers(), [])
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).getrandbits(512*8).to_bytes(512, 'big')
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # Every way of reading a member from an archive truncated to
        # *size* bytes must raise ReadError.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                # extractfile() stays inside the assertRaisesRegex block,
                # as before; the inner "with" closes the file object.
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    with tar.extractfile(t) as extracted:
                        extracted.read()
396
class MiscReadTestBase(CommonReadTest):
    """Miscellaneous read tests shared by the plain and compressed classes."""

    def requires_name_attribute(self):
        # Hook overridden by subclasses that have to skip the tests
        # depending on the file object's name attribute.
        pass

    def test_no_name_argument(self):
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Close the archive again, like the sibling name tests do.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_illegal_mode_arg(self):
        # self.taropen must reject every mode string that is not valid.
        with open(tmpname, 'wb'):
            pass
        # Do not leave the empty file behind for later tests.
        self.addCleanup(support.unlink, tmpname)
        for bad_mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, bad_mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # All members share one mtime; ustar members also share a uname.
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            # The archive is opened inside the try block, so it is closed
            # and DIR is removed whatever fails below.
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            support.rmtree(DIR)

    def test_extract_directory(self):
        # Extracting a single directory member must restore its mtime
        # and, except on win32, its permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            support.rmtree(DIR)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
611
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    """Miscellaneous read tests run against the uncompressed archive."""

    # Replace the inherited test_fail_comp with None: it expects opening
    # the uncompressed archive in self.mode to raise ReadError.
    test_fail_comp = None
614
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """Miscellaneous read tests run with the gzip settings from GzipTest."""
617
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    """Miscellaneous read tests run with the bz2 settings from Bz2Test."""

    def requires_name_attribute(self):
        # Skip the calling test; it depends on the file object's name.
        reason = "BZ2File have no name attribute"
        self.skipTest(reason)
621
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    """Miscellaneous read tests run with the xz settings from LzmaTest."""

    def requires_name_attribute(self):
        # Skip the calling test; it depends on the file object's name.
        reason = "LZMAFile have no name attribute"
        self.skipTest(reason)
625
626
class StreamReadTest(CommonReadTest, unittest.TestCase):
    """Common read tests repeated with the stream mode prefix "r|"."""

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for member in self.tar:
            if member.isreg():
                with self.tar.extractfile(member) as fobj:
                    while True:
                        try:
                            chunk = fobj.read(512)
                        except tarfile.StreamError:
                            self.fail("simple read-through using "
                                      "TarFile.extractfile() failed")
                        if not chunk:
                            break

    def test_fileobj_regular_file(self):
        # get "regtype" (can't use getmember)
        member = self.tar.next()
        with self.tar.extractfile(member) as fobj:
            payload = fobj.read()
        self.assertEqual(len(payload), member.size,
                "regular file extraction failed")
        self.assertEqual(md5sum(payload), md5_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # getmembers() reads the whole stream; reading the first member
        # afterwards must raise StreamError.
        first = self.tar.getmembers()[0]
        with self.tar.extractfile(first) as fobj:
            self.assertRaises(tarfile.StreamError, fobj.read)

    def test_compare_members(self):
        # Walk the stream archive and a regularly opened copy of the
        # reference archive in lockstep and compare the extracted data.
        stream = self.tar
        with tarfile.open(tarname, encoding="iso8859-1") as reference:
            while True:
                expected = reference.next()
                actual = stream.next()
                if expected is None:
                    break
                self.assertIsNotNone(actual, "stream.next() failed.")

                if actual.islnk() or actual.issym():
                    with self.assertRaises(tarfile.StreamError):
                        stream.extractfile(actual)
                    continue

                expected_file = reference.extractfile(expected)
                actual_file = stream.extractfile(actual)
                if expected_file is None:
                    continue
                self.assertIsNotNone(actual_file,
                        "stream.extractfile() failed")
                self.assertEqual(expected_file.read(), actual_file.read(),
                        "stream extraction failed")
687
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """StreamReadTest run with the gzip settings from GzipTest."""
690
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """StreamReadTest run with the bz2 settings from Bz2Test."""
693
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """StreamReadTest run with the xz settings from LzmaTest."""
696
697
class DetectReadTest(TarTest, unittest.TestCase):
    """Check which read modes accept or reject the archive of this class."""

    def _testfunc_file(self, name, mode):
        # Opening *name* by path in *mode* must not raise ReadError.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s"
                      % (name, mode, e))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same check as _testfunc_file(), but the archive is handed to
        # tarfile.open() as an already opened binary file object.
        try:
            with open(name, "rb") as f:
                tar = tarfile.open(name, mode, fileobj=f)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r, fileobj=...) raised ReadError: %s"
                      % (name, mode, e))
        else:
            tar.close()

    def _test_modes(self, testfunc):
        if self.suffix:
            # A compression-specific mode must reject the uncompressed
            # archive, and the uncompressed modes must reject the
            # compressed one.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        # The matching explicit modes and the autodetecting ones must work.
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
737
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """DetectReadTest run with the gzip settings from GzipTest."""
740
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    """DetectReadTest run with the bz2 settings from Bz2Test."""

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900kB). If the file was
        # compressed using another blocksize autodetection fails.
        with open(tarname, "rb") as source:
            raw = source.read()

        # Compress with blocksize 100kB, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as target:
            target.write(raw)

        self._testfunc_file(tmpname, "r|*")
755
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """DetectReadTest run with the xz settings from LzmaTest."""
758
759
class MemberReadTest(ReadTest, unittest.TestCase):
    # Each test fetches one member of the archive by name and hands it to
    # _test_member() together with the header values expected for it.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Check *tarinfo* against expectations.  When *chksum* is given,
        # the md5 sum of the member's data must match it.  Every keyword
        # argument names a TarInfo attribute and its expected value; the
        # mtime/uid/gid (and usually uname/gname) values are always checked.
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as member_file:
                self.assertEqual(md5sum(member_file.read()), chksum,
                        "wrong md5sum for %s" % tarinfo.name)

        expected = dict(kwargs, mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            expected.update(uname="tarfile", gname="tarfile")
        for field, value in expected.items():
            self.assertEqual(getattr(tarinfo, field), value,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        # Only the presence of the name is checked here.
        longname = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(longname, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Replace the archive opened by setUp() with a freshly opened one
        # before looking up the pax member.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member = self.tar.getmember("pax/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)
855
856
class LongnameTest:
    # Mixin with tests for very long member names and link targets.  The
    # tests read ``self.subdir``, ``self.longnametype`` and ``self.tar``,
    # which the concrete test class has to provide.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        path = self.subdir + "/" + "123/" * 125 + "longname"
        try:
            member = self.tar.getmember(path)
        except KeyError:
            self.fail("longname not found")
        else:
            self.assertNotEqual(member.type, tarfile.DIRTYPE,
                    "read longname as dirtype")

    def test_read_longlink(self):
        # The long link member must point at the long name member.
        target = self.subdir + "/" + "123/" * 125 + "longname"
        path = self.subdir + "/" + "123/" * 125 + "longlink"
        try:
            member = self.tar.getmember(path)
        except KeyError:
            self.fail("longlink not found")
        else:
            self.assertEqual(member.linkname, target, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three 512-byte blocks starting at the
        # member's offset and expect opening that fragment to fail.
        path = self.subdir + "/" + "123/" * 125 + "longname"
        start = self.tar.getmember(path).offset
        raw = self.tar.fileobj
        raw.seek(start)
        fragment = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=fragment)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        path = self.subdir + "/" + "123/" * 125 + "longname"
        start = self.tar.getmember(path).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(start)
            block = fobj.read(512)
        header = tarfile.TarInfo.frombuf(block, "iso8859-1", "strict")
        self.assertEqual(header.type, self.longnametype)
897
898
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to accurately restore sparse members
    # and produce files with holes, and that is what these tests are
    # really after.  Not every platform/filesystem supports sparse files,
    # though, and even where they are supported it is non-trivial to make
    # reliable assertions about holes.  So a basic check that works on all
    # platforms is done first, followed by a check that only runs on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = md5sum(fobj.read())
        self.assertEqual(digest, md5_sparse,
                "wrong md5sum for %s" % name)

        if not self._fs_supports_holes():
            return
        st = os.stat(extracted)
        self.assertLess(st.st_blocks * 512, st.st_size)

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes in files.
        if not sys.platform.startswith("linux"):
            return False
        # Linux evidently has 512 byte st_blocks units.  Probe with a file
        # that is 4096 bytes long but never had any data written to it.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            fobj.seek(4096)
            fobj.truncate()
        blocks = os.stat(probe).st_blocks
        support.unlink(probe)
        return blocks == 0

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")
951
952
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # (member name, expected uname, expected gname); every member must
        # also carry the same VENDOR.umlauts pax header.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member_name, uname, gname in expectations:
                tarinfo = tar.getmember(member_name)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            headers = tarinfo.pax_headers
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(headers["atime"]), 1041808783.0)
            self.assertEqual(float(headers["ctime"]), 1041808783.0)
995
996
class WriteTestBase(TarTest):
    # Home of the write tests that are meant to run in every possible
    # mode combination.

    def test_fileobj_no_close(self):
        # A file object handed in by the caller must stay open after the
        # archive is closed, and later after the TarFile is collected.
        buffer = io.BytesIO()
        archive = tarfile.open(fileobj=buffer, mode=self.mode)
        archive.addfile(tarfile.TarInfo("foo"))
        archive.close()
        self.assertFalse(buffer.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        snapshot = buffer.getvalue()
        del archive
        support.gc_collect()
        self.assertFalse(buffer.closed)
        self.assertEqual(snapshot, buffer.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        with tarfile.open(tmpname, self.mode) as tar:
            member = tarfile.TarInfo("foo")
            member.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
            tar.addfile(member, io.BytesIO(b"a" * member.size))

        with self.open(tmpname, "rb") as fobj:
            contents = fobj.read()
        self.assertEqual(len(contents), tarfile.RECORDSIZE * 2)
1026
1027
1028class WriteTest(WriteTestBase, unittest.TestCase):
1029
1030    prefix = "w:"
1031
1032    def test_100_char_name(self):
1033        # The name field in a tar header stores strings of at most 100 chars.
1034        # If a string is shorter than 100 chars it has to be padded with '\0',
1035        # which implies that a string of exactly 100 chars is stored without
1036        # a trailing '\0'.
1037        name = "0123456789" * 10
1038        tar = tarfile.open(tmpname, self.mode)
1039        try:
1040            t = tarfile.TarInfo(name)
1041            tar.addfile(t)
1042        finally:
1043            tar.close()
1044
1045        tar = tarfile.open(tmpname)
1046        try:
1047            self.assertEqual(tar.getnames()[0], name,
1048                    "failed to store 100 char filename")
1049        finally:
1050            tar.close()
1051
1052    def test_tar_size(self):
1053        # Test for bug #1013882.
1054        tar = tarfile.open(tmpname, self.mode)
1055        try:
1056            path = os.path.join(TEMPDIR, "file")
1057            with open(path, "wb") as fobj:
1058                fobj.write(b"aaa")
1059            tar.add(path)
1060        finally:
1061            tar.close()
1062        self.assertGreater(os.path.getsize(tmpname), 0,
1063                "tarfile is empty")
1064
1065    # The test_*_size tests test for bug #1167128.
1066    def test_file_size(self):
1067        tar = tarfile.open(tmpname, self.mode)
1068        try:
1069            path = os.path.join(TEMPDIR, "file")
1070            with open(path, "wb"):
1071                pass
1072            tarinfo = tar.gettarinfo(path)
1073            self.assertEqual(tarinfo.size, 0)
1074
1075            with open(path, "wb") as fobj:
1076                fobj.write(b"aaa")
1077            tarinfo = tar.gettarinfo(path)
1078            self.assertEqual(tarinfo.size, 3)
1079        finally:
1080            tar.close()
1081
1082    def test_directory_size(self):
1083        path = os.path.join(TEMPDIR, "directory")
1084        os.mkdir(path)
1085        try:
1086            tar = tarfile.open(tmpname, self.mode)
1087            try:
1088                tarinfo = tar.gettarinfo(path)
1089                self.assertEqual(tarinfo.size, 0)
1090            finally:
1091                tar.close()
1092        finally:
1093            support.rmdir(path)
1094
1095    @unittest.skipUnless(hasattr(os, "link"),
1096                         "Missing hardlink implementation")
1097    def test_link_size(self):
1098        link = os.path.join(TEMPDIR, "link")
1099        target = os.path.join(TEMPDIR, "link_target")
1100        with open(target, "wb") as fobj:
1101            fobj.write(b"aaa")
1102        os.link(target, link)
1103        try:
1104            tar = tarfile.open(tmpname, self.mode)
1105            try:
1106                # Record the link target in the inodes list.
1107                tar.gettarinfo(target)
1108                tarinfo = tar.gettarinfo(link)
1109                self.assertEqual(tarinfo.size, 0)
1110            finally:
1111                tar.close()
1112        finally:
1113            support.unlink(target)
1114            support.unlink(link)
1115
1116    @support.skip_unless_symlink
1117    def test_symlink_size(self):
1118        path = os.path.join(TEMPDIR, "symlink")
1119        os.symlink("link_target", path)
1120        try:
1121            tar = tarfile.open(tmpname, self.mode)
1122            try:
1123                tarinfo = tar.gettarinfo(path)
1124                self.assertEqual(tarinfo.size, 0)
1125            finally:
1126                tar.close()
1127        finally:
1128            support.unlink(path)
1129
1130    def test_add_self(self):
1131        # Test for #1257255.
1132        dstname = os.path.abspath(tmpname)
1133        tar = tarfile.open(tmpname, self.mode)
1134        try:
1135            self.assertEqual(tar.name, dstname,
1136                    "archive name must be absolute")
1137            tar.add(dstname)
1138            self.assertEqual(tar.getnames(), [],
1139                    "added the archive to itself")
1140
1141            with support.change_cwd(TEMPDIR):
1142                tar.add(dstname)
1143            self.assertEqual(tar.getnames(), [],
1144                    "added the archive to itself")
1145        finally:
1146            tar.close()
1147
1148    def test_exclude(self):
1149        tempdir = os.path.join(TEMPDIR, "exclude")
1150        os.mkdir(tempdir)
1151        try:
1152            for name in ("foo", "bar", "baz"):
1153                name = os.path.join(tempdir, name)
1154                support.create_empty_file(name)
1155
1156            exclude = os.path.isfile
1157
1158            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
1159            try:
1160                with support.check_warnings(("use the filter argument",
1161                                             DeprecationWarning)):
1162                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)
1163            finally:
1164                tar.close()
1165
1166            tar = tarfile.open(tmpname, "r")
1167            try:
1168                self.assertEqual(len(tar.getmembers()), 1)
1169                self.assertEqual(tar.getnames()[0], "empty_dir")
1170            finally:
1171                tar.close()
1172        finally:
1173            support.rmtree(tempdir)
1174
1175    def test_filter(self):
1176        tempdir = os.path.join(TEMPDIR, "filter")
1177        os.mkdir(tempdir)
1178        try:
1179            for name in ("foo", "bar", "baz"):
1180                name = os.path.join(tempdir, name)
1181                support.create_empty_file(name)
1182
1183            def filter(tarinfo):
1184                if os.path.basename(tarinfo.name) == "bar":
1185                    return
1186                tarinfo.uid = 123
1187                tarinfo.uname = "foo"
1188                return tarinfo
1189
1190            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
1191            try:
1192                tar.add(tempdir, arcname="empty_dir", filter=filter)
1193            finally:
1194                tar.close()
1195
1196            # Verify that filter is a keyword-only argument
1197            with self.assertRaises(TypeError):
1198                tar.add(tempdir, "empty_dir", True, None, filter)
1199
1200            tar = tarfile.open(tmpname, "r")
1201            try:
1202                for tarinfo in tar:
1203                    self.assertEqual(tarinfo.uid, 123)
1204                    self.assertEqual(tarinfo.uname, "foo")
1205                self.assertEqual(len(tar.getmembers()), 3)
1206            finally:
1207                tar.close()
1208        finally:
1209            support.rmtree(tempdir)
1210
1211    # Guarantee that stored pathnames are not modified. Don't
1212    # remove ./ or ../ or double slashes. Still make absolute
1213    # pathnames relative.
1214    # For details see bug #6054.
1215    def _test_pathname(self, path, cmp_path=None, dir=False):
1216        # Create a tarfile with an empty member named path
1217        # and compare the stored name with the original.
1218        foo = os.path.join(TEMPDIR, "foo")
1219        if not dir:
1220            support.create_empty_file(foo)
1221        else:
1222            os.mkdir(foo)
1223
1224        tar = tarfile.open(tmpname, self.mode)
1225        try:
1226            tar.add(foo, arcname=path)
1227        finally:
1228            tar.close()
1229
1230        tar = tarfile.open(tmpname, "r")
1231        try:
1232            t = tar.next()
1233        finally:
1234            tar.close()
1235
1236        if not dir:
1237            support.unlink(foo)
1238        else:
1239            support.rmdir(foo)
1240
1241        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
1242
1243
1244    @support.skip_unless_symlink
1245    def test_extractall_symlinks(self):
1246        # Test if extractall works properly when tarfile contains symlinks
1247        tempdir = os.path.join(TEMPDIR, "testsymlinks")
1248        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
1249        os.mkdir(tempdir)
1250        try:
1251            source_file = os.path.join(tempdir,'source')
1252            target_file = os.path.join(tempdir,'symlink')
1253            with open(source_file,'w') as f:
1254                f.write('something\n')
1255            os.symlink(source_file, target_file)
1256            tar = tarfile.open(temparchive,'w')
1257            tar.add(source_file)
1258            tar.add(target_file)
1259            tar.close()
1260            # Let's extract it to the location which contains the symlink
1261            tar = tarfile.open(temparchive,'r')
1262            # this should not raise OSError: [Errno 17] File exists
1263            try:
1264                tar.extractall(path=tempdir)
1265            except OSError:
1266                self.fail("extractall failed with symlinked files")
1267            finally:
1268                tar.close()
1269        finally:
1270            support.unlink(temparchive)
1271            support.rmtree(tempdir)
1272
1273    def test_pathnames(self):
1274        self._test_pathname("foo")
1275        self._test_pathname(os.path.join("foo", ".", "bar"))
1276        self._test_pathname(os.path.join("foo", "..", "bar"))
1277        self._test_pathname(os.path.join(".", "foo"))
1278        self._test_pathname(os.path.join(".", "foo", "."))
1279        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
1280        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
1281        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
1282        self._test_pathname(os.path.join("..", "foo"))
1283        self._test_pathname(os.path.join("..", "foo", ".."))
1284        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
1285        self._test_pathname(os.path.join("..", "foo", "..", "bar"))
1286
1287        self._test_pathname("foo" + os.sep + os.sep + "bar")
1288        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
1289
1290    def test_abs_pathnames(self):
1291        if sys.platform == "win32":
1292            self._test_pathname("C:\\foo", "foo")
1293        else:
1294            self._test_pathname("/foo", "foo")
1295            self._test_pathname("///foo", "foo")
1296
1297    def test_cwd(self):
1298        # Test adding the current working directory.
1299        with support.change_cwd(TEMPDIR):
1300            tar = tarfile.open(tmpname, self.mode)
1301            try:
1302                tar.add(".")
1303            finally:
1304                tar.close()
1305
1306            tar = tarfile.open(tmpname, "r")
1307            try:
1308                for t in tar:
1309                    if t.name != ".":
1310                        self.assertTrue(t.name.startswith("./"), t.name)
1311            finally:
1312                tar.close()
1313
1314    def test_open_nonwritable_fileobj(self):
1315        for exctype in OSError, EOFError, RuntimeError:
1316            class BadFile(io.BytesIO):
1317                first = True
1318                def write(self, data):
1319                    if self.first:
1320                        self.first = False
1321                        raise exctype
1322
1323            f = BadFile()
1324            with self.assertRaises(exctype):
1325                tar = tarfile.open(tmpname, self.mode, fileobj=f,
1326                                   format=tarfile.PAX_FORMAT,
1327                                   pax_headers={'non': 'empty'})
1328            self.assertFalse(f.closed)
1329
class GzipWriteTest(GzipTest, WriteTest):
    # Combines the GzipTest mixin with WriteTest; defines no tests of
    # its own.
    pass
1332
class Bz2WriteTest(Bz2Test, WriteTest):
    # Combines the Bz2Test mixin with WriteTest; defines no tests of
    # its own.
    pass
1335
class LzmaWriteTest(LzmaTest, WriteTest):
    # Combines the LzmaTest mixin with WriteTest; defines no tests of
    # its own.
    pass
1338
1339
class StreamWriteTest(WriteTestBase, unittest.TestCase):

    prefix = "w|"
    # Subclasses may set this to a decompressor class; test_stream_padding
    # then decompresses the archive with it instead of using self.open().
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()

        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                compressed = fobj.read()
            data = dec.decompress(compressed)
            self.assertFalse(dec.unused_data, "found trailing data")
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        # The previous umask is restored even if the check fails.
        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)
1377
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    # Combines the GzipTest mixin with StreamWriteTest; defines no tests
    # of its own and keeps the inherited decompressor setting.
    pass
1380
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Decompressor class picked up by test_stream_padding; None when the
    # bz2 module could not be imported.
    decompressor = bz2.BZ2Decompressor if bz2 else None
1383
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Decompressor class picked up by test_stream_padding; None when the
    # lzma module could not be imported.
    decompressor = lzma.LZMADecompressor if lzma else None
1386
1387
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Number of bytes taken up by len(s) // 512 + 1 blocks of 512 bytes.
        return (len(s) // 512 + 1) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member: the initial tar
        # header plus, for every field that is too long, one extended
        # header block and the padded field data.
        total = 512
        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            total += 512 + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            total += 512 + self._length(link)
        return total

    def _test(self, name, link=None):
        # Write a single member called *name* (a hard link to *link* when
        # given) in GNU format, compare the resulting archive offset with
        # the calculated size, then read the member back.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        writer = tarfile.open(tmpname, "w")
        try:
            writer.format = tarfile.GNU_FORMAT
            writer.addfile(tarinfo)

            expected = self._calc_size(name, link)
            self.assertEqual(expected, writer.offset,
                    "GNU longname/longlink creation failed")
        finally:
            writer.close()

        with tarfile.open(tmpname) as reader:
            member = reader.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")

    def test_longname_1023(self):
        self._test("longnam/" * 127 + "longnam")

    def test_longname_1024(self):
        self._test("longnam/" * 127 + "longname")

    def test_longname_1025(self):
        self._test("longnam/" * 127 + "longname_")

    def test_longlink_1023(self):
        self._test("name", "longlnk/" * 127 + "longlnk")

    def test_longlink_1024(self):
        self._test("name", "longlnk/" * 127 + "longlink")

    def test_longlink_1025(self):
        self._test("name", "longlnk/" * 127 + "longlink_")

    def test_longnamelink_1023(self):
        self._test("longnam/" * 127 + "longnam",
                   "longlnk/" * 127 + "longlnk")

    def test_longnamelink_1024(self):
        self._test("longnam/" * 127 + "longname",
                   "longlnk/" * 127 + "longlink")

    def test_longnamelink_1025(self):
        self._test("longnam/" * 127 + "longname_",
                   "longlnk/" * 127 + "longlink_")
1468
1469
class CreateTest(WriteTestBase, unittest.TestCase):

    prefix = "x:"

    # Small file that the tests add to the archive they create.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def setUp(self):
        # Every test starts without an archive at tmpname.
        support.unlink(tmpname)

    def _check_single_spameggs_member(self):
        # The archive at tmpname must contain exactly one member whose
        # name includes 'spameggs42'.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_single_spameggs_member()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second creation attempt must fail ...
        with self.assertRaises(FileExistsError):
            tobj = tarfile.open(tmpname, self.mode)

        # ... and leave the first archive as it was.
        self._check_single_spameggs_member()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_single_spameggs_member()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        # A second creation attempt must fail ...
        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        # ... and leave the first archive as it was.
        self._check_single_spameggs_member()
1530
1531
class GzipCreateTest(GzipTest, CreateTest):
    # Combines the GzipTest mixin with CreateTest; defines no tests of
    # its own.
    pass
1534
1535
class Bz2CreateTest(Bz2Test, CreateTest):
    # Combines the Bz2Test mixin with CreateTest; defines no tests of
    # its own.
    pass
1538
1539
class LzmaCreateTest(LzmaTest, CreateTest):
    # Combines the LzmaTest mixin with CreateTest; defines no tests of
    # its own.
    pass
1542
1543
class CreateWithXModeTest(CreateTest):
    # Re-runs the CreateTest tests with the mode prefix "x" instead of
    # "x:".

    prefix = "x"

    # These two inherited tests call self.taropen(tmpname, "x") and never
    # read self.mode, so they would not exercise the changed prefix; they
    # are overridden with None here.
    test_create_taropen = None
    test_create_existing_taropen = None
1550
1551
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create a file "foo", a hard link "bar" to it, and an archive
        # that already contains "foo".
        #
        # Cleanup is registered with addCleanup() rather than done in
        # tearDown(): tearDown() is not run when setUp() fails, so a
        # failing os.link() or tarfile.open() would otherwise leave the
        # files (and possibly the open archive) behind.  Cleanups run in
        # reverse order of registration, so the archive is closed before
        # the files are removed.
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        self.addCleanup(support.unlink, self.foo)
        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        self.addCleanup(support.unlink, self.bar)
        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" shares its inode with the already added "foo", so it must
        # be reported as a hard link.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link must be reported as a
        # regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1590
1591
class PaxWriteTest(GNUWriteTest):

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            writer.addfile(tarinfo)
        finally:
            writer.close()

        # Only the link name is compared when a link was given, otherwise
        # only the member name.
        with tarfile.open(tmpname) as reader:
            if link:
                stored = reader.getmembers()[0].linkname
                self.assertEqual(link, stored, "PAX longlink creation failed")
            else:
                stored = reader.getmembers()[0].name
                self.assertEqual(name, stored, "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        try:
            writer.addfile(tarfile.TarInfo("test"))
        finally:
            writer.close()

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as reader:
            self.assertEqual(reader.pax_headers, pax_headers)
            self.assertEqual(reader.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in reader.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                # Values of number fields must also be convertible.
                convert = tarfile.PAX_NUMBER_FIELDS.get(key)
                if convert is None:
                    continue
                try:
                    convert(val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                              encoding="iso8859-1")
        try:
            member = tarfile.TarInfo("\xe4\xf6\xfc") # non-ASCII
            member.uid = 8**8 # too large
            member.pax_headers = pax_headers
            writer.addfile(member)
        finally:
            writer.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as reader:
            stored = reader.getmembers()[0]
            self.assertEqual(stored.pax_headers, pax_headers)
            self.assertEqual(stored.name, "foo")
            self.assertEqual(stored.uid, 123)
1674
1675
class UnicodeTest:
    # Encoding tests that read self.format to choose the archive format
    # being written.  Archives are opened through "with" blocks so they
    # are closed on every path.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using `encoding`, then read
        # the archive back with the same encoding and compare the name.
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", non-ASCII text in
        # either the name or the uname must make addfile() fail.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as tar:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # All textual member fields must come back as str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Re-read with an encoding that cannot decode the stored
            # bytes; they are expected to come back as surrogate escapes.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
1755
1756
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 10)
        self._test_ustar_name(digits * 10 + "0", ValueError)
        self._test_ustar_name(digits * 9 + "01234567\xff")
        self._test_ustar_name(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_name2(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 9 + "012345\xff\xff")
        self._test_ustar_name(digits * 9 + "0123456\xff\xff", ValueError)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 15 + "01234/" + digits * 10)
        self._test_ustar_name(digits * 15 + "0123/4" + digits * 10,
                              ValueError)
        self._test_ustar_name(digits * 15 + "012\xff/" + digits * 10)
        self._test_ustar_name(digits * 15 + "0123\xff/" + digits * 10,
                              ValueError)

    def test_unicode_longname2(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 15 + "01\xff/2" + digits * 10,
                              ValueError)
        self._test_ustar_name(digits * 15 + "01\xff\xff/" + digits * 10,
                              ValueError)

    def test_unicode_longname3(self):
        digits = "0123456789"
        prefix = digits * 15 + "01234/"
        self._test_ustar_name(digits * 15 + "01\xff\xff/2" + digits * 10,
                              ValueError)
        self._test_ustar_name(prefix + digits * 9 + "01234567\xff")
        self._test_ustar_name(prefix + digits * 9 + "012345678\xff",
                              ValueError)

    def test_unicode_longname4(self):
        digits = "0123456789"
        prefix = digits * 15 + "01234/"
        self._test_ustar_name(prefix + digits * 9 + "012345\xff\xff")
        self._test_ustar_name(prefix + digits * 9 + "0123456\xff\xff",
                              ValueError)

    def _test_ustar_name(self, name, exc=None):
        # Try to store a member called `name`.  When `exc` is given the
        # write must raise it; otherwise the first member read back must
        # carry the same name.
        must_fail = exc is not None
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as archive:
            member = tarfile.TarInfo(name)
            if must_fail:
                self.assertRaises(exc, archive.addfile, member)
            else:
                archive.addfile(member)

        if must_fail:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as archive:
            for stored in archive:
                self.assertEqual(name, stored.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 10)
        self._test_ustar_link(digits * 10 + "0", ValueError)
        self._test_ustar_link(digits * 9 + "01234567\xff")
        self._test_ustar_link(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_link2(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 9 + "012345\xff\xff")
        self._test_ustar_link(digits * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_link(self, name, exc=None):
        # Same as _test_ustar_name(), but `name` is stored as the link
        # name of a member called "foo".
        must_fail = exc is not None
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as archive:
            member = tarfile.TarInfo("foo")
            member.linkname = name
            if must_fail:
                self.assertRaises(exc, archive.addfile, member)
            else:
                archive.addfile(member)

        if must_fail:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as archive:
            for stored in archive:
                self.assertEqual(name, stored.linkname)
                break
1834
1835
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, member_name in cases:
            archive = tarfile.open(tarname, encoding=encoding,
                                   errors="surrogateescape")
            with archive:
                # A missing member shows up as KeyError from getmember().
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1852
1853
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, member_name in cases:
            archive = tarfile.open(tarname, encoding=encoding,
                                   errors="surrogateescape")
            with archive:
                # A missing member shows up as KeyError from getmember().
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1872
1873
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Start every test without a leftover archive at tmpname.
        archive_path = tmpname
        self.tarname = archive_path
        if os.path.exists(archive_path):
            support.unlink(archive_path)

    def _create_testtar(self, mode="w:"):
        # Copy the "ustar/regtype" member of the reference archive into a
        # new archive at self.tarname, storing it under the name "foo".
        with tarfile.open(tarname, encoding="iso8859-1") as source:
            member = source.getmember("ustar/regtype")
            member.name = "foo"
            with source.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as target:
                target.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in plain "a" mode must fail.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
1893
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append-mode tests for uncompressed archives.

    # Disable the inherited test that concatenates "w:" with self.suffix.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member called "bar" to the archive, which is
        # opened in "a" mode by name or through `fileobj`.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive holds exactly the members in `names`.
        # `names` defaults to ["bar"]; None is used as the sentinel so no
        # list object is shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        # Rewind so the archive is read from the start of the buffer.
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        # Rewind so the archive is read from the start of the buffer.
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Write `data` as the archive file and expect appending to raise
        # tarfile.ReadError.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
1957
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # Defines no tests of its own; everything comes from the base classes.
    pass
1960
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # Defines no tests of its own; everything comes from the base classes.
    pass
1963
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # Defines no tests of its own; everything comes from the base classes.
    pass
1966
1967
class LimitsTest(unittest.TestCase):

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT
        long_path = "123/" * 126 + "longname"

        tarfile.TarInfo(long_path).tobuf(gnu)

        info = tarfile.TarInfo("longlink")
        info.linkname = long_path
        info.tobuf(gnu)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(gnu)

    def test_pax_limits(self):
        # None of these values may make tobuf() raise for PAX_FORMAT.
        pax = tarfile.PAX_FORMAT
        long_path = "123/" * 126 + "longname"

        tarfile.TarInfo(long_path).tobuf(pax)

        info = tarfile.TarInfo("longlink")
        info.linkname = long_path
        info.tobuf(pax)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(pax)
2025
2026
class MiscTest(unittest.TestCase):

    def test_char_fields(self):
        # stn() is checked with a length longer and shorter than the text.
        padded = tarfile.stn("foo", 8, "ascii", "strict")
        self.assertEqual(padded, b"foo\0\0\0\0\0")
        truncated = tarfile.stn("foobar", 3, "ascii", "strict")
        self.assertEqual(truncated, b"foo")
        # nts() must return only the text before the first NUL.
        for raw in (b"foo\0\0\0\0\0", b"foo\0bar\0"):
            self.assertEqual(tarfile.nts(raw, "ascii", "strict"), "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        decode_cases = (
            (b"0000001\x00", 1),
            (b"7777777\x00", 0o7777777),
            (b"\x80\x00\x00\x00\x00\x20\x00\x00", 0o10000000),
            (b"\x80\x00\x00\x00\xff\xff\xff\xff", 0xffffffff),
            (b"\xff\xff\xff\xff\xff\xff\xff\xff", -1),
            (b"\xff\xff\xff\xff\xff\xff\xff\x9c", -100),
            (b"\xff\x00\x00\x00\x00\x00\x00\x00", -0x100000000000000),
            # Issue 24514: Test if empty number fields are converted to zero.
            (b"\0", 0),
            (b"       \0", 0),
        )
        for field, number in decode_cases:
            self.assertEqual(tarfile.nti(field), number)

    def test_write_number_fields(self):
        encode_cases = (
            (1, b"0000001\x00"),
            (0o7777777, b"7777777\x00"),
            (0o10000000, b"\x80\x00\x00\x00\x00\x20\x00\x00"),
            (0xffffffff, b"\x80\x00\x00\x00\xff\xff\xff\xff"),
            (-1, b"\xff\xff\xff\xff\xff\xff\xff\xff"),
            (-100, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (-0x100000000000000, b"\xff\x00\x00\x00\x00\x00\x00\x00"),
        )
        for number, field in encode_cases:
            self.assertEqual(tarfile.itn(number), field)

    def test_number_field_limits(self):
        # Each (number, digits, format) combination must be rejected.
        rejected = (
            (-1, 8, tarfile.USTAR_FORMAT),
            (0o10000000, 8, tarfile.USTAR_FORMAT),
            (-0x10000000001, 6, tarfile.GNU_FORMAT),
            (0x10000000000, 6, tarfile.GNU_FORMAT),
        )
        for number, digits, fmt in rejected:
            with self.assertRaises(ValueError):
                tarfile.itn(number, digits, fmt)

    def test__all__(self):
        # Names passed to check__all__() as the blacklist, grouped only
        # for readability.
        modules_and_misc = {'version', 'grp', 'pwd', 'symlink_exception',
                            'main'}
        constants = {
            'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC',
            'LENGTH_NAME', 'LENGTH_LINK', 'LENGTH_PREFIX',
            'REGTYPE', 'AREGTYPE', 'LNKTYPE', 'SYMTYPE', 'CHRTYPE',
            'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE',
            'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE',
            'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
            'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
            'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
        }
        helpers = {'stn', 'nts', 'nti', 'itn', 'calc_chksums',
                   'copyfileobj', 'filemode', 'ExFileObject'}
        header_errors = {'EmptyHeaderError', 'TruncatedHeaderError',
                         'EOFHeaderError', 'InvalidHeaderError',
                         'SubsequentHeaderError'}
        blacklist = modules_and_misc | constants | helpers | header_errors
        support.check__all__(self, tarfile, blacklist=blacklist)
2100
2101
class CommandLineTest(unittest.TestCase):
    # Tests that run "python -m tarfile" through script_helper.

    def tarfilecmd(self, *args, **kwargs):
        # Run the command expecting success and return its stdout with
        # os.linesep replaced by b'\n'.
        _rc, stdout, _stderr = script_helper.assert_python_ok(
            '-m', 'tarfile', *args, **kwargs)
        return stdout.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        # Run the command expecting failure; the result of
        # assert_python_failure() is returned unchanged.
        cli_args = ('-m', 'tarfile') + args
        return script_helper.assert_python_failure(*cli_args)

    def make_simple_tarfile(self, tar_name):
        # Create an archive at tar_name holding two test data files and
        # register its removal as a cleanup.
        sources = [
            support.findfile('tokenize_tests.txt'),
            support.findfile('tokenize_tests-no-coding-cookie-'
                             'and-utf8-bom-sig-only.txt'),
        ]
        self.addCleanup(support.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as archive:
            for path in sources:
                archive.add(path, arcname=os.path.basename(path))

    def test_test_command(self):
        for tar_name in testtarnames:
            for opt in ('-t', '--test'):
                output = self.tarfilecmd(opt, tar_name)
                self.assertEqual(output, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in ('-v', '--verbose'):
                output = self.tarfilecmd(opt, '-t', tar_name)
                self.assertIn(b'is a tar archive.\n', output)

    def test_test_command_invalid_file(self):
        not_a_tar = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', not_a_tar)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # A file cut to the first 511 bytes of a real archive must be
        # rejected as well.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as source:
                    content = source.read()
                try:
                    with open(tmpname, 'wb') as target:
                        target.write(content[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    support.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            # The reference output is what TarFile.list() prints.
            with support.captured_stdout() as captured, \
                    tarfile.open(tar_name, 'r') as archive:
                archive.list(verbose=False)
            expected = captured.getvalue().encode('ascii',
                                                  'backslashreplace')
            for opt in ('-l', '--list'):
                output = self.tarfilecmd(opt, tar_name,
                                         PYTHONIOENCODING='ascii')
                self.assertEqual(output, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            # The reference output is what TarFile.list() prints.
            with support.captured_stdout() as captured, \
                    tarfile.open(tar_name, 'r') as archive:
                archive.list(verbose=True)
            expected = captured.getvalue().encode('ascii',
                                                  'backslashreplace')
            for opt in ('-v', '--verbose'):
                output = self.tarfilecmd(opt, '-l', tar_name,
                                         PYTHONIOENCODING='ascii')
                self.assertEqual(output, expected)

    def test_list_command_invalid_file(self):
        not_a_tar = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', not_a_tar)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        sources = [
            support.findfile('tokenize_tests.txt'),
            support.findfile('tokenize_tests-no-coding-cookie-'
                             'and-utf8-bom-sig-only.txt'),
        ]
        for opt in ('-c', '--create'):
            try:
                output = self.tarfilecmd(opt, tmpname, *sources)
                self.assertEqual(output, b'')
                # The created file must be readable as an archive.
                with tarfile.open(tmpname) as archive:
                    archive.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_verbose(self):
        sources = [
            support.findfile('tokenize_tests.txt'),
            support.findfile('tokenize_tests-no-coding-cookie-'
                             'and-utf8-bom-sig-only.txt'),
        ]
        for opt in ('-v', '--verbose'):
            try:
                output = self.tarfilecmd(opt, '-c', tmpname, *sources)
                self.assertIn(b' file created.', output)
                with tarfile.open(tmpname) as archive:
                    archive.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        sources = [support.findfile('tokenize_tests.txt')]
        try:
            output = self.tarfilecmd('-c', dotlessname, *sources)
            self.assertEqual(output, b'')
            with tarfile.open(dotlessname) as archive:
                archive.getmembers()
        finally:
            support.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        tar_name = os.path.join(TEMPDIR, ".testtar")
        sources = [support.findfile('tokenize_tests.txt')]
        try:
            output = self.tarfilecmd('-c', tar_name, *sources)
            self.assertEqual(output, b'')
            with tarfile.open(tar_name) as archive:
                archive.getmembers()
        finally:
            support.unlink(tar_name)

    def test_create_command_compressed(self):
        sources = [
            support.findfile('tokenize_tests.txt'),
            support.findfile('tokenize_tests-no-coding-cookie-'
                             'and-utf8-bom-sig-only.txt'),
        ]
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            # Skip formats whose opener is not set.
            if not filetype.open:
                continue
            tar_name = tmpname + '.' + filetype.suffix
            try:
                self.tarfilecmd('-c', tar_name, *sources)
                with filetype.taropen(tar_name) as archive:
                    archive.getmembers()
            finally:
                support.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in ('-e', '--extract'):
            try:
                with support.temp_cwd(tarextdir):
                    output = self.tarfilecmd(opt, tmpname)
                self.assertEqual(output, b'')
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in ('-v', '--verbose'):
            try:
                with support.temp_cwd(tarextdir):
                    output = self.tarfilecmd(opt, '-e', tmpname)
                self.assertIn(b' file is extracted.', output)
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with support.temp_cwd(tarextdir):
                output = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(output, b'')
        finally:
            support.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        not_a_tar = support.findfile('zipdir.zip')
        with support.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', not_a_tar)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2280
2281
class ContextManagerTest(unittest.TestCase):
    # Tests for using TarFile objects in "with" statements.

    def test_basic(self):
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the deliberately raised exception is swallowed;
            # KeyboardInterrupt and SystemExit still propagate.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the deliberately raised exception is swallowed;
                # KeyboardInterrupt and SystemExit still propagate.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2338
2339
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members
    # as the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract `name` into TEMPDIR and compare the checksum of the
        # resulting file with md5_regtype.
        self.tar.extract(name, TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as extracted:
            digest = md5sum(extracted.read())
        self.assertEqual(digest, md5_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2372
2373
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bz2-compressed archive in `mode`.
        # The stream below raises if it is read again after a read() that
        # started at end of data, without a seek() in between.
        class MyBytesIO(io.BytesIO):
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError(
                        "infinite loop detected in tarfile.open()")
                at_end = self.tell() == len(self.getvalue())
                self.hit_eof = at_end
                return super().read(n)

            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for cut in range(len(compressed) + 1):
            stream = MyBytesIO(compressed[:cut])
            try:
                tarfile.open(fileobj=stream, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2403
2404
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 are both named 'root'.

    Return False when the pwd or grp module cannot be imported, when
    either id has no database entry, or when either name differs from
    'root'.
    """
    try:
        import pwd
        import grp
    except ImportError:
        return False
    try:
        if pwd.getpwuid(0)[0] != 'root':
            return False
        if grp.getgrgid(0)[0] != 'root':
            return False
    except KeyError:
        # getpwuid()/getgrgid() raise KeyError when the id has no entry.
        # This function is called while decorators are evaluated, so an
        # escaping exception would prevent the module from being imported.
        return False
    return True
2415
2416
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests below patch these functions:
    #  os.chown:   so the arguments it gets called with can be inspected
    #  os.chmod:   so no mode is really changed; otherwise the extracted
    #              files/directories could not be deleted afterwards
    #  os.geteuid: so we can pretend to be root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Write an archive to tmpname that holds a file, a directory and a
        # second file, each with its own uid/gid and with uname/gname set to
        # 'root'. Returns the full path of the archive.
        payload = io.BytesIO(b"content")

        # (name, uid, gid, type, file object) for every member
        members = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for member_name, uid, gid, member_type, fileobj in members:
                info = tarfile.TarInfo(member_name)
                info.uid, info.gid = uid, gid
                info.uname = info.gname = 'root'
                info.type = member_type
                archive.addfile(info, fileobj)

        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Context manager shared by the tests: makes the mocked geteuid
        # report uid 0, builds the test archive and yields the opened
        # TarFile together with the three member names stored in it.
        mock_geteuid.return_value = 0  # pretend to be root

        # member names as stored in the archive
        filename_1 = 'numeric-owner-testfile'
        dirname_1 = 'dir'
        filename_2 = os.path.join(dirname_1, filename_1)

        archive_path = NumericOwnerTest._make_test_archive(
            filename_1, dirname_1, filename_2)

        with tarfile.open(archive_path) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as test_env:
            tarfl, filename_1, _, filename_2 = test_env
            for member in (filename_1, filename_2):
                tarfl.extract(member, TEMPDIR, numeric_owner=True)

        # chown must have been called with the filesystem path and the
        # numeric ids stored for each extracted member
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as test_env:
            tarfl, filename_1, dirname_1, filename_2 = test_env
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # same check as above, this time including the directory member
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, dirname_1), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # This test only makes sense when uid=0 and gid=0 are really named
    #  'root': the members of the test archive carry uname/gname 'root', and
    #  the ids passed to chown are checked here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as test_env:
            tarfl, filename_1 = test_env[0], test_env[1]
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        extracted_path = os.path.join(TEMPDIR, filename_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        with self._setup_test(mock_geteuid) as test_env:
            tarfl, filename_1 = test_env[0], test_env[1]
            # a fourth positional argument must be rejected
            with self.assertRaises(TypeError):
                tarfl.extract(filename_1, TEMPDIR, False, True)
2535
2536
def setUpModule():
    """Create TEMPDIR and the compressed copies of the test archive.

    Sets the module-level list ``testtarnames`` to the path of the plain
    test archive followed by the path of each compressed copy that could
    be written (one per compression module that is available).
    """
    global testtarnames

    # unlink() is meant for files. A directory left over from an
    # interrupted run has to be removed with rmtree(), otherwise the
    # makedirs() call below fails because the path already exists.
    if os.path.isdir(TEMPDIR) and not os.path.islink(TEMPDIR):
        support.rmtree(TEMPDIR)
    else:
        support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as fobj:
        data = fobj.read()

    # Create compressed tarfiles.
    for c in GzipTest, Bz2Test, LzmaTest:
        # c.open is None when the matching compression module is missing.
        if c.open:
            support.unlink(c.tarname)
            testtarnames.append(c.tarname)
            with c.open(c.tarname, "wb") as tar:
                tar.write(data)
2553
def tearDownModule():
    """Remove TEMPDIR and everything in it, if it still exists."""
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2557
# Run the tests with unittest's command-line runner when this file is
# executed as a script.
if __name__ == "__main__":
    unittest.main()
2560