1"""Tests for distutils.archive_util."""
2__revision__ = "$Id$"
3
4import unittest
5import os
6import tarfile
7from os.path import splitdrive
8import warnings
9
10from distutils.archive_util import (check_archive_formats, make_tarball,
11                                    make_zipfile, make_archive,
12                                    ARCHIVE_FORMATS)
13from distutils.spawn import find_executable, spawn
14from distutils.tests import support
15from test.test_support import check_warnings, run_unittest
16
17try:
18    import grp
19    import pwd
20    UID_GID_SUPPORT = True
21except ImportError:
22    UID_GID_SUPPORT = False
23
24try:
25    import zipfile
26    ZIP_SUPPORT = True
27except ImportError:
28    ZIP_SUPPORT = find_executable('zip')
29
30# some tests will fail if zlib is not available
31try:
32    import zlib
33except ImportError:
34    zlib = None
35
36
37class ArchiveUtilTestCase(support.TempdirManager,
38                          support.LoggingSilencer,
39                          unittest.TestCase):
40
41    @unittest.skipUnless(zlib, "requires zlib")
42    def test_make_tarball(self):
43        # creating something to tar
44        tmpdir = self.mkdtemp()
45        self.write_file([tmpdir, 'file1'], 'xxx')
46        self.write_file([tmpdir, 'file2'], 'xxx')
47        os.mkdir(os.path.join(tmpdir, 'sub'))
48        self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
49
50        tmpdir2 = self.mkdtemp()
51        unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
52                            "source and target should be on same drive")
53
54        base_name = os.path.join(tmpdir2, 'archive')
55
56        # working with relative paths to avoid tar warnings
57        old_dir = os.getcwd()
58        os.chdir(tmpdir)
59        try:
60            make_tarball(splitdrive(base_name)[1], '.')
61        finally:
62            os.chdir(old_dir)
63
64        # check if the compressed tarball was created
65        tarball = base_name + '.tar.gz'
66        self.assertTrue(os.path.exists(tarball))
67
68        # trying an uncompressed one
69        base_name = os.path.join(tmpdir2, 'archive')
70        old_dir = os.getcwd()
71        os.chdir(tmpdir)
72        try:
73            make_tarball(splitdrive(base_name)[1], '.', compress=None)
74        finally:
75            os.chdir(old_dir)
76        tarball = base_name + '.tar'
77        self.assertTrue(os.path.exists(tarball))
78
79    def _tarinfo(self, path):
80        tar = tarfile.open(path)
81        try:
82            names = tar.getnames()
83            names.sort()
84            return tuple(names)
85        finally:
86            tar.close()
87
88    def _create_files(self):
89        # creating something to tar
90        tmpdir = self.mkdtemp()
91        dist = os.path.join(tmpdir, 'dist')
92        os.mkdir(dist)
93        self.write_file([dist, 'file1'], 'xxx')
94        self.write_file([dist, 'file2'], 'xxx')
95        os.mkdir(os.path.join(dist, 'sub'))
96        self.write_file([dist, 'sub', 'file3'], 'xxx')
97        os.mkdir(os.path.join(dist, 'sub2'))
98        tmpdir2 = self.mkdtemp()
99        base_name = os.path.join(tmpdir2, 'archive')
100        return tmpdir, tmpdir2, base_name
101
102    @unittest.skipUnless(zlib, "Requires zlib")
103    @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
104                         'Need the tar command to run')
105    def test_tarfile_vs_tar(self):
106        tmpdir, tmpdir2, base_name =  self._create_files()
107        old_dir = os.getcwd()
108        os.chdir(tmpdir)
109        try:
110            make_tarball(base_name, 'dist')
111        finally:
112            os.chdir(old_dir)
113
114        # check if the compressed tarball was created
115        tarball = base_name + '.tar.gz'
116        self.assertTrue(os.path.exists(tarball))
117
118        # now create another tarball using `tar`
119        tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
120        tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
121        gzip_cmd = ['gzip', '-f9', 'archive2.tar']
122        old_dir = os.getcwd()
123        os.chdir(tmpdir)
124        try:
125            spawn(tar_cmd)
126            spawn(gzip_cmd)
127        finally:
128            os.chdir(old_dir)
129
130        self.assertTrue(os.path.exists(tarball2))
131        # let's compare both tarballs
132        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
133
134        # trying an uncompressed one
135        base_name = os.path.join(tmpdir2, 'archive')
136        old_dir = os.getcwd()
137        os.chdir(tmpdir)
138        try:
139            make_tarball(base_name, 'dist', compress=None)
140        finally:
141            os.chdir(old_dir)
142        tarball = base_name + '.tar'
143        self.assertTrue(os.path.exists(tarball))
144
145        # now for a dry_run
146        base_name = os.path.join(tmpdir2, 'archive')
147        old_dir = os.getcwd()
148        os.chdir(tmpdir)
149        try:
150            make_tarball(base_name, 'dist', compress=None, dry_run=True)
151        finally:
152            os.chdir(old_dir)
153        tarball = base_name + '.tar'
154        self.assertTrue(os.path.exists(tarball))
155
156    @unittest.skipUnless(find_executable('compress'),
157                         'The compress program is required')
158    def test_compress_deprecated(self):
159        tmpdir, tmpdir2, base_name =  self._create_files()
160
161        # using compress and testing the PendingDeprecationWarning
162        old_dir = os.getcwd()
163        os.chdir(tmpdir)
164        try:
165            with check_warnings() as w:
166                warnings.simplefilter("always")
167                make_tarball(base_name, 'dist', compress='compress')
168        finally:
169            os.chdir(old_dir)
170        tarball = base_name + '.tar.Z'
171        self.assertTrue(os.path.exists(tarball))
172        self.assertEqual(len(w.warnings), 1)
173
174        # same test with dry_run
175        os.remove(tarball)
176        old_dir = os.getcwd()
177        os.chdir(tmpdir)
178        try:
179            with check_warnings() as w:
180                warnings.simplefilter("always")
181                make_tarball(base_name, 'dist', compress='compress',
182                             dry_run=True)
183        finally:
184            os.chdir(old_dir)
185        self.assertTrue(not os.path.exists(tarball))
186        self.assertEqual(len(w.warnings), 1)
187
188    @unittest.skipUnless(zlib, "Requires zlib")
189    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
190    def test_make_zipfile(self):
191        # creating something to tar
192        tmpdir = self.mkdtemp()
193        self.write_file([tmpdir, 'file1'], 'xxx')
194        self.write_file([tmpdir, 'file2'], 'xxx')
195
196        tmpdir2 = self.mkdtemp()
197        base_name = os.path.join(tmpdir2, 'archive')
198        make_zipfile(base_name, tmpdir)
199
200        # check if the compressed tarball was created
201        tarball = base_name + '.zip'
202
203    def test_check_archive_formats(self):
204        self.assertEqual(check_archive_formats(['gztar', 'xxx', 'zip']),
205                         'xxx')
206        self.assertEqual(check_archive_formats(['gztar', 'zip']), None)
207
208    def test_make_archive(self):
209        tmpdir = self.mkdtemp()
210        base_name = os.path.join(tmpdir, 'archive')
211        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
212
213    @unittest.skipUnless(zlib, "Requires zlib")
214    def test_make_archive_owner_group(self):
215        # testing make_archive with owner and group, with various combinations
216        # this works even if there's not gid/uid support
217        if UID_GID_SUPPORT:
218            group = grp.getgrgid(0)[0]
219            owner = pwd.getpwuid(0)[0]
220        else:
221            group = owner = 'root'
222
223        base_dir, root_dir, base_name =  self._create_files()
224        base_name = os.path.join(self.mkdtemp() , 'archive')
225        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
226                           group=group)
227        self.assertTrue(os.path.exists(res))
228
229        res = make_archive(base_name, 'zip', root_dir, base_dir)
230        self.assertTrue(os.path.exists(res))
231
232        res = make_archive(base_name, 'tar', root_dir, base_dir,
233                           owner=owner, group=group)
234        self.assertTrue(os.path.exists(res))
235
236        res = make_archive(base_name, 'tar', root_dir, base_dir,
237                           owner='kjhkjhkjg', group='oihohoh')
238        self.assertTrue(os.path.exists(res))
239
240    @unittest.skipUnless(zlib, "Requires zlib")
241    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
242    def test_tarfile_root_owner(self):
243        tmpdir, tmpdir2, base_name =  self._create_files()
244        old_dir = os.getcwd()
245        os.chdir(tmpdir)
246        group = grp.getgrgid(0)[0]
247        owner = pwd.getpwuid(0)[0]
248        try:
249            archive_name = make_tarball(base_name, 'dist', compress=None,
250                                        owner=owner, group=group)
251        finally:
252            os.chdir(old_dir)
253
254        # check if the compressed tarball was created
255        self.assertTrue(os.path.exists(archive_name))
256
257        # now checks the rights
258        archive = tarfile.open(archive_name)
259        try:
260            for member in archive.getmembers():
261                self.assertEqual(member.uid, 0)
262                self.assertEqual(member.gid, 0)
263        finally:
264            archive.close()
265
266    def test_make_archive_cwd(self):
267        current_dir = os.getcwd()
268        def _breaks(*args, **kw):
269            raise RuntimeError()
270        ARCHIVE_FORMATS['xxx'] = (_breaks, [], 'xxx file')
271        try:
272            try:
273                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
274            except:
275                pass
276            self.assertEqual(os.getcwd(), current_dir)
277        finally:
278            del ARCHIVE_FORMATS['xxx']
279
280def test_suite():
281    return unittest.makeSuite(ArchiveUtilTestCase)
282
283if __name__ == "__main__":
284    run_unittest(test_suite())
285