1# -*- coding: iso-8859-15 -*-
2
3import sys
4import os
5import shutil
6import StringIO
7from hashlib import md5
8import errno
9
10import unittest
11import tarfile
12
13from test import test_support
14
# Probe for the optional compression modules.  A module that cannot be
# imported (or, for gzip, one that lacks the GzipFile class) is recorded
# as None so that its availability can be checked with a truth test.
try:
    import gzip
except ImportError:
    gzip = None
else:
    if not hasattr(gzip, "GzipFile"):
        gzip = None

try:
    import bz2
except ImportError:
    bz2 = None
25
def md5sum(data):
    """Return the MD5 digest of *data* as a hexadecimal string."""
    digest = md5(data)
    return digest.hexdigest()
28
# Scratch directory used by the tests and the reference archive that is
# located through test_support.
TEMPDIR = os.path.abspath(test_support.TESTFN)
tarname = test_support.findfile("testtar.tar")

# Paths of the compressed archives and of the temporary archive, all of
# them located inside TEMPDIR.
gzipname, bz2name, tmpname = (
    os.path.join(TEMPDIR, basename)
    for basename in ("testtar.tar.gz", "testtar.tar.bz2", "tmp.tar"))

# MD5 digests the tests expect for the "regtype" and "sparse" members.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
37
38
class ReadTest(unittest.TestCase):
    """Fixture that opens ``tarname`` with ``mode`` around every test.

    The opened archive is available to the tests as ``self.tar``.
    """

    # Open mode and archive path used by setUp().
    mode = "r:"
    tarname = tarname

    def setUp(self):
        # Open the archive afresh for each test.
        self.tar = tarfile.open(self.tarname, encoding="iso8859-1",
                                mode=self.mode)

    def tearDown(self):
        # Release the archive opened by setUp().
        self.tar.close()
50
class UstarReadTest(ReadTest):
    """Tests for the file objects returned by TarFile.extractfile()."""

    def _extracted_regtype_lines(self):
        # Extract "ustar/regtype" below TEMPDIR and return its lines, read
        # in universal-newline mode.  The file is closed before returning
        # so that no handle on the extracted file is leaked.
        self.tar.extract("ustar/regtype", TEMPDIR)
        fobj = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
        try:
            return fobj.readlines()
        finally:
            fobj.close()

    def test_fileobj_regular_file(self):
        # Size and checksum of the extracted data must match the member.
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)),
                (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the member must agree with the extracted file.
        lines1 = self._extracted_regtype_lines()
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj2 = self.tar.extractfile(tarinfo)
        try:
            lines2 = fobj2.readlines()
        finally:
            fobj2.close()

        self.assertEqual(lines1, lines2,
                "fileobj.readlines() failed")
        self.assertEqual(len(lines2), 114,
                "fileobj.readlines() failed")
        self.assertEqual(lines2[83],
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating over the member must yield the extracted file's lines.
        lines1 = self._extracted_regtype_lines()
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj2 = self.tar.extractfile(tarinfo)
        try:
            lines2 = [line for line in fobj2]
        finally:
            fobj2.close()
        self.assertEqual(lines1, lines2,
                     "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Reference data: the member as extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Consume the whole member first so that the seeks below start
            # from the end of the data.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), "",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's start failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")
        finally:
            # Close the member even when one of the checks above fails.
            fobj.close()

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        a = self.tar.extractfile(lnktype)
        b = self.tar.extractfile(regtype)
        self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
160
class CommonReadTest(ReadTest):
    """Read tests that are run with the open mode given by ``self.mode``."""

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        tarfile.open(tmpname, self.mode.replace("r", "w")).close()
        try:
            # The with statement closes the archive on every path.
            with tarfile.open(tmpname, self.mode) as tar:
                tar.getnames()
                self.assertListEqual(tar.getmembers(), [])
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        open(tmpname, "wb").close()
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # Pick the opener that matches the compression of self.mode.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in ('\0', 'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            fobj = _open(tmpname, "wb")
            try:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())
            finally:
                fobj.close()

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                        "ignore_zeros=True should have skipped the %r-blocks" % char)
206
class MiscReadTest(CommonReadTest):
    """Miscellaneous tests for reading archives."""

    def test_no_name_argument(self):
        # Without a name argument the name is taken from the file object.
        with open(self.tarname, "rb") as fobj:
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        # A file object without a name attribute yields tar.name == None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = StringIO.StringIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertEqual(tar.name, None)

    def test_empty_name_attribute(self):
        # An empty name attribute is treated like a missing one.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = StringIO.StringIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertEqual(tar.name, None)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            data = tar.extractfile(t).read()

        # Open the testtar and seek to the offset of the second member.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open
        fobj = _open(self.tarname, "rb")
        try:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            tar = tarfile.open(self.tarname, mode="r:", fileobj=fobj)
            try:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")
            finally:
                tar.close()
        finally:
            # The archive does not own an external file object, so it has
            # to be closed here.
            fobj.close()

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            return
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member shares one mtime; ustar members also share uname.
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(TEMPDIR, directories)
            for tarinfo in directories:
                path = os.path.join(TEMPDIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777)
                self.assertEqual(tarinfo.mtime, os.path.getmtime(path))

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write("")

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os.remove(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.name, m2.name)
356
class StreamReadTest(CommonReadTest):
    """Read tests run against an archive opened in stream mode ("r|")."""

    mode = "r|"

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)),
                (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # Reading the first member after getmembers() has consumed the
        # stream must raise a StreamError.
        tarinfos = self.tar.getmembers()
        f = self.tar.extractfile(tarinfos[0]) # read the first member
        self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk a second archive opened with the default mode in lockstep
        # with the stream and compare the members.  The with statement
        # closes that archive even if a check fails.
        with tarfile.open(tarname, encoding="iso8859-1") as tar1:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertTrue(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    # Link members cannot be extracted from the stream.
                    self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertTrue(v2 is not None, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(), "stream extraction failed")
397
class DetectReadTest(unittest.TestCase):
    """Tests for tarfile.open() with explicit and auto-detecting modes."""

    def _testfunc_file(self, name, mode):
        # Open the archive by name; a ReadError fails the test.  The
        # archive is closed again so that no file handle is leaked.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail()
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same as _testfunc_file(), but hand an already opened file object
        # to tarfile.open().
        with open(name, "rb") as fobj:
            try:
                tar = tarfile.open(name, mode, fileobj=fobj)
            except tarfile.ReadError:
                self.fail()
            else:
                tar.close()

    def _test_compressed(self, testfunc, name, comptype):
        # The plain archive must be rejected by the *comptype* modes and
        # the compressed archive *name* by the uncompressed modes ...
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:" + comptype)
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|" + comptype)
        self.assertRaises(tarfile.ReadError, tarfile.open, name, mode="r:")
        self.assertRaises(tarfile.ReadError, tarfile.open, name, mode="r|")

        # ... while the matching and the auto-detecting modes must work.
        for mode in ("r", "r:*", "r:" + comptype, "r|*", "r|" + comptype):
            testfunc(name, mode)

    def _test_modes(self, testfunc):
        # Run *testfunc* over every archive/mode combination to be checked.
        for mode in ("r", "r:", "r:*", "r|", "r|*"):
            testfunc(tarname, mode)

        if gzip:
            self._test_compressed(testfunc, gzipname, "gz")

        if bz2:
            self._test_compressed(testfunc, bz2name, "bz2")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900kB). If the file was
        # compressed using another blocksize autodetection fails.
        if not bz2:
            # Report the test as skipped instead of silently passing.
            self.skipTest("bz2 module not available")

        with open(tarname, "rb") as fobj:
            data = fobj.read()

        # Compress with blocksize 100kB, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
            fobj.write(data)

        self._testfunc_file(tmpname, "r|*")
466
class MemberReadTest(ReadTest):
    """Look up members of the test archive and check their header fields."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Check *tarinfo* against the expected field values in *kwargs*.
        # If *chksum* is given, the member's data must have that MD5 digest.
        if chksum is not None:
            fobj = self.tar.extractfile(tarinfo)
            self.assertEqual(md5sum(fobj.read()), chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        # Fields that are checked for every member.
        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.items():
            self.assertEqual(getattr(tarinfo, k), v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        # The umlauts are spelled as escapes so that the name does not
        # depend on how this source file is encoded or transcoded.
        tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Close the archive opened by setUp() before replacing it, so that
        # it is not leaked; tearDown() closes the new one.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
542
class LongnameTest(ReadTest):
    """Tests for members with long names.

    The tests read ``self.subdir`` and ``self.longnametype``, which are not
    defined here and have to be supplied by subclasses.
    """

    def _long_path(self, basename):
        # Build the long member path below self.subdir ending in *basename*.
        return self.subdir + "/" + "123/" * 125 + basename

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self._long_path("longname")
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        longname = self._long_path("longname")
        longlink = self._long_path("longlink")
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(tarinfo.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # An archive that ends after the first three blocks of the long
        # name member must be rejected with a ReadError.
        longname = self._long_path("longname")
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self._long_path("longname")
        offset = self.tar.getmember(longname).offset
        # The archive is binary data: read it in binary mode and close it.
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
        self.assertEqual(tarinfo.type, self.longnametype)
581
class GNUReadTest(LongnameTest):
    """Long name tests for the "gnu" members, plus a sparse file check."""

    longnametype = tarfile.GNUTYPE_LONGNAME
    subdir = "gnu"

    def test_sparse_file(self):
        # "ustar/sparse" and "gnu/sparse" must extract to the same data.
        ustar_fobj = self.tar.extractfile(self.tar.getmember("ustar/sparse"))
        gnu_fobj = self.tar.extractfile(self.tar.getmember("gnu/sparse"))
        ustar_data = ustar_fobj.read()
        gnu_data = gnu_fobj.read()
        self.assertTrue(ustar_data == gnu_data,
                "sparse file extraction failed")
595
class PaxReadTest(LongnameTest):
    """Long name tests for the "pax" members, plus pax header checks."""

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # The umlauts are spelled as escapes so that the value does not
        # depend on how this source file is encoded or transcoded.
        umlauts = u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        # (member name, expected uname, expected gname)
        expected = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )

        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for name, uname, gname in expected:
                tarinfo = tar.getmember(name)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
631
class WriteTestBase(unittest.TestCase):
    # Write tests that are meant to run in every mode combination belong
    # here.  The tests read self.mode, which this class does not define.

    def test_fileobj_no_close(self):
        # Closing the archive must leave a caller-supplied file object open.
        target = StringIO.StringIO()
        archive = tarfile.open(fileobj=target, mode=self.mode)
        archive.addfile(tarfile.TarInfo("foo"))
        archive.close()
        self.assertIs(target.closed, False,
                "external fileobjs must never closed")
643
644class WriteTest(WriteTestBase):
645
646    mode = "w:"
647
648    def test_100_char_name(self):
649        # The name field in a tar header stores strings of at most 100 chars.
650        # If a string is shorter than 100 chars it has to be padded with '\0',
651        # which implies that a string of exactly 100 chars is stored without
652        # a trailing '\0'.
653        name = "0123456789" * 10
654        tar = tarfile.open(tmpname, self.mode)
655        t = tarfile.TarInfo(name)
656        tar.addfile(t)
657        tar.close()
658
659        tar = tarfile.open(tmpname)
660        self.assertTrue(tar.getnames()[0] == name,
661                "failed to store 100 char filename")
662        tar.close()
663
664    def test_tar_size(self):
665        # Test for bug #1013882.
666        tar = tarfile.open(tmpname, self.mode)
667        path = os.path.join(TEMPDIR, "file")
668        fobj = open(path, "wb")
669        fobj.write("aaa")
670        fobj.close()
671        tar.add(path)
672        tar.close()
673        self.assertTrue(os.path.getsize(tmpname) > 0,
674                "tarfile is empty")
675
676    # The test_*_size tests test for bug #1167128.
677    def test_file_size(self):
678        tar = tarfile.open(tmpname, self.mode)
679
680        path = os.path.join(TEMPDIR, "file")
681        fobj = open(path, "wb")
682        fobj.close()
683        tarinfo = tar.gettarinfo(path)
684        self.assertEqual(tarinfo.size, 0)
685
686        fobj = open(path, "wb")
687        fobj.write("aaa")
688        fobj.close()
689        tarinfo = tar.gettarinfo(path)
690        self.assertEqual(tarinfo.size, 3)
691
692        tar.close()
693
694    def test_directory_size(self):
695        path = os.path.join(TEMPDIR, "directory")
696        os.mkdir(path)
697        try:
698            tar = tarfile.open(tmpname, self.mode)
699            tarinfo = tar.gettarinfo(path)
700            self.assertEqual(tarinfo.size, 0)
701        finally:
702            os.rmdir(path)
703
704    def test_link_size(self):
705        if hasattr(os, "link"):
706            link = os.path.join(TEMPDIR, "link")
707            target = os.path.join(TEMPDIR, "link_target")
708            fobj = open(target, "wb")
709            fobj.write("aaa")
710            fobj.close()
711            os.link(target, link)
712            try:
713                tar = tarfile.open(tmpname, self.mode)
714                # Record the link target in the inodes list.
715                tar.gettarinfo(target)
716                tarinfo = tar.gettarinfo(link)
717                self.assertEqual(tarinfo.size, 0)
718            finally:
719                os.remove(target)
720                os.remove(link)
721
722    def test_symlink_size(self):
723        if hasattr(os, "symlink"):
724            path = os.path.join(TEMPDIR, "symlink")
725            os.symlink("link_target", path)
726            try:
727                tar = tarfile.open(tmpname, self.mode)
728                tarinfo = tar.gettarinfo(path)
729                self.assertEqual(tarinfo.size, 0)
730            finally:
731                os.remove(path)
732
733    def test_add_self(self):
734        # Test for #1257255.
735        dstname = os.path.abspath(tmpname)
736
737        tar = tarfile.open(tmpname, self.mode)
738        self.assertTrue(tar.name == dstname, "archive name must be absolute")
739
740        tar.add(dstname)
741        self.assertTrue(tar.getnames() == [], "added the archive to itself")
742
743        cwd = os.getcwd()
744        os.chdir(TEMPDIR)
745        tar.add(dstname)
746        os.chdir(cwd)
747        self.assertTrue(tar.getnames() == [], "added the archive to itself")
748
749    def test_exclude(self):
750        tempdir = os.path.join(TEMPDIR, "exclude")
751        os.mkdir(tempdir)
752        try:
753            for name in ("foo", "bar", "baz"):
754                name = os.path.join(tempdir, name)
755                open(name, "wb").close()
756
757            exclude = os.path.isfile
758
759            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
760            with test_support.check_warnings(("use the filter argument",
761                                              DeprecationWarning)):
762                tar.add(tempdir, arcname="empty_dir", exclude=exclude)
763            tar.close()
764
765            tar = tarfile.open(tmpname, "r")
766            self.assertEqual(len(tar.getmembers()), 1)
767            self.assertEqual(tar.getnames()[0], "empty_dir")
768        finally:
769            shutil.rmtree(tempdir)
770
771    def test_filter(self):
772        tempdir = os.path.join(TEMPDIR, "filter")
773        os.mkdir(tempdir)
774        try:
775            for name in ("foo", "bar", "baz"):
776                name = os.path.join(tempdir, name)
777                open(name, "wb").close()
778
779            def filter(tarinfo):
780                if os.path.basename(tarinfo.name) == "bar":
781                    return
782                tarinfo.uid = 123
783                tarinfo.uname = "foo"
784                return tarinfo
785
786            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
787            tar.add(tempdir, arcname="empty_dir", filter=filter)
788            tar.close()
789
790            tar = tarfile.open(tmpname, "r")
791            for tarinfo in tar:
792                self.assertEqual(tarinfo.uid, 123)
793                self.assertEqual(tarinfo.uname, "foo")
794            self.assertEqual(len(tar.getmembers()), 3)
795            tar.close()
796        finally:
797            shutil.rmtree(tempdir)
798
799    # Guarantee that stored pathnames are not modified. Don't
800    # remove ./ or ../ or double slashes. Still make absolute
801    # pathnames relative.
802    # For details see bug #6054.
803    def _test_pathname(self, path, cmp_path=None, dir=False):
804        # Create a tarfile with an empty member named path
805        # and compare the stored name with the original.
806        foo = os.path.join(TEMPDIR, "foo")
807        if not dir:
808            open(foo, "w").close()
809        else:
810            os.mkdir(foo)
811
812        tar = tarfile.open(tmpname, self.mode)
813        tar.add(foo, arcname=path)
814        tar.close()
815
816        tar = tarfile.open(tmpname, "r")
817        t = tar.next()
818        tar.close()
819
820        if not dir:
821            os.remove(foo)
822        else:
823            os.rmdir(foo)
824
825        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
826
827    def test_pathnames(self):
828        self._test_pathname("foo")
829        self._test_pathname(os.path.join("foo", ".", "bar"))
830        self._test_pathname(os.path.join("foo", "..", "bar"))
831        self._test_pathname(os.path.join(".", "foo"))
832        self._test_pathname(os.path.join(".", "foo", "."))
833        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
834        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
835        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
836        self._test_pathname(os.path.join("..", "foo"))
837        self._test_pathname(os.path.join("..", "foo", ".."))
838        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
839        self._test_pathname(os.path.join("..", "foo", "..", "bar"))
840
841        self._test_pathname("foo" + os.sep + os.sep + "bar")
842        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
843
844    def test_abs_pathnames(self):
845        if sys.platform == "win32":
846            self._test_pathname("C:\\foo", "foo")
847        else:
848            self._test_pathname("/foo", "foo")
849            self._test_pathname("///foo", "foo")
850
851    def test_cwd(self):
852        # Test adding the current working directory.
853        cwd = os.getcwd()
854        os.chdir(TEMPDIR)
855        try:
856            open("foo", "w").close()
857
858            tar = tarfile.open(tmpname, self.mode)
859            tar.add(".")
860            tar.close()
861
862            tar = tarfile.open(tmpname, "r")
863            for t in tar:
864                self.assertTrue(t.name == "." or t.name.startswith("./"))
865            tar.close()
866        finally:
867            os.chdir(cwd)
868
869    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
870    def test_extractall_symlinks(self):
871        # Test if extractall works properly when tarfile contains symlinks
872        tempdir = os.path.join(TEMPDIR, "testsymlinks")
873        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
874        os.mkdir(tempdir)
875        try:
876            source_file = os.path.join(tempdir,'source')
877            target_file = os.path.join(tempdir,'symlink')
878            with open(source_file,'w') as f:
879                f.write('something\n')
880            os.symlink(source_file, target_file)
881            tar = tarfile.open(temparchive,'w')
882            tar.add(source_file, arcname=os.path.basename(source_file))
883            tar.add(target_file, arcname=os.path.basename(target_file))
884            tar.close()
885            # Let's extract it to the location which contains the symlink
886            tar = tarfile.open(temparchive,'r')
887            # this should not raise OSError: [Errno 17] File exists
888            try:
889                tar.extractall(path=tempdir)
890            except OSError:
891                self.fail("extractall failed with symlinked files")
892            finally:
893                tar.close()
894        finally:
895            os.unlink(temparchive)
896            shutil.rmtree(tempdir)
897
898    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
899    def test_extractall_broken_symlinks(self):
900        # Test if extractall works properly when tarfile contains broken
901        # symlinks
902        tempdir = os.path.join(TEMPDIR, "testsymlinks")
903        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
904        os.mkdir(tempdir)
905        try:
906            source_file = os.path.join(tempdir,'source')
907            target_file = os.path.join(tempdir,'symlink')
908            with open(source_file,'w') as f:
909                f.write('something\n')
910            os.symlink(source_file, target_file)
911            tar = tarfile.open(temparchive,'w')
912            tar.add(target_file, arcname=os.path.basename(target_file))
913            tar.close()
914            # remove the real file
915            os.unlink(source_file)
916            # Let's extract it to the location which contains the symlink
917            tar = tarfile.open(temparchive,'r')
918            # this should not raise OSError: [Errno 17] File exists
919            try:
920                tar.extractall(path=tempdir)
921            except OSError:
922                self.fail("extractall failed with broken symlinked files")
923            finally:
924                tar.close()
925        finally:
926            os.unlink(temparchive)
927            shutil.rmtree(tempdir)
928
929    @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
930    def test_extractall_hardlinks(self):
931        # Test if extractall works properly when tarfile contains symlinks
932        tempdir = os.path.join(TEMPDIR, "testsymlinks")
933        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
934        os.mkdir(tempdir)
935        try:
936            source_file = os.path.join(tempdir,'source')
937            target_file = os.path.join(tempdir,'symlink')
938            with open(source_file,'w') as f:
939                f.write('something\n')
940            os.link(source_file, target_file)
941            tar = tarfile.open(temparchive,'w')
942            tar.add(source_file, arcname=os.path.basename(source_file))
943            tar.add(target_file, arcname=os.path.basename(target_file))
944            tar.close()
945            # Let's extract it to the location which contains the symlink
946            tar = tarfile.open(temparchive,'r')
947            # this should not raise OSError: [Errno 17] File exists
948            try:
949                tar.extractall(path=tempdir)
950            except OSError:
951                self.fail("extractall failed with linked files")
952            finally:
953                tar.close()
954        finally:
955            os.unlink(temparchive)
956            shutil.rmtree(tempdir)
957
class StreamWriteTest(WriteTestBase):
    # Write tests for the stream modes ("w|", and via subclasses that
    # override `mode`, the compressed stream variants).

    mode = "w|"

    def test_stream_padding(self):
        # Test for bug #1543303.
        # An archive that was opened and closed without adding members
        # must decompress to exactly tarfile.RECORDSIZE NUL bytes.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            fobj = gzip.GzipFile(tmpname)
            try:
                data = fobj.read()
            finally:
                fobj.close()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            # Read through a context manager so the handle is closed.
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertTrue(len(dec.unused_data) == 0,
                    "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
                         "incorrect zero padding")

    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if sys.platform == "win32" or not hasattr(os, "umask"):
            # Report the test as skipped instead of silently passing.
            self.skipTest("requires a POSIX-style os.umask()")

        if os.path.exists(tmpname):
            os.remove(tmpname)

        # With umask 022 a freshly created archive must end up as 0644.
        original_umask = os.umask(0o022)
        try:
            tar = tarfile.open(tmpname, self.mode)
            tar.close()
            mode = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(mode, 0o644, "wrong file permissions")
        finally:
            os.umask(original_umask)

    def test_issue13639(self):
        # Opening a stream archive by a unicode filename must not raise
        # UnicodeDecodeError.
        try:
            with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()), self.mode):
                pass
        except UnicodeDecodeError:
            self.fail("_Stream failed to write unicode filename")
1009
1010
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Size of s plus one extra byte, rounded up to whole 512-byte
        # blocks.
        blocks, remainder = divmod(len(s) + 1, 512)
        if remainder:
            blocks += 1
        return blocks * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member with the given
        # name and (optional) link name.

        # Initial tar header
        count = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += 512
            count += self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += 512
            count += self._length(link)
        return count

    def _test(self, name, link=None):
        # Write a single GNU-format member, compare the resulting archive
        # offset with the computed size, then read the member back and
        # compare name and linkname.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            # assertEqual reports both values on failure.
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
        finally:
            # Close even when the size check fails.
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")
        finally:
            # The read-back archive used to be left open.
            tar.close()

    # The numeric suffix of each test is the length of the generated
    # name and/or link name.

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
1089
1090
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Fixture: a three-byte file "foo", a hard link "bar" to it, and
        # an archive opened for writing that already contains "foo".
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        with open(self.foo, "wb") as out:
            out.write("foo")

        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.tar.add(self.foo)

    def tearDown(self):
        self.tar.close()
        for path in (self.foo, self.bar):
            os.remove(path)

    def _check_member_type(self, path, expected_type, message):
        # Build a TarInfo for path via the open archive and verify the
        # member type it was assigned.
        member = self.tar.gettarinfo(path)
        self.assertTrue(member.type == expected_type, message)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        self._check_member_type(self.foo, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # The second name of an already added file becomes a hard link.
        self._check_member_type(self.bar, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is stored as a regular file.
        self.tar.dereference = True
        self._check_member_type(self.bar, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1129
1130
class PaxWriteTest(GNUWriteTest):
    # Re-runs the long name / long link cases inherited from GNUWriteTest
    # against PAX_FORMAT archives and adds checks for pax global and
    # extended headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        # Writes one member in pax format and reads it back; only the
        # link name is compared when a link is given, otherwise only the
        # member name.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        tar.addfile(tarinfo)
        tar.close()

        # NOTE(review): this archive object is never closed.
        tar = tarfile.open(tmpname)
        if link:
            l = tar.getmembers()[0].linkname
            self.assertTrue(link == l, "PAX longlink creation failed")
        else:
            n = tar.getmembers()[0].name
            self.assertTrue(name == n, "PAX longname creation failed")

    def test_pax_global_header(self):
        # Headers passed as pax_headers when the archive is created must
        # be readable both from the TarFile and from its first member.
        pax_headers = {
                u"foo": u"bar",
                u"uid": u"0",
                u"mtime": u"1.23",
                u"test": u"���",
                u"���": u"test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        tar.addfile(tarfile.TarInfo("test"))
        tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        self.assertEqual(tar.pax_headers, pax_headers)
        self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)

        # Test if all the fields are unicode.
        for key, val in tar.pax_headers.iteritems():
            self.assertTrue(type(key) is unicode)
            self.assertTrue(type(val) is unicode)
            if key in tarfile.PAX_NUMBER_FIELDS:
                # The value must be accepted by the converter registered
                # for this field in PAX_NUMBER_FIELDS.
                try:
                    tarfile.PAX_NUMBER_FIELDS[key](val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {u"path": u"foo", u"uid": u"123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
        t = tarfile.TarInfo()
        t.name = u"���"     # non-ASCII
        t.uid = 8**8        # too large
        t.pax_headers = pax_headers
        tar.addfile(t)
        tar.close()

        # On read-back the name and uid come from pax_headers, not from
        # the values assigned to the TarInfo attributes above.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        t = tar.getmembers()[0]
        self.assertEqual(t.pax_headers, pax_headers)
        self.assertEqual(t.name, "foo")
        self.assertEqual(t.uid, 123)
1198
1199
class UstarUnicodeTest(unittest.TestCase):
    # All *UnicodeTests FIXME
    # Unicode handling checks for archives written with USTAR_FORMAT;
    # subclasses only override `format` to repeat them for other formats.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf8")

    def _test_unicode_filename(self, encoding):
        # Store a member under a unicode name using the given encoding,
        # then read it back with the same encoding: the name must come
        # back as a non-unicode string equal to name.encode(encoding).
        tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
        name = u"���"
        tar.addfile(tarfile.TarInfo(name))
        tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        self.assertTrue(type(tar.getnames()[0]) is not unicode)
        self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
        tar.close()

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", names that cannot be
        # encoded must make addfile() raise UnicodeError.
        # NOTE(review): the archive opened here is never closed.
        tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
        tarinfo = tarfile.TarInfo()

        # A non-unicode string name is rejected only for the pax format;
        # the other formats are expected to accept it.
        tarinfo.name = "���"
        if self.format == tarfile.PAX_FORMAT:
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        else:
            tar.addfile(tarinfo)

        # A unicode name must be rejected for every format.
        tarinfo.name = u"���"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)

        # The same applies to a unicode user name.
        tarinfo.name = "foo"
        tarinfo.uname = u"���"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # Every textual field of every member read from the reference
        # archive must be of type str.
        tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
        for t in tar:
            self.assertTrue(type(t.name) is str)
            self.assertTrue(type(t.linkname) is str)
            self.assertTrue(type(t.uname) is str)
            self.assertTrue(type(t.gname) is str)
        tar.close()

    def test_uname_unicode(self):
        # Whether uname/gname are assigned as unicode or as a plain
        # string, the values read back must equal the plain string.
        for name in (u"���", "���"):
            t = tarfile.TarInfo("foo")
            t.uname = name
            t.gname = name

            # The archive is written to and read from an in-memory buffer.
            fobj = StringIO.StringIO()
            tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
            tar.addfile(t)
            tar.close()
            fobj.seek(0)

            tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "���")
            self.assertEqual(t.gname, "���")
1267
1268
class GNUUnicodeTest(UstarUnicodeTest):
    # Repeats the tests inherited from UstarUnicodeTest with archives
    # written in GNU_FORMAT.

    format = tarfile.GNU_FORMAT
1272
1273
class PaxUnicodeTest(UstarUnicodeTest):
    # Repeats the tests inherited from UstarUnicodeTest with archives
    # written in PAX_FORMAT and adds checks of the error handlers used
    # when reading pax path names.

    format = tarfile.PAX_FORMAT

    def _create_unicode_name(self, name):
        # Write an archive holding one member whose name is supplied
        # through the "path" entry of its pax headers.
        tar = tarfile.open(tmpname, "w", format=self.format)
        t = tarfile.TarInfo()
        t.pax_headers["path"] = name
        tar.addfile(t)
        tar.close()

    def test_error_handlers(self):
        # Test if the unicode error handlers work correctly for characters
        # that cannot be expressed in a given encoding.
        self._create_unicode_name(u"���")

        # Each pair is (errors handler, member name expected when the
        # archive is read with encoding="ascii").
        # NOTE(review): the archives opened in this loop are never closed.
        for handler, name in (("utf-8", u"���".encode("utf8")),
                    ("replace", "???"), ("ignore", "")):
            tar = tarfile.open(tmpname, format=self.format, encoding="ascii",
                    errors=handler)
            self.assertEqual(tar.getnames()[0], name)

        # With errors="strict" opening the archive itself must fail.
        self.assertRaises(UnicodeError, tarfile.open, tmpname,
                encoding="ascii", errors="strict")

    def test_error_handler_utf8(self):
        # Create a pathname that has one component representable using
        # iso8859-1 and the other only in iso8859-15.
        self._create_unicode_name(u"���/�")

        # The expected name keeps the first component as a plain string
        # and has the second component encoded as UTF-8.
        tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1",
                errors="utf-8")
        self.assertEqual(tar.getnames()[0], "���/" + u"�".encode("utf8"))
1307
1308
class AppendTest(unittest.TestCase):
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without a pre-existing archive file.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os.remove(self.tarname)

    def _add_testfile(self, fileobj=None):
        # Append an empty member called "bar" to the archive (or to the
        # given file object).
        tar = tarfile.open(self.tarname, "a", fileobj=fileobj)
        try:
            tar.addfile(tarfile.TarInfo("bar"))
        finally:
            tar.close()

    def _create_testtar(self, mode="w:"):
        # Create the archive with a single member "foo" whose contents
        # are copied from "ustar/regtype" of the reference archive.
        src = tarfile.open(tarname, encoding="iso8859-1")
        try:
            t = src.getmember("ustar/regtype")
            t.name = "foo"
            f = src.extractfile(t)
            tar = tarfile.open(self.tarname, mode)
            try:
                tar.addfile(t, f)
            finally:
                tar.close()
        finally:
            # The reference archive used to be left open.
            src.close()

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly the given member names
        # (["bar"] when names is omitted).
        if names is None:
            names = ["bar"]
        tar = tarfile.open(self.tarname, fileobj=fileobj)
        try:
            self.assertEqual(tar.getnames(), names)
        finally:
            tar.close()

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = StringIO.StringIO("\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        # Archive data is binary: read it in "rb" mode so that no newline
        # translation can alter it, and close the file afterwards.
        with open(self.tarname, "rb") as archive:
            data = archive.read()
        fobj = StringIO.StringIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    def test_append_gz(self):
        # Appending to a gzip-compressed archive must raise ReadError.
        if gzip is None:
            self.skipTest("requires gzip")
        self._create_testtar("w:gz")
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")

    def test_append_bz2(self):
        # Appending to a bzip2-compressed archive must raise ReadError.
        if bz2 is None:
            self.skipTest("requires bz2")
        self._create_testtar("w:bz2")
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error("")

    def test_incomplete(self):
        self._test_error("\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + "\0" * 13)

    def test_invalid(self):
        self._test_error("a" * 512)
1397
1398
class LimitsTest(unittest.TestCase):
    # Check which names, link names and uids TarInfo.tobuf() accepts or
    # rejects (ValueError) for each archive format.

    def test_ustar_limits(self):
        # 100 char name
        tarinfo = tarfile.TarInfo("0123456789" * 10)
        tarinfo.tobuf(tarfile.USTAR_FORMAT)

        # 101 char name that cannot be stored
        tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # 256 char name with a slash at pos 156
        tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
        tarinfo.tobuf(tarfile.USTAR_FORMAT)

        # 256 char name that cannot be stored
        tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # 512 char name
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # 512 char linkname
        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # uid > 8 digits
        tarinfo = tarfile.TarInfo("name")
        # "0o" octal literal: same value (8 ** 7), accepted by Python 2.6+
        # and Python 3 alike.
        tarinfo.uid = 0o10000000
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

    def test_gnu_limits(self):
        # The GNU format accepts the 512 char name and link name.
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        tarinfo.tobuf(tarfile.GNU_FORMAT)

        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(tarfile.GNU_FORMAT)

        # uid >= 256 ** 7
        tarinfo = tarfile.TarInfo("name")
        # Same value as before (256 ** 7) without the Python-2-only "L"
        # suffix.
        tarinfo.uid = 0o4000000000000000000
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)

    def test_pax_limits(self):
        # The pax format accepts all three cases, including the uid that
        # the GNU format rejects.
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 0o4000000000000000000
        tarinfo.tobuf(tarfile.PAX_FORMAT)
1456
1457
class ContextManagerTest(unittest.TestCase):
    # Tests for using TarFile objects in "with" statements.

    def test_basic(self):
        # The archive is open inside the block and closed afterwards.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise IOError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(IOError):
            with tar:
                pass

    def test_exception(self):
        # Test if the IOError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise IOError
        self.assertIsInstance(exc.exception, IOError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the deliberately raised exception is swallowed;
            # KeyboardInterrupt and SystemExit still propagate.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        # The outer "with" closes fobj even when an assertion fails.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
1515
1516
class LinkEmulationTest(ReadTest):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members as
    # the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract the named link member into TEMPDIR and compare the
        # checksum of the resulting file with md5_regtype.
        self.tar.extract(name, TEMPDIR)
        # Close the extracted file explicitly instead of leaking it.
        with open(os.path.join(TEMPDIR, name), "rb") as extracted:
            data = extracted.read()
        self.assertEqual(md5sum(data), md5_regtype)

    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
1538
1539
# gzip variants: each class inherits the tests of its base class and only
# overrides the archive name and/or the mode string.
class GzipMiscReadTest(MiscReadTest):
    tarname = gzipname
    mode = "r:gz"
class GzipUstarReadTest(UstarReadTest):
    tarname = gzipname
    mode = "r:gz"
class GzipStreamReadTest(StreamReadTest):
    tarname = gzipname
    mode = "r|gz"
class GzipWriteTest(WriteTest):
    mode = "w:gz"
class GzipStreamWriteTest(StreamWriteTest):
    mode = "w|gz"
1553
1554
# bzip2 variants: each class inherits the tests of its base class and only
# overrides the archive name and/or the mode string.
class Bz2MiscReadTest(MiscReadTest):
    tarname = bz2name
    mode = "r:bz2"
class Bz2UstarReadTest(UstarReadTest):
    tarname = bz2name
    mode = "r:bz2"
class Bz2StreamReadTest(StreamReadTest):
    tarname = bz2name
    mode = "r|bz2"
class Bz2WriteTest(WriteTest):
    mode = "w:bz2"
class Bz2StreamWriteTest(StreamWriteTest):
    mode = "w|bz2"
1568
class Bz2PartialReadTest(unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Hand tarfile.open() every prefix of a bzip2-compressed header,
        # from zero bytes up to the complete stream.  ReadError is an
        # acceptable outcome; reading on after the end of input is not.
        base = StringIO.StringIO

        class EOFGuardStringIO(base):
            # Remembers when a read() was issued at the end of the
            # buffer and raises on the next read(); seek() resets this.
            hit_eof = False

            def seek(self, *args):
                self.hit_eof = False
                return base.seek(self, *args)

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = (self.pos == self.len)
                return base.read(self, n)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for size in range(len(compressed) + 1):
            truncated = EOFGuardStringIO(compressed[:size])
            try:
                tarfile.open(fileobj=truncated, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        # Transparent compression detection.
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        # Explicit bzip2 mode.
        self._test_partial_input("r:bz2")
1597
1598
def test_main():
    # Create TEMPDIR, build the gzip/bz2 copies of the reference archive
    # when the respective modules are available, run all applicable test
    # cases and finally remove TEMPDIR again.
    os.makedirs(TEMPDIR)

    # Everything after the directory creation runs inside the try block
    # so that TEMPDIR is removed even if preparing the fixtures fails.
    try:
        tests = [
            UstarReadTest,
            MiscReadTest,
            StreamReadTest,
            DetectReadTest,
            MemberReadTest,
            GNUReadTest,
            PaxReadTest,
            WriteTest,
            StreamWriteTest,
            GNUWriteTest,
            PaxWriteTest,
            UstarUnicodeTest,
            GNUUnicodeTest,
            PaxUnicodeTest,
            AppendTest,
            LimitsTest,
            ContextManagerTest,
        ]

        # Hard link creation is tested where os.link exists; elsewhere
        # the link emulation on extraction is tested instead.
        if hasattr(os, "link"):
            tests.append(HardlinkTest)
        else:
            tests.append(LinkEmulationTest)

        with open(tarname, "rb") as fobj:
            data = fobj.read()

        if gzip:
            # Create testtar.tar.gz and add gzip-specific tests.
            tar = gzip.open(gzipname, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                GzipMiscReadTest,
                GzipUstarReadTest,
                GzipStreamReadTest,
                GzipWriteTest,
                GzipStreamWriteTest,
            ]

        if bz2:
            # Create testtar.tar.bz2 and add bz2-specific tests.
            tar = bz2.BZ2File(bz2name, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                Bz2MiscReadTest,
                Bz2UstarReadTest,
                Bz2StreamReadTest,
                Bz2WriteTest,
                Bz2StreamWriteTest,
                Bz2PartialReadTest,
            ]

        test_support.run_unittest(*tests)
    finally:
        if os.path.exists(TEMPDIR):
            shutil.rmtree(TEMPDIR)
1665
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1668