test_tarfile.py revision 477c8d5e70240744d24631b18341ad892c8a8e1c
1import sys
2import os
3import shutil
4import tempfile
5import StringIO
6
7import unittest
8import tarfile
9
10from test import test_support
11
# Probe for the optional compression modules.  A name is rebound to None
# when its module cannot be used, so later code can simply test the name.
try:
    import gzip
except ImportError:
    gzip = None
else:
    # A gzip module lacking GzipFile is treated like a failed import.
    if not hasattr(gzip, "GzipFile"):
        gzip = None

try:
    import bz2
except ImportError:
    bz2 = None
22
def path(path):
    """Return whatever test_support.findfile() reports for *path*."""
    located = test_support.findfile(path)
    return located
25
# Values shared by the helpers and test cases below.
# testtar: the archive returned by tarname() when no compression is asked for.
testtar = path("testtar.tar")
# tempdir: scratch directory handed out by dirname().
tempdir = os.path.join(tempfile.gettempdir(),
                       os.extsep.join(["testtar", "dir"]))
# tempname: scratch file name handed out by tmpname().
tempname = test_support.TESTFN
# membercount: number of members ReadTest.test expects to iterate over.
membercount = 12
30
def tarname(comp=""):
    """Return the path of the test archive for compression type *comp*.

    With an empty *comp* the reference archive (testtar) is returned
    unchanged.  Otherwise the result is "<basename of testtar>.<comp>"
    inside the scratch directory, which is created on demand.
    """
    if not comp:
        return testtar
    # Join only the base name: os.path.join() throws away every earlier
    # component once a later one is absolute, so joining the full testtar
    # path would place the compressed copy next to the reference archive
    # instead of inside the scratch directory.
    basename = os.path.basename(testtar)
    return os.path.join(dirname(), "%s%s%s" % (basename, os.extsep, comp))
35
def dirname():
    """Return the scratch directory, creating it first when it is missing."""
    if os.path.exists(tempdir):
        return tempdir
    os.mkdir(tempdir)
    return tempdir
40
def tmpname():
    """Return the scratch file name stored in the module-level tempname."""
    scratch = tempname
    return scratch
43
44
class BaseTest(unittest.TestCase):
    """Common fixture: open a test archive in setUp, close it in tearDown.

    Subclasses adjust the archive and the open mode via class attributes.
    """
    comp = ""   # compression suffix: selects the archive and ends the mode
    mode = "r"  # first part of the mode string given to tarfile.open()
    sep = ":"   # separator placed between mode and comp

    def setUp(self):
        """Open tarname(self.comp) with the mode "<mode><sep><comp>"."""
        open_mode = "%s%s%s" % (self.mode, self.sep, self.comp)
        archive = tarname(self.comp)
        self.tar = tarfile.open(archive, open_mode)

    def tearDown(self):
        """Close the archive stored in self.tar."""
        archive = self.tar
        archive.close()
56
class ReadTest(BaseTest):
    """Read-side tests run against the archive opened by BaseTest.setUp.

    Tests that need random access do nothing when self.sep is "|".
    """

    def _extracted_lines(self, filename):
        """Extract *filename* into the scratch directory and return the
        lines of the extracted file, read in universal-newline mode.
        """
        self.tar.extract(filename, dirname())
        f = open(os.path.join(dirname(), filename), "rU")
        try:
            return f.readlines()
        finally:
            f.close()

    def test(self):
        """Test member extraction.
        """
        members = 0
        for tarinfo in self.tar:
            members += 1
            if not tarinfo.isreg():
                continue
            f = self.tar.extractfile(tarinfo)
            try:
                self.assert_(len(f.read()) == tarinfo.size,
                             "size read does not match expected size")
            finally:
                # Close the member even when the assertion fails.
                f.close()

        self.assert_(members == membercount,
                     "could not find all members")

    def test_sparse(self):
        """Test sparse member extraction.
        """
        if self.sep != "|":
            f1 = self.tar.extractfile("S-SPARSE")
            f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
            self.assert_(f1.read() == f2.read(),
                         "_FileObject failed on sparse file member")

    def test_readlines(self):
        """Test readlines() method of _FileObject.
        """
        if self.sep != "|":
            filename = "0-REGTYPE-TEXT"
            lines1 = self._extracted_lines(filename)
            lines2 = self.tar.extractfile(filename).readlines()
            self.assert_(lines1 == lines2,
                         "_FileObject.readline() does not work correctly")

    def test_iter(self):
        # Test iteration over ExFileObject.
        if self.sep != "|":
            filename = "0-REGTYPE-TEXT"
            lines1 = self._extracted_lines(filename)
            lines2 = [line for line in self.tar.extractfile(filename)]
            self.assert_(lines1 == lines2,
                         "ExFileObject iteration does not work correctly")

    def test_seek(self):
        """Test seek() method of _FileObject, incl. random reading.
        """
        if self.sep == "|":
            return

        filename = "0-REGTYPE"
        self.tar.extract(filename, dirname())
        f = open(os.path.join(dirname(), filename), "rb")
        try:
            data = f.read()
        finally:
            f.close()

        tarinfo = self.tar.getmember(filename)
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Read to the end first so that the seek(0) below really has
            # to move backwards.
            fobj.read()
            fobj.seek(0)
            self.assert_(0 == fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assert_(2048 == fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assert_(1024 == fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assert_(2048 == fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assert_(s == data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assert_(tarinfo.size == fobj.tell(),
                         "seek() to file's end failed")
            self.assert_(fobj.read() == "",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assert_(0 == fobj.tell(),
                         "relative seek() to file's start failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assert_(s1 == s2,
                         "readlines() after seek failed")
        finally:
            fobj.close()

    def test_old_dirtype(self):
        """Test old style dirtype member (bug #1336623).
        """
        # Old tars create directory members using a REGTYPE
        # header with a "/" appended to the filename field.

        # Create an old tar style directory entry.
        filename = tmpname()
        tarinfo = tarfile.TarInfo("directory/")
        tarinfo.type = tarfile.REGTYPE

        try:
            # tobuf() produces a raw header block, so the file has to be
            # written in binary mode to reach the disk unmodified.
            fobj = open(filename, "wb")
            try:
                fobj.write(tarinfo.tobuf())
            finally:
                fobj.close()

            # Test if it is still a directory entry when
            # read back.
            tar = tarfile.open(filename)
            try:
                tarinfo = tar.getmembers()[0]
            finally:
                tar.close()

            self.assert_(tarinfo.type == tarfile.DIRTYPE)
            self.assert_(tarinfo.name.endswith("/"))
        finally:
            # Best-effort cleanup: only filesystem errors are ignored.
            try:
                os.unlink(filename)
            except OSError:
                pass
183
class ReadStreamTest(ReadTest):
    """ReadTest variant that opens the archive with the "|" separator."""
    sep = "|"

    def test(self):
        """Test member extraction, and for StreamError when
           seeking backwards.
        """
        ReadTest.test(self)
        tarinfo = self.tar.getmembers()[0]
        f = self.tar.extractfile(tarinfo)
        self.assertRaises(tarfile.StreamError, f.read)

    def test_stream(self):
        """Compare the normal tar and the stream tar.
        """
        stream = self.tar
        tar = tarfile.open(tarname(), 'r')
        try:
            while 1:
                t1 = tar.next()
                t2 = stream.next()
                if t1 is None:
                    break
                self.assert_(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError,
                                      stream.extractfile, t2)
                    continue
                v1 = tar.extractfile(t1)
                v2 = stream.extractfile(t2)
                if v1 is None:
                    continue
                self.assert_(v2 is not None, "stream.extractfile() failed")
                self.assert_(v1.read() == v2.read(),
                             "stream extraction failed")
        finally:
            # Close both archives even when one of the assertions fails.
            tar.close()
            stream.close()
221
class ReadDetectTest(ReadTest):
    """ReadTest with the archive opened using self.mode alone, i.e. with
    neither separator nor compression in the mode string.
    """

    def setUp(self):
        archive = tarname(self.comp)
        self.tar = tarfile.open(archive, self.mode)
226
class ReadDetectFileobjTest(ReadTest):
    """ReadTest with the archive handed to tarfile.open() as an already
    opened file object, using self.mode alone as the mode string.
    """

    def setUp(self):
        name = tarname(self.comp)
        # Keep a reference to the file object so that tearDown can close
        # it explicitly instead of leaving that to garbage collection.
        self.fobj = open(name, "rb")
        try:
            self.tar = tarfile.open(name, mode=self.mode,
                                    fileobj=self.fobj)
        except:
            # Do not leak the handle when the archive cannot be opened.
            self.fobj.close()
            raise

    def tearDown(self):
        try:
            ReadTest.tearDown(self)
        finally:
            self.fobj.close()
233
class ReadAsteriskTest(ReadTest):
    """ReadTest with "*" in place of the compression in the mode string."""

    def setUp(self):
        archive = tarname(self.comp)
        self.tar = tarfile.open(archive, "%s%s*" % (self.mode, self.sep))
239
class ReadStreamAsteriskTest(ReadStreamTest):
    """ReadStreamTest with "*" in place of the compression in the mode
    string.
    """

    def setUp(self):
        archive = tarname(self.comp)
        self.tar = tarfile.open(archive, "%s%s*" % (self.mode, self.sep))
245
class WriteTest(BaseTest):
    """Copy the regular members of a source archive into a freshly
    written destination archive.
    """
    mode = "w"

    def setUp(self):
        """Open the source archive for reading and the scratch file as the
        destination archive.
        """
        write_mode = "%s%s%s" % (self.mode, self.sep, self.comp)
        self.src = tarfile.open(tarname(self.comp), "r")
        self.dstname = tmpname()
        self.dst = tarfile.open(self.dstname, write_mode)

    def tearDown(self):
        """Close the source archive, then the destination archive."""
        for archive in (self.src, self.dst):
            archive.close()

    def test_posix(self):
        self.dst.posix = 1
        self._test()

    def test_nonposix(self):
        self.dst.posix = 0
        self._test()

    def test_small(self):
        # Add a single file that lives next to this module and check that
        # the closed archive is not empty.
        here = os.path.dirname(__file__)
        self.dst.add(os.path.join(here, "cfgparser.1"))
        self.dst.close()
        written = os.stat(self.dstname).st_size
        self.assertNotEqual(written, 0)

    def _test(self):
        """Add every regular member of the source to the destination.

        When self.dst.posix is true, a name longer than
        tarfile.LENGTH_NAME that contains no "/" is expected to raise
        ValueError instead of being added.
        """
        for tarinfo in self.src:
            if tarinfo.isreg():
                f = self.src.extractfile(tarinfo)
                name = tarinfo.name
                unsplittable = (len(name) > tarfile.LENGTH_NAME
                                and "/" not in name)
                if self.dst.posix and unsplittable:
                    self.assertRaises(ValueError, self.dst.addfile,
                                      tarinfo, f)
                else:
                    self.dst.addfile(tarinfo, f)
282
class WriteSize0Test(BaseTest):
    """Check the size that gettarinfo() reports for files, directories
    and symlinks.
    """
    mode = 'w'

    def setUp(self):
        self.tmpdir = dirname()
        self.dstname = tmpname()
        self.dst = tarfile.open(self.dstname, "w")

    def tearDown(self):
        self.dst.close()

    def test_file(self):
        path = os.path.join(self.tmpdir, "file")
        # An empty file first ...
        f = open(path, "w")
        f.close()
        tarinfo = self.dst.gettarinfo(path)
        self.assertEqual(tarinfo.size, 0)
        # ... then the same file holding three bytes.
        f = open(path, "w")
        try:
            f.write("aaa")
        finally:
            f.close()
        tarinfo = self.dst.gettarinfo(path)
        self.assertEqual(tarinfo.size, 3)

    def test_directory(self):
        path = os.path.join(self.tmpdir, "directory")
        if os.path.exists(path):
            # This shouldn't be necessary, but is <wink> if a previous
            # run was killed in mid-stream.
            shutil.rmtree(path)
        os.mkdir(path)
        tarinfo = self.dst.gettarinfo(path)
        self.assertEqual(tarinfo.size, 0)

    def test_symlink(self):
        if hasattr(os, "symlink"):
            path = os.path.join(self.tmpdir, "symlink")
            # Same precaution as in test_directory: a killed run may have
            # left the link behind.  lexists() is needed because the link
            # is created with a target that is never created here, and
            # exists() reports False for a broken link.
            if os.path.lexists(path):
                os.remove(path)
            os.symlink("link_target", path)
            tarinfo = self.dst.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)
322
323
class WriteStreamTest(WriteTest):
    """WriteTest with "|" as the separator in the destination's mode."""
    sep = "|"
326
327class WriteGNULongTest(unittest.TestCase):
328    """This testcase checks for correct creation of GNU Longname
329       and Longlink extensions.
330
331       It creates a tarfile and adds empty members with either
332       long names, long linknames or both and compares the size
333       of the tarfile with the expected size.
334
335       It checks for SF bug #812325 in TarFile._create_gnulong().
336
337       While I was writing this testcase, I noticed a second bug
338       in the same method:
339       Long{names,links} weren't null-terminated which lead to
340       bad tarfiles when their length was a multiple of 512. This
341       is tested as well.
342    """
343
344    def setUp(self):
345        self.tar = tarfile.open(tmpname(), "w")
346        self.tar.posix = False
347
348    def tearDown(self):
349        self.tar.close()
350
351    def _length(self, s):
352        blocks, remainder = divmod(len(s) + 1, 512)
353        if remainder:
354            blocks += 1
355        return blocks * 512
356
357    def _calc_size(self, name, link=None):
358        # initial tar header
359        count = 512
360
361        if len(name) > tarfile.LENGTH_NAME:
362            # gnu longname extended header + longname
363            count += 512
364            count += self._length(name)
365
366        if link is not None and len(link) > tarfile.LENGTH_LINK:
367            # gnu longlink extended header + longlink
368            count += 512
369            count += self._length(link)
370
371        return count
372
373    def _test(self, name, link=None):
374        tarinfo = tarfile.TarInfo(name)
375        if link:
376            tarinfo.linkname = link
377            tarinfo.type = tarfile.LNKTYPE
378
379        self.tar.addfile(tarinfo)
380
381        v1 = self._calc_size(name, link)
382        v2 = self.tar.offset
383        self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
384
385    def test_longname_1023(self):
386        self._test(("longnam/" * 127) + "longnam")
387
388    def test_longname_1024(self):
389        self._test(("longnam/" * 127) + "longname")
390
391    def test_longname_1025(self):
392        self._test(("longnam/" * 127) + "longname_")
393
394    def test_longlink_1023(self):
395        self._test("name", ("longlnk/" * 127) + "longlnk")
396
397    def test_longlink_1024(self):
398        self._test("name", ("longlnk/" * 127) + "longlink")
399
400    def test_longlink_1025(self):
401        self._test("name", ("longlnk/" * 127) + "longlink_")
402
403    def test_longnamelink_1023(self):
404        self._test(("longnam/" * 127) + "longnam",
405                   ("longlnk/" * 127) + "longlnk")
406
407    def test_longnamelink_1024(self):
408        self._test(("longnam/" * 127) + "longname",
409                   ("longlnk/" * 127) + "longlink")
410
411    def test_longnamelink_1025(self):
412        self._test(("longnam/" * 127) + "longname_",
413                   ("longlnk/" * 127) + "longlink_")
414
class ReadGNULongTest(unittest.TestCase):
    """Look up members with long names and long link names in the
    reference archive.
    """

    def setUp(self):
        self.tar = tarfile.open(tarname())

    def tearDown(self):
        self.tar.close()

    def _find(self, name):
        """Return the member called *name*, or None if getmember()
        raises KeyError.
        """
        try:
            return self.tar.getmember(name)
        except KeyError:
            return None

    def test_1471427(self):
        """Test reading of longname (bug #1471427).
        """
        name = "test/" * 20 + "0-REGTYPE"
        tarinfo = self._find(name)
        self.assert_(tarinfo is not None, "longname not found")
        self.assert_(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_name(self):
        name = ("0-LONGNAME-" * 10)[:101]
        tarinfo = self._find(name)
        self.assert_(tarinfo is not None, "longname not found")

    def test_read_link(self):
        link = ("1-LONGLINK-" * 10)[:101]
        name = ("0-LONGNAME-" * 10)[:101]
        tarinfo = self._find(link)
        self.assert_(tarinfo is not None, "longlink not found")
        self.assert_(tarinfo.linkname == name, "linkname wrong")

    def test_truncated_longname(self):
        # The archive is binary data: read it in binary mode so that the
        # first 1024 bytes arrive exactly as stored on disk.
        f = open(tarname(), "rb")
        try:
            fobj = StringIO.StringIO(f.read(1024))
        finally:
            f.close()
        tar = tarfile.open(name="foo.tar", fileobj=fobj)
        try:
            self.assert_(len(tar.getmembers()) == 0, "")
        finally:
            tar.close()
459
460
class ExtractHardlinkTest(BaseTest):
    """Extract a regular file followed by a hardlink that points to it."""

    def test_hardlink(self):
        """Test hardlink extraction (bug #857297)
        """
        import errno

        # Prevent errors from being caught
        self.tar.errorlevel = 1

        self.tar.extract("0-REGTYPE", dirname())
        try:
            # Extract 1-LNKTYPE which is a hardlink to 0-REGTYPE
            self.tar.extract("1-LNKTYPE", dirname())
        except EnvironmentError:
            # Fetch the active exception without using the binding syntax.
            e = sys.exc_info()[1]
            # Only a missing file counts as a failure here; any other
            # environment error is ignored.
            if e.errno == errno.ENOENT:
                self.fail("hardlink not extracted properly")
477
class CreateHardlinkTest(BaseTest):
    """Test the creation of LNKTYPE (hardlink) members in an archive.
       In this respect tarfile.py mimics the behaviour of GNU tar: If
       a file has a st_nlink > 1, it will be added a REGTYPE member
       only the first time.
    """

    def setUp(self):
        """Create a new archive holding the freshly written file "foo"."""
        self.tar = tarfile.open(tmpname(), "w")

        self.foo = os.path.join(dirname(), "foo")
        self.bar = os.path.join(dirname(), "bar")

        # Remove whatever is left under these names before starting.
        for stale in (self.foo, self.bar):
            if os.path.exists(stale):
                os.remove(stale)

        f = open(self.foo, "w")
        f.write("foo")
        f.close()
        self.tar.add(self.foo)

    def _type_of(self, filename):
        """Return the member type gettarinfo() assigns to *filename*."""
        return self.tar.gettarinfo(filename).type

    def test_add_twice(self):
        # If st_nlink == 1 then the same file will be added as
        # REGTYPE every time.
        self.assertEqual(self._type_of(self.foo), tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # If st_nlink > 1 then the same file will be added as
        # LNKTYPE.
        os.link(self.foo, self.bar)
        self.assertEqual(self._type_of(self.foo), tarfile.LNKTYPE,
                "add file as hardlink failed")

        self.assertEqual(self._type_of(self.bar), tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        self.tar.dereference = True
        os.link(self.foo, self.bar)
        self.assertEqual(self._type_of(self.bar), tarfile.REGTYPE,
                "dereferencing hardlink failed")
526
527
# Gzip TestCases: every class re-runs its base class with comp set to "gz".

class ReadTestGzip(ReadTest):
    """ReadTest with comp = "gz"."""
    comp = "gz"


class ReadStreamTestGzip(ReadStreamTest):
    """ReadStreamTest with comp = "gz"."""
    comp = "gz"


class WriteTestGzip(WriteTest):
    """WriteTest with comp = "gz"."""
    comp = "gz"


class WriteStreamTestGzip(WriteStreamTest):
    """WriteStreamTest with comp = "gz"."""
    comp = "gz"


class ReadDetectTestGzip(ReadDetectTest):
    """ReadDetectTest with comp = "gz"."""
    comp = "gz"


class ReadDetectFileobjTestGzip(ReadDetectFileobjTest):
    """ReadDetectFileobjTest with comp = "gz"."""
    comp = "gz"


class ReadAsteriskTestGzip(ReadAsteriskTest):
    """ReadAsteriskTest with comp = "gz"."""
    comp = "gz"


class ReadStreamAsteriskTestGzip(ReadStreamAsteriskTest):
    """ReadStreamAsteriskTest with comp = "gz"."""
    comp = "gz"
545
546# Filemode test cases
547
548class FileModeTest(unittest.TestCase):
549    def test_modes(self):
550        self.assertEqual(tarfile.filemode(0755), '-rwxr-xr-x')
551        self.assertEqual(tarfile.filemode(07111), '---s--s--t')
552
553
if bz2:
    # Bzip2 TestCases: every class re-runs its base class with comp set
    # to "bz2".  All of them derive straight from the uncompressed base
    # classes so that none depends on a Gzip TestCase.
    class ReadTestBzip2(ReadTest):
        comp = "bz2"
    class ReadStreamTestBzip2(ReadStreamTest):
        comp = "bz2"
    class WriteTestBzip2(WriteTest):
        comp = "bz2"
    class WriteStreamTestBzip2(WriteStreamTest):
        comp = "bz2"
    class ReadDetectTestBzip2(ReadDetectTest):
        comp = "bz2"
    class ReadDetectFileobjTestBzip2(ReadDetectFileobjTest):
        comp = "bz2"
    class ReadAsteriskTestBzip2(ReadAsteriskTest):
        comp = "bz2"
    class ReadStreamAsteriskTestBzip2(ReadStreamAsteriskTest):
        comp = "bz2"
572
# If importing gzip failed, discard every Gzip TestCase so that none of
# them stays behind in the module namespace.
if not gzip:
    del ReadTestGzip
    del ReadStreamTestGzip
    del WriteTestGzip
    del WriteStreamTestGzip
    del ReadDetectTestGzip
    del ReadDetectFileobjTestGzip
    del ReadAsteriskTestGzip
    del ReadStreamAsteriskTestGzip
579
def test_main():
    """Create the compressed fixtures, run every applicable test case and
    remove all scratch files afterwards.

    Hardlink, gzip and bz2 test cases are only run when os.link, gzip
    and bz2 respectively are available.
    """
    tests = [
        FileModeTest,
        ReadTest,
        ReadStreamTest,
        ReadDetectTest,
        ReadDetectFileobjTest,
        ReadAsteriskTest,
        ReadStreamAsteriskTest,
        WriteTest,
        WriteSize0Test,
        WriteStreamTest,
        WriteGNULongTest,
        ReadGNULongTest,
    ]

    if hasattr(os, "link"):
        tests.append(ExtractHardlinkTest)
        tests.append(CreateHardlinkTest)

    if gzip:
        tests.extend([
            ReadTestGzip, ReadStreamTestGzip,
            WriteTestGzip, WriteStreamTestGzip,
            ReadDetectTestGzip, ReadDetectFileobjTestGzip,
            ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip
        ])

    if bz2:
        tests.extend([
            ReadTestBzip2, ReadStreamTestBzip2,
            WriteTestBzip2, WriteStreamTestBzip2,
            ReadDetectTestBzip2, ReadDetectFileobjTestBzip2,
            ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2
        ])

    try:
        # The fixtures are created inside the try block so that a failure
        # halfway through still runs the cleanup below.
        f = open(tarname(), "rb")
        try:
            fguts = f.read()
        finally:
            f.close()
        if gzip:
            # create the gzip-compressed copy of the reference archive
            tar = gzip.open(tarname("gz"), "wb")
            try:
                tar.write(fguts)
            finally:
                tar.close()
        if bz2:
            # create the bzip2-compressed copy of the reference archive
            tar = bz2.BZ2File(tarname("bz2"), "wb")
            try:
                tar.write(fguts)
            finally:
                tar.close()

        test_support.run_unittest(*tests)
    finally:
        # Remove only what is really there, so that a missing fixture
        # cannot raise here and hide the original error.
        for comp, module in (("gz", gzip), ("bz2", bz2)):
            if module and os.path.exists(tarname(comp)):
                os.remove(tarname(comp))
        if os.path.exists(dirname()):
            shutil.rmtree(dirname())
        if os.path.exists(tmpname()):
            os.remove(tmpname())
641
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
644