1import sys
2import os
3import shutil
4import StringIO
5from binascii import unhexlify
6from hashlib import md5
7from random import Random
8import errno
9
10import unittest
11import tarfile
12
13from test import test_support
14from test import test_support as support
15
# Check for our compression modules.  A module that cannot be used is
# rebound to None so that tests can branch on "if gzip:" / "if bz2:".
try:
    import gzip
    # Touch the attribute so that a gzip module lacking GzipFile is treated
    # as unavailable (the AttributeError is caught below).
    gzip.GzipFile
except (ImportError, AttributeError):
    gzip = None
try:
    import bz2
except ImportError:
    bz2 = None
26
def md5sum(data):
    """Return the hexadecimal MD5 digest of *data*."""
    digest = md5(data)
    return digest.hexdigest()
29
# Scratch directory into which the tests extract and create files.
TEMPDIR = os.path.abspath(test_support.TESTFN)
# Reference archive opened by the read tests.
tarname = test_support.findfile("testtar.tar")
# Paths below TEMPDIR: the gzip and bz2 variants of the archive, and a
# scratch archive that individual tests write themselves.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected MD5 hex digests of the data stored in the "ustar/regtype" and
# "ustar/sparse" members of the reference archive.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
38
39
class ReadTest(unittest.TestCase):
    # Common fixture for the read tests: each test method runs against the
    # archive named by the "tarname" attribute, opened with the "mode"
    # attribute and closed again afterwards.

    tarname = tarname
    mode = "r:"

    def setUp(self):
        open_kwargs = {"mode": self.mode, "encoding": "iso8859-1"}
        self.tar = tarfile.open(self.tarname, **open_kwargs)

    def tearDown(self):
        archive = self.tar
        archive.close()
50
51
class UstarReadTest(ReadTest):
    # Reads members below "ustar/" through the file objects returned by
    # TarFile.extractfile() and checks their read/seek/iteration behaviour.

    def test_fileobj_regular_file(self):
        # The data read back must have the size stored in the member header
        # and the known checksum of the regular test file.
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the extracted file object must return the same
        # lines as reading the member after extracting it to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # The on-disk copy is opened exactly once, inside a with block, so
        # that the handle is always closed again.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)

        lines2 = fobj2.readlines()
        self.assertTrue(lines1 == lines2,
                "fileobj.readlines() failed")
        self.assertTrue(len(lines2) == 114,
                "fileobj.readlines() failed")
        self.assertTrue(lines2[83] ==
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating over the extracted file object must yield the same lines
        # as readlines() on the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        lines2 = [line for line in fobj2]
        self.assertTrue(lines1 == lines2,
                     "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell() on the extracted file object and compare
        # what it reads with the bytes of the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)

        # Read the whole member first; only the side effect on the file
        # position matters, so the result is not kept.
        fobj.read()
        fobj.seek(0)
        self.assertTrue(0 == fobj.tell(),
                     "seek() to file's start failed")
        fobj.seek(2048, 0)
        self.assertTrue(2048 == fobj.tell(),
                     "seek() to absolute position failed")
        fobj.seek(-1024, 1)
        self.assertTrue(1024 == fobj.tell(),
                     "seek() to negative relative position failed")
        fobj.seek(1024, 1)
        self.assertTrue(2048 == fobj.tell(),
                     "seek() to positive relative position failed")
        s = fobj.read(10)
        self.assertTrue(s == data[2048:2058],
                     "read() after seek failed")
        fobj.seek(0, 2)
        self.assertTrue(tarinfo.size == fobj.tell(),
                     "seek() to file's end failed")
        self.assertTrue(fobj.read() == "",
                     "read() at file's end did not return empty string")
        fobj.seek(-tarinfo.size, 2)
        self.assertTrue(0 == fobj.tell(),
                     "relative seek() to file's start failed")
        fobj.seek(512)
        s1 = fobj.readlines()
        fobj.seek(512)
        s2 = fobj.readlines()
        self.assertTrue(s1 == s2,
                     "readlines() after seek failed")
        fobj.seek(0)
        self.assertTrue(len(fobj.readline()) == fobj.tell(),
                     "tell() after readline() failed")
        fobj.seek(512)
        self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(),
                     "tell() after seek() and readline() failed")
        fobj.seek(0)
        line = fobj.readline()
        self.assertTrue(fobj.read() == data[len(line):],
                     "read() after readline() failed")
        fobj.close()

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        # Both arguments are member names; the file objects obtained for the
        # link and for its target must report the same name.
        a = self.tar.extractfile(lnktype)
        b = self.tar.extractfile(regtype)
        self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
160
161
class ListTest(ReadTest, unittest.TestCase):
    # Checks what TarFile.list() prints to stdout, both in the terse and in
    # the verbose form.

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def test_list(self):
        with test_support.captured_stdout() as stdout:
            self.tar.list(verbose=False)
        listing = stdout.getvalue()

        umlauts = '\xc4\xd6\xdc\xe4\xf6\xfc\xdf'
        expected_names = [
            'ustar/conttype',
            'ustar/regtype',
            'ustar/lnktype',
            'ustar' + ('/12345' * 40) + '67/longname',
            './ustar/linktest2/symtype',
            './ustar/linktest2/lnktype',
            # Make sure it puts trailing slash for directory
            'ustar/dirtype/',
            'ustar/dirtype-with-size/',
            # Make sure it is able to print non-ASCII characters
            'ustar/umlauts-' + umlauts,
            'misc/regtype-hpux-signed-chksum-' + umlauts,
            'misc/regtype-old-v7-signed-chksum-' + umlauts,
        ]
        for name in expected_names:
            self.assertIn(name, listing)

        # Without the verbose flag the names are printed one per line, with
        # no 'ls -l'-like accessories:
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        two_plain_lines = (r'ustar/conttype ?\r?\n'
                           r'ustar/regtype ?\r?\n')
        self.assertRegexpMatches(listing, two_plain_lines)

        # Link sources must not be shown without the verbose flag.
        for marker in ('link to', '->'):
            self.assertNotIn(marker, listing)

    def test_list_verbose(self):
        with test_support.captured_stdout() as stdout:
            self.tar.list(verbose=True)
        listing = stdout.getvalue()

        # With the verbose flag each name is preceded by 'ls -l'-like
        # accessories:
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        verbose_line = (r'-rw-r--r-- tarfile/tarfile\s+7011 '
                        r'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                        r'ustar/\w+type ?\r?\n')
        self.assertRegexpMatches(listing, verbose_line * 2)

        # Link sources must be shown with the verbose flag.
        self.assertIn('ustar/symtype -> regtype', listing)
        self.assertIn('./ustar/linktest2/symtype -> ../linktest1/regtype',
                      listing)
        self.assertIn('./ustar/linktest2/lnktype link to '
                      './ustar/linktest1/regtype', listing)
        deep_path = '/123' * 125
        for subdir in ('gnu', 'pax'):
            self.assertIn(subdir + deep_path + '/longlink link to ' +
                          subdir + deep_path + '/longname', listing)
222
223
class GzipListTest(ListTest):
    # Runs the ListTest checks against the archive at gzipname, opened in
    # "r:gz" mode.  NOTE(review): gzipname is presumably a gzip-compressed
    # copy of the reference archive created elsewhere in this file - confirm.
    tarname = gzipname
    mode = "r:gz"
    taropen = tarfile.TarFile.gzopen
228
229
class Bz2ListTest(ListTest):
    # Runs the ListTest checks against the archive at bz2name, opened in
    # "r:bz2" mode.  NOTE(review): bz2name is presumably a bz2-compressed
    # copy of the reference archive created elsewhere in this file - confirm.
    tarname = bz2name
    mode = "r:bz2"
    taropen = tarfile.TarFile.bz2open
234
235
class CommonReadTest(ReadTest):
    # Read tests that take the open mode from the "mode" class attribute,
    # so subclasses can rerun them in other modes.

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # Pre-bind the name: if tarfile.open() itself raises, the finally
        # clause must not fail on an unbound variable and thereby hide the
        # real test failure.
        tar = None
        try:
            tar = tarfile.open(tmpname, self.mode)
            tar.getnames()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        else:
            self.assertListEqual(tar.getmembers(), [])
        finally:
            if tar is not None:
                tar.close()

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        exctype = OSError if '|' in self.mode else IOError
        with self.assertRaisesRegexp(exctype, "xxx") as ex:
            tarfile.open("xxx", self.mode)
        self.assertEqual(ex.exception.errno, errno.ENOENT)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # Write the raw file with the opener matching the compression
        # suffix of the mode under test.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        # generate 512 pseudorandom bytes
        data = unhexlify('%1024x' % Random(0).getrandbits(512*8))
        for char in ('\0', 'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with _open(tmpname, "wb") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" % char)
            finally:
                tar.close()

    def test_premature_end_of_archive(self):
        # An archive holding one 1024 byte member is truncated to several
        # sizes; iterating, extracting and reading must all raise ReadError.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, StringIO.StringIO("a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
                    tar.extractfile(t).read()
323
324
class MiscReadTest(CommonReadTest):
    # Miscellaneous read tests, run in addition to the CommonReadTest set.
    # "taropen" is the TarFile opener that test_illegal_mode_arg calls
    # directly.
    taropen = tarfile.TarFile.taropen

    def test_no_name_argument(self):
        # Without a name argument, TarFile.name is taken from the file
        # object's name attribute.
        with open(self.tarname, "rb") as fobj:
            tar = tarfile.open(fileobj=fobj, mode=self.mode)
            try:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))
            finally:
                tar.close()

    def test_no_name_attribute(self):
        # A file object without a name attribute results in TarFile.name
        # being None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = StringIO.StringIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_empty_name_attribute(self):
        # A file object whose name attribute is the empty string also
        # results in TarFile.name being None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = StringIO.StringIO(data)
        fobj.name = ""
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_illegal_mode_arg(self):
        # The opener must reject every mode string tried here with a
        # ValueError.
        with open(tmpname, 'wb'):
            pass
        self.addCleanup(os.unlink, tmpname)
        for bad_mode in ('q', 'rw', ''):
            with self.assertRaisesRegexp(ValueError, 'mode must be '):
                self.taropen(tmpname, bad_mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        tar = tarfile.open(self.tarname, mode=self.mode)
        try:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            data = tar.extractfile(t).read()
        finally:
            tar.close()

        # Open the testtar and seek to the offset of the second member.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open
        fobj = _open(self.tarname, "rb")
        try:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.  The
            # module-level opener is used instead of going through the
            # already closed TarFile instance from above.
            tar = tarfile.open(self.tarname, mode="r:", fileobj=fobj)
            try:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")
            finally:
                tar.close()
        finally:
            fobj.close()

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            self.skipTest('needs a gz or bz2 mode')
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertTrue(tarinfo.type == tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member must carry the expected mtime; members below
        # "ustar/" must also carry the expected user name.
        for tarinfo in self.tar:
            self.assertTrue(int(tarinfo.mtime) == 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertTrue(tarinfo.uname == "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        # The last member returned by getmembers() must be "misc/eof".
        self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
                "could not find all members")

    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(TEMPDIR, directories)
            for tarinfo in directories:
                path = os.path.join(TEMPDIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777)
                self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
        finally:
            tar.close()

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write("")

        try:
            # Allocate the instance separately so that it can still be
            # inspected after __init__() has raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.name, m2.name)
496
497
class StreamReadTest(CommonReadTest):
    # Runs the common read tests, plus a few stream specific ones, on an
    # archive opened in the "r|" stream mode.

    mode="r|"

    def test_fileobj_regular_file(self):
        member = self.tar.next() # get "regtype" (can't use getmember)
        stream = self.tar.extractfile(member)
        payload = stream.read()
        actual = (len(payload), md5sum(payload))
        expected = (member.size, md5_regtype)
        self.assertTrue(actual == expected,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # getmembers() has to read through the whole stream, so going back
        # to the first member's data afterwards must raise StreamError.
        members = self.tar.getmembers()
        first = self.tar.extractfile(members[0]) # read the first member
        self.assertRaises(tarfile.StreamError, first.read)

    def test_compare_members(self):
        # Walk a regularly opened archive and the stream archive in
        # lockstep and compare the data of corresponding members.
        with tarfile.open(tarname, encoding="iso8859-1") as reference:
            streamed = self.tar

            ref_member = reference.next()
            stream_member = streamed.next()
            while ref_member is not None:
                self.assertTrue(stream_member is not None, "stream.next() failed.")

                if stream_member.islnk() or stream_member.issym():
                    # Links cannot be followed in stream mode.
                    self.assertRaises(tarfile.StreamError,
                                      streamed.extractfile, stream_member)
                else:
                    ref_file = reference.extractfile(ref_member)
                    stream_file = streamed.extractfile(stream_member)
                    if ref_file is not None:
                        self.assertTrue(stream_file is not None,
                                        "stream.extractfile() failed")
                        self.assertTrue(ref_file.read() == stream_file.read(),
                                        "stream extraction failed")

                ref_member = reference.next()
                stream_member = streamed.next()
538
539
class DetectReadTest(unittest.TestCase):
    # Checks which archive/mode combinations tarfile.open() accepts and
    # which it rejects with ReadError, including the auto-detecting "*"
    # modes.

    def _testfunc_file(self, name, mode):
        # Open the archive by file name; a ReadError fails the test.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail("tarfile.open(%r, %r) raised ReadError" % (name, mode))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Open the archive through an explicit file object; a ReadError
        # fails the test.  The with block makes sure the file object is
        # closed whether or not tarfile.open() succeeds.
        with open(name, "rb") as fobj:
            try:
                tar = tarfile.open(name, mode, fileobj=fobj)
            except tarfile.ReadError:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised ReadError"
                          % (name, mode))
            else:
                tar.close()

    def _test_modes(self, testfunc):
        # Run testfunc(name, mode) for every archive/mode pair that must
        # open, and check that mismatched pairs raise ReadError.
        testfunc(tarname, "r")
        testfunc(tarname, "r:")
        testfunc(tarname, "r:*")
        testfunc(tarname, "r|")
        testfunc(tarname, "r|*")

        if gzip:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")

            testfunc(gzipname, "r")
            testfunc(gzipname, "r:*")
            testfunc(gzipname, "r:gz")
            testfunc(gzipname, "r|*")
            testfunc(gzipname, "r|gz")

        if bz2:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")

            testfunc(bz2name, "r")
            testfunc(bz2name, "r:*")
            testfunc(bz2name, "r:bz2")
            testfunc(bz2name, "r|*")
            testfunc(bz2name, "r|bz2")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)

    @unittest.skipUnless(bz2, 'requires bz2')
    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900kB). If the file was
        # compressed using another blocksize autodetection fails.
        with open(tarname, "rb") as fobj:
            data = fobj.read()

        # Compress with blocksize 100kB, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
            fobj.write(data)

        self._testfunc_file(tmpname, "r|*")
609
610
class MemberReadTest(ReadTest):
    # Looks up individual members of the reference archive and checks
    # their header fields and, where given, the checksum of their data.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Check one member.  If chksum is given, the MD5 digest of the
        # member's data must equal it.  Every keyword argument names a
        # TarInfo attribute and the value it must have; the fields shared
        # by all test members are added below.
        if chksum is not None:
            self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.items():
            self.assertTrue(getattr(tarinfo, k) == v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Close the archive opened by setUp() before replacing it;
        # tearDown() only closes whatever self.tar refers to at that point,
        # so the first archive would otherwise stay open.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
685
686
class LongnameTest(ReadTest):
    # Shared tests for members with very long names and link names.  The
    # methods read self.subdir (the top-level directory of the members) and
    # self.longnametype (the expected type of the header preceding a long
    # name member); both are provided by the subclasses.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        # The long link member must be found and must point to the long
        # name member.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        longlink = self.subdir + "/" + "123/" * 125 + "longlink"
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertTrue(tarinfo.linkname == longname, "linkname wrong")

    def test_truncated_longname(self):
        # Only the first three 512 byte blocks starting at the member's
        # offset are kept; opening that fragment must raise ReadError.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        offset = self.tar.getmember(longname).offset
        # The archive is binary data: read it in binary mode, and use a
        # with block so the handle is closed again.
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
        self.assertEqual(tarinfo.type, self.longnametype)
724
725
class GNUReadTest(LongnameTest):
    # Runs the long name tests on the "gnu/" members and compares the GNU
    # sparse member with its ustar counterpart.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    def test_sparse_file(self):
        archive = self.tar
        ustar_member = archive.getmember("ustar/sparse")
        ustar_file = archive.extractfile(ustar_member)
        gnu_member = archive.getmember("gnu/sparse")
        gnu_file = archive.extractfile(gnu_member)
        ustar_data = ustar_file.read()
        gnu_data = gnu_file.read()
        self.assertTrue(ustar_data == gnu_data,
                "sparse file extraction failed")
738
739
class PaxReadTest(LongnameTest):
    # Runs the long name tests on the "pax/" members and checks values
    # that are read from pax headers.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # (member name, expected uname, expected gname); every member must
        # also carry the same VENDOR.umlauts pax header.
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as archive:
            for member_name, uname, gname in expectations:
                member = archive.getmember(member_name)
                self.assertEqual(member.uname, uname)
                self.assertEqual(member.gname, gname)
                self.assertEqual(member.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as archive:
            member = archive.getmember("pax/regtype4")
            self.assertEqual(member.size, 7011)
            self.assertEqual(member.uid, 123)
            self.assertEqual(member.gid, 123)
            self.assertEqual(member.mtime, 1041808783.0)
            self.assertEqual(type(member.mtime), float)
            for key in ("atime", "ctime"):
                self.assertEqual(float(member.pax_headers[key]), 1041808783.0)
780
781
class WriteTestBase(unittest.TestCase):
    # Home for write tests that have to hold for every write mode.
    # The mode under test is taken from the `mode` attribute, which this
    # class does not define itself.

    def test_fileobj_no_close(self):
        buf = StringIO.StringIO()
        archive = tarfile.open(fileobj=buf, mode=self.mode)
        archive.addfile(tarfile.TarInfo("foo"))
        archive.close()
        self.assertIs(buf.closed, False, "external fileobjs must never closed")

        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Dropping the last reference and forcing a collection must neither
        # close the buffer nor write anything more to it.
        written = buf.getvalue()
        del archive
        test_support.gc_collect()
        self.assertFalse(buf.closed)
        self.assertEqual(written, buf.getvalue())
798
799
class WriteTest(WriteTestBase):
    # Write tests that open the scratch archive with mode "w:".
    # Helper files and directories created below TEMPDIR are removed in
    # finally blocks so that a failing test does not disturb later tests.

    mode = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertTrue(tar.getnames()[0] == name,
                    "failed to store 100 char filename")
        finally:
            tar.close()

    def test_tar_size(self):
        # Test for bug #1013882.
        path = os.path.join(TEMPDIR, "file")
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                with open(path, "wb") as fobj:
                    fobj.write("aaa")
                tar.add(path)
            finally:
                tar.close()
            self.assertTrue(os.path.getsize(tmpname) > 0,
                    "tarfile is empty")
        finally:
            # Do not leave the scratch file behind for other tests.
            support.unlink(path)

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        path = os.path.join(TEMPDIR, "file")
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                # An empty file must be reported with size 0 ...
                with open(path, "wb"):
                    pass
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)

                # ... and a rewritten file with its new size.
                with open(path, "wb") as fobj:
                    fobj.write("aaa")
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 3)
            finally:
                tar.close()
        finally:
            support.unlink(path)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.rmdir(path)

    @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write("aaa")
        try:
            # os.link() is inside the try block so that the target file is
            # removed even when creating the link fails.
            os.link(target, link)
            tar = tarfile.open(tmpname, self.mode)
            try:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.unlink(target)
            support.unlink(link)

    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.remove(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertTrue(tar.name == dstname, "archive name must be absolute")
            tar.add(dstname)
            self.assertTrue(tar.getnames() == [], "added the archive to itself")

            # Repeat the check from inside TEMPDIR.  The context manager
            # puts the previous working directory back even if add() raises.
            with support.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertTrue(tar.getnames() == [], "added the archive to itself")
        finally:
            tar.close()

    def test_exclude(self):
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            # Excluding every regular file leaves only the directory itself.
            exclude = os.path.isfile

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                with test_support.check_warnings(("use the filter argument",
                                                DeprecationWarning)):
                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            def drop_bar(tarinfo):
                # Returning None leaves the member out of the archive; every
                # other member is stored with a modified uid and uname.
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", filter=drop_bar)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        #
        # path     -- arcname under which the member is added
        # cmp_path -- expected stored name; defaults to path with os.sep
        #             replaced by "/"
        # dir      -- add an empty directory instead of an empty file
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            open(foo, "w").close()
        else:
            os.mkdir(foo)

        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(foo, arcname=path)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                t = tar.next()
            finally:
                tar.close()
        finally:
            # Remove the helper entry even if adding or reading failed;
            # a leftover "foo" would make the next os.mkdir(foo) fail.
            if not dir:
                os.remove(foo)
            else:
                os.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with support.change_cwd(TEMPDIR):
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(".")
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for t in tar:
                    self.assertTrue(t.name == "." or t.name.startswith("./"))
            finally:
                tar.close()

    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            tar = tarfile.open(temparchive, 'w')
            try:
                tar.add(source_file, arcname=os.path.basename(source_file))
                tar.add(target_file, arcname=os.path.basename(target_file))
            finally:
                tar.close()
            # Let's extract it to the location which contains the symlink
            tar = tarfile.open(temparchive, 'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with symlinked files")
            finally:
                tar.close()
        finally:
            # The archive may not exist if an earlier step failed.
            support.unlink(temparchive)
            shutil.rmtree(tempdir)

    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
    def test_extractall_broken_symlinks(self):
        # Test if extractall works properly when tarfile contains broken
        # symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            tar = tarfile.open(temparchive, 'w')
            try:
                tar.add(target_file, arcname=os.path.basename(target_file))
            finally:
                tar.close()
            # remove the real file
            os.unlink(source_file)
            # Let's extract it to the location which contains the symlink
            tar = tarfile.open(temparchive, 'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with broken symlinked files")
            finally:
                tar.close()
        finally:
            # The archive may not exist if an earlier step failed.
            support.unlink(temparchive)
            shutil.rmtree(tempdir)

    @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
    def test_extractall_hardlinks(self):
        # Test if extractall works properly when tarfile contains hardlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.link(source_file, target_file)
            tar = tarfile.open(temparchive, 'w')
            try:
                tar.add(source_file, arcname=os.path.basename(source_file))
                tar.add(target_file, arcname=os.path.basename(target_file))
            finally:
                tar.close()
            # Let's extract it to the location which contains the hardlink
            tar = tarfile.open(temparchive, 'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with linked files")
            finally:
                tar.close()
        finally:
            # The archive may not exist if an earlier step failed.
            support.unlink(temparchive)
            shutil.rmtree(tempdir)

    def test_open_nonwritable_fileobj(self):
        # An exception raised by the first write() to the supplied fileobj
        # must come out of tarfile.open() and must not close the fileobj.
        for exctype in IOError, EOFError, RuntimeError:
            class BadFile(StringIO.StringIO):
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1154
class StreamWriteTest(WriteTestBase):
    # Write tests for archives opened with the stream mode "w|".

    mode = "w|"

    def test_stream_padding(self):
        # Test for bug #1543303.
        # An archive that is opened and closed right away is read back
        # (decompressed when the mode asks for it) and its NUL bytes counted.
        tarfile.open(tmpname, self.mode).close()

        if self.mode.endswith("gz"):
            with gzip.GzipFile(tmpname) as stream:
                payload = stream.read()
        else:
            with open(tmpname, "rb") as stream:
                payload = stream.read()
            if self.mode.endswith("bz2"):
                decompressor = bz2.BZ2Decompressor()
                payload = decompressor.decompress(payload)
                self.assertTrue(len(decompressor.unused_data) == 0,
                        "found trailing data")

        self.assertTrue(payload.count("\0") == tarfile.RECORDSIZE,
                         "incorrect zero padding")

    @unittest.skipIf(sys.platform == 'win32', 'not appropriate for Windows')
    @unittest.skipUnless(hasattr(os, 'umask'), 'requires os.umask')
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os.remove(tmpname)

        # With the umask set to 022 the new archive is expected to have
        # permission bits 644; the previous umask is put back afterwards.
        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)

    def test_issue13639(self):
        # Opening the archive through a unicode path must not raise
        # UnicodeDecodeError.
        try:
            with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()),
                              self.mode):
                pass
        except UnicodeDecodeError:
            self.fail("_Stream failed to write unicode filename")
1204
1205
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # len(s) plus one extra byte, rounded up to a multiple of 512.
        return (len(s) + 512) // 512 * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member: 512 bytes for
        # the initial tar header, plus for each field that exceeds its
        # limit another 512-byte extended header and the padded field.
        total = 512
        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            total += 512 + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            total += 512 + self._length(link)
        return total

    def _test(self, name, link=None):
        # Write a single member (a LNKTYPE member when link is given) in
        # GNU format, compare the archive offset with the computed size and
        # read the member back.
        info = tarfile.TarInfo(name)
        if link:
            info.linkname = link
            info.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(info)
            self.assertEqual(self._calc_size(name, link), tar.offset,
                    "GNU longname/longlink creation failed")
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(info.name, member.name,
                    "unable to read longname member")
            self.assertEqual(info.linkname, member.linkname,
                    "unable to read longname member")

    # The numeric suffix of each test is the length of the long field:
    # 127 eight-character components plus a tail of 7, 8 or 9 characters.

    def test_longname_1023(self):
        self._test("longnam/" * 127 + "longnam")

    def test_longname_1024(self):
        self._test("longnam/" * 127 + "longname")

    def test_longname_1025(self):
        self._test("longnam/" * 127 + "longname_")

    def test_longlink_1023(self):
        self._test("name", "longlnk/" * 127 + "longlnk")

    def test_longlink_1024(self):
        self._test("name", "longlnk/" * 127 + "longlink")

    def test_longlink_1025(self):
        self._test("name", "longlnk/" * 127 + "longlink_")

    def test_longnamelink_1023(self):
        self._test("longnam/" * 127 + "longnam",
                   "longlnk/" * 127 + "longlnk")

    def test_longnamelink_1024(self):
        self._test("longnam/" * 127 + "longname",
                   "longlnk/" * 127 + "longlink")

    def test_longnamelink_1025(self):
        self._test("longnam/" * 127 + "longname_",
                   "longlnk/" * 127 + "longlink_")
1288
1289
@unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.
    # setUp() calls os.link(), so the whole class is skipped where it is
    # not available.

    def setUp(self):
        # Create file "foo" and a hard link "bar" to it, then open the
        # scratch archive and add "foo".
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # tearDown() is not run when setUp() fails, so the removal of both
        # files is registered as a cleanup before anything is created.
        self.addCleanup(support.unlink, self.foo)
        self.addCleanup(support.unlink, self.bar)

        with open(self.foo, "wb") as fobj:
            fobj.write("foo")

        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        # Cleanups run in reverse order: the archive is closed before the
        # files are removed, also when add() below raises.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def tearDown(self):
        self.tar.close()
        support.unlink(self.foo)
        support.unlink(self.bar)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertTrue(tarinfo.type == tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" is the second name of the file that was already added.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertTrue(tarinfo.type == tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference set, the second name is stored as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertTrue(tarinfo.type == tarfile.REGTYPE,
                "dereferencing hardlink failed")
1327
1328
class PaxWriteTest(GNUWriteTest):
    # Replaces the _test() helper of GNUWriteTest with a PAX_FORMAT
    # variant and adds tests for global and extended pax headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        info = tarfile.TarInfo(name)
        if link:
            info.linkname = link
            info.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(info)
        finally:
            tar.close()

        # Only the field under test is compared: the link name when a link
        # was given, the member name otherwise.
        with tarfile.open(tmpname) as tar:
            first = tar.getmembers()[0]
            if link:
                self.assertTrue(link == first.linkname,
                        "PAX longlink creation failed")
            else:
                self.assertTrue(name == first.name,
                        "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                u"foo": u"bar",
                u"uid": u"0",
                u"mtime": u"1.23",
                u"test": u"\xe4\xf6\xfc",
                u"\xe4\xf6\xfc": u"test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)

            # Test if all the fields are unicode.
            for key, val in tar.pax_headers.iteritems():
                self.assertIs(type(key), unicode)
                self.assertIs(type(val), unicode)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                # Number fields must also survive their converter.
                try:
                    tarfile.PAX_NUMBER_FIELDS[key](val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {u"path": u"foo", u"uid": u"123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                encoding="iso8859-1")
        try:
            member = tarfile.TarInfo()
            member.name = u"\xe4\xf6\xfc"     # non-ASCII
            member.uid = 8**8        # too large
            member.pax_headers = pax_headers
            tar.addfile(member)
        finally:
            tar.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            stored = tar.getmembers()[0]
            self.assertEqual(stored.pax_headers, pax_headers)
            self.assertEqual(stored.name, "foo")
            self.assertEqual(stored.uid, 123)
1411
1412
class UstarUnicodeTest(unittest.TestCase):
    # All *UnicodeTests FIXME
    # Unicode handling tests for archives written in the format given by
    # the `format` class attribute.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf8")

    def _test_unicode_filename(self, encoding):
        # Store a member with a unicode name using `encoding`, then check
        # that reading it back yields the encoded, non-unicode name.
        tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
        try:
            name = u"\xe4\xf6\xfc"
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        try:
            self.assertTrue(type(tar.getnames()[0]) is not unicode)
            self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
        finally:
            tar.close()

    def test_unicode_filename_error(self):
        tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            # A non-ASCII byte string name is rejected only by the pax
            # format; the other formats store it.
            tarinfo.name = "\xe4\xf6\xfc"
            if self.format == tarfile.PAX_FORMAT:
                self.assertRaises(UnicodeError, tar.addfile, tarinfo)
            else:
                tar.addfile(tarinfo)

            # A unicode name that cannot be encoded as ASCII is rejected.
            tarinfo.name = u"\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            # The same applies to the user name.
            tarinfo.name = "foo"
            tarinfo.uname = u"\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            tar.close()

    def test_unicode_argument(self):
        # Every text field of every member must be a byte string.
        tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
        try:
            for t in tar:
                self.assertTrue(type(t.name) is str)
                self.assertTrue(type(t.linkname) is str)
                self.assertTrue(type(t.uname) is str)
                self.assertTrue(type(t.gname) is str)
        finally:
            tar.close()

    def test_uname_unicode(self):
        # The user and group name are given once as unicode and once as a
        # byte string; both must read back as the same byte string.
        for name in (u"\xe4\xf6\xfc", "\xe4\xf6\xfc"):
            t = tarfile.TarInfo("foo")
            t.uname = name
            t.gname = name

            fobj = StringIO.StringIO()
            tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
            try:
                tar.addfile(t)
            finally:
                tar.close()
            fobj.seek(0)

            tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
            try:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\xe4\xf6\xfc")
                self.assertEqual(t.gname, "\xe4\xf6\xfc")
            finally:
                # Close the reading archive too, on every loop iteration.
                tar.close()
1491
1492
class GNUUnicodeTest(UstarUnicodeTest):
    # Inherits every test from UstarUnicodeTest and only replaces the
    # archive format with GNU_FORMAT.

    format = tarfile.GNU_FORMAT
1496
1497
class PaxUnicodeTest(UstarUnicodeTest):
    # Inherits the tests of UstarUnicodeTest with PAX_FORMAT as the archive
    # format and adds tests for the `errors` argument of tarfile.open().

    format = tarfile.PAX_FORMAT

    def _create_unicode_name(self, name):
        # Write an archive whose only member takes its path from a pax
        # header entry set to `name`.
        tar = tarfile.open(tmpname, "w", format=self.format)
        try:
            t = tarfile.TarInfo()
            t.pax_headers["path"] = name
            tar.addfile(t)
        finally:
            tar.close()

    def test_error_handlers(self):
        # Test if the unicode error handlers work correctly for characters
        # that cannot be expressed in a given encoding.
        self._create_unicode_name(u"\xe4\xf6\xfc")

        for handler, name in (("utf-8", u"\xe4\xf6\xfc".encode("utf8")),
                    ("replace", "???"), ("ignore", "")):
            tar = tarfile.open(tmpname, format=self.format, encoding="ascii",
                    errors=handler)
            try:
                self.assertEqual(tar.getnames()[0], name)
            finally:
                # Close each archive before the next handler reopens the file.
                tar.close()

        self.assertRaises(UnicodeError, tarfile.open, tmpname,
                encoding="ascii", errors="strict")

    def test_error_handler_utf8(self):
        # Create a pathname that has one component representable using
        # iso8859-1 and the other only in iso8859-15.
        self._create_unicode_name(u"\xe4\xf6\xfc/\u20ac")

        tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1",
                errors="utf-8")
        try:
            self.assertEqual(tar.getnames()[0],
                    "\xe4\xf6\xfc/" + u"\u20ac".encode("utf8"))
        finally:
            tar.close()
1531
1532
class AppendTest(unittest.TestCase):
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without an existing scratch archive.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os.remove(self.tarname)

    def _add_testfile(self, fileobj=None):
        # Append an empty member named "bar", either to the scratch archive
        # or to `fileobj` when one is given.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _create_testtar(self, mode="w:"):
        # Create the scratch archive in `mode` with a single member "foo"
        # that is copied from "ustar/regtype" of the test archive.
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            t = src.getmember("ustar/regtype")
            t.name = "foo"
            f = src.extractfile(t)
            with tarfile.open(self.tarname, mode) as tar:
                tar.addfile(t, f)

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly `names`, which defaults
        # to ["bar"].  None is used as the default so that no list object
        # is shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = StringIO.StringIO("\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        # Tar data is binary: read it in binary mode so that no newline
        # translation can alter the bytes.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = StringIO.StringIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    @unittest.skipUnless(gzip, 'requires gzip')
    def test_append_gz(self):
        self._create_testtar("w:gz")
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")

    @unittest.skipUnless(bz2, 'requires bz2')
    def test_append_bz2(self):
        self._create_testtar("w:bz2")
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Write `data` as the scratch archive and expect appending to it
        # to raise ReadError.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error("")

    def test_incomplete(self):
        self._test_error("\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + "\0" * 13)

    def test_invalid(self):
        self._test_error("a" * 512)
1619
1620
class LimitsTest(unittest.TestCase):
    # Checks which names, link names and uids TarInfo.tobuf() accepts or
    # rejects (ValueError) for each archive format.

    def test_ustar_limits(self):
        # 100 char name
        tarinfo = tarfile.TarInfo("0123456789" * 10)
        tarinfo.tobuf(tarfile.USTAR_FORMAT)

        # 101 char name that cannot be stored
        tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # 256 char name with a slash at pos 156
        tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
        tarinfo.tobuf(tarfile.USTAR_FORMAT)

        # 256 char name that cannot be stored
        tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # 512 char name
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # 512 char linkname
        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # uid that needs 8 octal digits.  Written with the "0o" prefix,
        # which both Python 2.6+ and Python 3 accept.
        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 0o10000000
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

    def test_gnu_limits(self):
        # 512 char name
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        tarinfo.tobuf(tarfile.GNU_FORMAT)

        # 512 char linkname
        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(tarfile.GNU_FORMAT)

        # uid >= 256 ** 7
        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 0o4000000000000000000
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)

    def test_pax_limits(self):
        # None of the values rejected above may raise for the pax format.

        # 512 char name
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        # 512 char linkname
        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        # Same uid that the GNU format rejects.
        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 0o4000000000000000000
        tarinfo.tobuf(tarfile.PAX_FORMAT)
1678
1679
1680class MiscTest(unittest.TestCase):
1681
1682    def test_read_number_fields(self):
1683        # Issue 24514: Test if empty number fields are converted to zero.
1684        self.assertEqual(tarfile.nti("\0"), 0)
1685        self.assertEqual(tarfile.nti("       \0"), 0)
1686
1687
class ContextManagerTest(unittest.TestCase):
    # Checks for TarFile objects used in "with" statements.

    def test_basic(self):
        # The archive is open inside the block and closed once it is left.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise IOError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(IOError):
            with tar:
                pass

    def test_exception(self):
        # Test if the IOError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise IOError
        self.assertIsInstance(exc.exception, IOError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised just above is expected; a bare
            # "except:" would also swallow KeyboardInterrupt/SystemExit.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the exception raised just above is expected; a bare
                # "except:" would also swallow KeyboardInterrupt/SystemExit.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
1744
1745
class LinkEmulationTest(ReadTest):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members as
    # the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract member `name` into TEMPDIR and compare the md5 of the
        # resulting file with md5_regtype.
        self.tar.extract(name, TEMPDIR)
        # Close the extracted file explicitly instead of leaving the handle
        # to the garbage collector.
        with open(os.path.join(TEMPDIR, name), "rb") as fobj:
            data = fobj.read()
        self.assertEqual(md5sum(data), md5_regtype)

    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
1767
1768
# Gzip variants: each subclass only overrides class attributes of its base
# test class.  The read variants point tarname at gzipname, the gzip copy of
# the test archive that test_main() writes.

class GzipMiscReadTest(MiscReadTest):
    mode = "r:gz"
    tarname = gzipname
    taropen = tarfile.TarFile.gzopen


class GzipUstarReadTest(UstarReadTest):
    mode = "r:gz"
    tarname = gzipname


class GzipStreamReadTest(StreamReadTest):
    mode = "r|gz"
    tarname = gzipname


class GzipWriteTest(WriteTest):
    mode = "w:gz"


class GzipStreamWriteTest(StreamWriteTest):
    mode = "w|gz"
1783
1784
# Bzip2 variants: each subclass only overrides class attributes of its base
# test class.  The read variants point tarname at bz2name, the bz2 copy of
# the test archive that test_main() writes.

class Bz2MiscReadTest(MiscReadTest):
    mode = "r:bz2"
    tarname = bz2name
    taropen = tarfile.TarFile.bz2open


class Bz2UstarReadTest(UstarReadTest):
    mode = "r:bz2"
    tarname = bz2name


class Bz2StreamReadTest(StreamReadTest):
    mode = "r|bz2"
    tarname = bz2name


class Bz2WriteTest(WriteTest):
    mode = "w:bz2"


class Bz2StreamWriteTest(StreamWriteTest):
    mode = "w|bz2"
1799
class Bz2PartialReadTest(unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        class MyStringIO(StringIO.StringIO):
            # Set once a read() has started at the end of the data and
            # cleared again by seek(); a further read() while it is set
            # is reported as an infinite loop.
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.pos == self.len
                return StringIO.StringIO.read(self, n)

            def seek(self, *args):
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        # Feed every prefix of the compressed data, from empty to complete.
        for cut in range(len(compressed) + 1):
            truncated = MyStringIO(compressed[:cut])
            try:
                tarfile.open(fileobj=truncated, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
1828
1829
def test_main():
    # Remove any stale TEMPDIR entry and create the scratch directory.
    support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    # Test classes that run regardless of the available compression modules.
    tests = [
        UstarReadTest,
        MiscReadTest,
        StreamReadTest,
        DetectReadTest,
        MemberReadTest,
        GNUReadTest,
        PaxReadTest,
        ListTest,
        WriteTest,
        StreamWriteTest,
        GNUWriteTest,
        PaxWriteTest,
        UstarUnicodeTest,
        GNUUnicodeTest,
        PaxUnicodeTest,
        AppendTest,
        LimitsTest,
        MiscTest,
        ContextManagerTest,
    ]

    # Hard link tests when os.link exists, link emulation tests otherwise.
    link_test = HardlinkTest if hasattr(os, "link") else LinkEmulationTest
    tests.append(link_test)

    # Raw bytes of the reference archive, reused for the compressed copies.
    with open(tarname, "rb") as source:
        raw_archive = source.read()

    if gzip:
        # Create testtar.tar.gz and add gzip-specific tests.
        support.unlink(gzipname)
        with gzip.open(gzipname, "wb") as gz_out:
            gz_out.write(raw_archive)

        tests.extend([
            GzipMiscReadTest,
            GzipUstarReadTest,
            GzipStreamReadTest,
            GzipListTest,
            GzipWriteTest,
            GzipStreamWriteTest,
        ])

    if bz2:
        # Create testtar.tar.bz2 and add bz2-specific tests.
        support.unlink(bz2name)
        bz2_out = bz2.BZ2File(bz2name, "wb")
        try:
            bz2_out.write(raw_archive)
        finally:
            bz2_out.close()

        tests.extend([
            Bz2MiscReadTest,
            Bz2UstarReadTest,
            Bz2StreamReadTest,
            Bz2ListTest,
            Bz2WriteTest,
            Bz2StreamWriteTest,
            Bz2PartialReadTest,
        ])

    try:
        test_support.run_unittest(*tests)
    finally:
        # Remove the scratch directory even when the run raises.
        if os.path.exists(TEMPDIR):
            shutil.rmtree(TEMPDIR)
1903
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1906