1# -*- coding: utf-8 -*-
2"""Tests for distutils.archive_util."""
3import unittest
4import os
5import sys
6import tarfile
7from os.path import splitdrive
8import warnings
9
10from distutils import archive_util
11from distutils.archive_util import (check_archive_formats, make_tarball,
12                                    make_zipfile, make_archive,
13                                    ARCHIVE_FORMATS)
14from distutils.spawn import find_executable, spawn
15from distutils.tests import support
16from test.support import check_warnings, run_unittest, patch, change_cwd
17
# Optional modules: each guard records whether the import succeeded so the
# tests below can be skipped when a capability is missing.

# grp and pwd are needed together for the owner/group tests.
try:
    import grp
    import pwd
except ImportError:
    UID_GID_SUPPORT = False
else:
    UID_GID_SUPPORT = True

# Without the zipfile module, fall back to whatever find_executable()
# reports for an external 'zip' command.
try:
    import zipfile
except ImportError:
    ZIP_SUPPORT = find_executable('zip')
else:
    ZIP_SUPPORT = True

try:
    import zlib
except ImportError:
    ZLIB_SUPPORT = False
else:
    ZLIB_SUPPORT = True

# For bz2 and lzma the module name itself doubles as the flag: it is bound
# to None when the import fails.
try:
    import bz2
except ImportError:
    bz2 = None

try:
    import lzma
except ImportError:
    lzma = None
46
def can_fs_encode(filename):
    """
    Tell whether *filename* can be stored on the file system.

    Returns True immediately when os.path.supports_unicode_filenames is
    set.  Otherwise the name is encoded with sys.getfilesystemencoding()
    and the result is False if that raises UnicodeEncodeError.
    """
    if not os.path.supports_unicode_filenames:
        fs_encoding = sys.getfilesystemencoding()
        try:
            filename.encode(fs_encoding)
        except UnicodeEncodeError:
            return False
    return True
58
59
class ArchiveUtilTestCase(support.TempdirManager,
                          support.LoggingSilencer,
                          unittest.TestCase):
    """Tests for the archive builders of distutils.archive_util.

    Most tests archive the small 'dist' tree produced by _create_files()
    and then inspect the resulting file.  Test methods are described with
    comments rather than docstrings so that unittest keeps reporting them
    by method name.
    """

    # Sorted member names expected in a tarball of the _create_files() tree.
    _created_files = ('dist', 'dist/file1', 'dist/file2',
                      'dist/sub', 'dist/sub/file3', 'dist/sub2')

    # -- helpers -----------------------------------------------------------

    def _create_files(self):
        """Create the 'dist' tree to archive and return its parent directory.

        The tree holds two files, a sub-directory with one file and an
        empty sub-directory, matching _created_files.
        """
        tmpdir = self.mkdtemp()
        dist = os.path.join(tmpdir, 'dist')
        os.mkdir(dist)
        self.write_file([dist, 'file1'], 'xxx')
        self.write_file([dist, 'file2'], 'xxx')
        os.mkdir(os.path.join(dist, 'sub'))
        self.write_file([dist, 'sub', 'file3'], 'xxx')
        os.mkdir(os.path.join(dist, 'sub2'))
        return tmpdir

    def _tarinfo(self, path):
        """Return the member names of the tar archive at *path*, sorted,
        as a tuple."""
        with tarfile.open(path) as tar:
            return tuple(sorted(tar.getnames()))

    def _make_tarball(self, tmpdir, target_name, suffix, **kwargs):
        """Tar up tmpdir/dist with make_tarball() and check the result.

        The archive is written to a fresh temporary directory under the
        name target_name + suffix; extra keyword arguments are passed on
        to make_tarball().  The current test is skipped when the source
        and target directories live on different drives.
        """
        tmpdir2 = self.mkdtemp()
        # The drive is stripped from the target path below, so the archive
        # can only land in tmpdir2 when both directories share a drive.
        # (unittest.skipUnless() merely builds a decorator; calling it as a
        # statement never skipped anything.)
        if splitdrive(tmpdir)[0] != splitdrive(tmpdir2)[0]:
            self.skipTest("source and target should be on same drive")

        base_name = os.path.join(tmpdir2, target_name)

        # working with relative paths to avoid tar warnings
        with change_cwd(tmpdir):
            make_tarball(splitdrive(base_name)[1], 'dist', **kwargs)

        # check if the compressed tarball was created
        tarball = base_name + suffix
        self.assertTrue(os.path.exists(tarball))
        self.assertEqual(self._tarinfo(tarball), self._created_files)

    def _check_make_archive(self, archive_format, expected_basename):
        """Run make_archive() on the 'dist' tree and check the tar output.

        *archive_format* is the format name given to make_archive();
        *expected_basename* is the file name the returned path must have.
        """
        root_dir = self._create_files()
        base_name = os.path.join(self.mkdtemp(), 'archive')
        res = make_archive(base_name, archive_format, root_dir, 'dist')
        self.assertTrue(os.path.exists(res))
        self.assertEqual(os.path.basename(res), expected_basename)
        self.assertEqual(self._tarinfo(res), self._created_files)

    # -- make_tarball ------------------------------------------------------

    @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
    def test_make_tarball(self, name='archive'):
        # *name* is a parameter so that the non-ASCII filename tests below
        # can reuse this body.
        tmpdir = self._create_files()
        # no compress argument: a .tar.gz is expected
        self._make_tarball(tmpdir, name, '.tar.gz')
        # trying an uncompressed one
        self._make_tarball(tmpdir, name, '.tar', compress=None)

    @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
    def test_make_tarball_gzip(self):
        tmpdir = self._create_files()
        self._make_tarball(tmpdir, 'archive', '.tar.gz', compress='gzip')

    @unittest.skipUnless(bz2, 'Need bz2 support to run')
    def test_make_tarball_bzip2(self):
        tmpdir = self._create_files()
        self._make_tarball(tmpdir, 'archive', '.tar.bz2', compress='bzip2')

    @unittest.skipUnless(lzma, 'Need lzma support to run')
    def test_make_tarball_xz(self):
        tmpdir = self._create_files()
        self._make_tarball(tmpdir, 'archive', '.tar.xz', compress='xz')

    @unittest.skipUnless(can_fs_encode('årchiv'),
        'File system cannot handle this filename')
    def test_make_tarball_latin1(self):
        """
        Mirror test_make_tarball, except filename contains latin characters.
        """
        self.test_make_tarball('årchiv') # note this isn't a real word

    @unittest.skipUnless(can_fs_encode('のアーカイブ'),
        'File system cannot handle this filename')
    def test_make_tarball_extended(self):
        """
        Mirror test_make_tarball, except filename contains extended
        characters outside the latin charset.
        """
        self.test_make_tarball('のアーカイブ') # japanese for archive

    @unittest.skipUnless(find_executable('tar') and find_executable('gzip')
                         and ZLIB_SUPPORT,
                         'Need the tar, gzip and zlib command to run')
    def test_tarfile_vs_tar(self):
        # make_tarball() and the external tar/gzip commands must produce
        # archives with the same member names.
        tmpdir = self._create_files()
        tmpdir2 = self.mkdtemp()
        base_name = os.path.join(tmpdir2, 'archive')
        with change_cwd(tmpdir):
            make_tarball(base_name, 'dist')

        # check if the compressed tarball was created
        tarball = base_name + '.tar.gz'
        self.assertTrue(os.path.exists(tarball))

        # now create another tarball using `tar`
        tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
        tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
        gzip_cmd = ['gzip', '-f9', 'archive2.tar']
        with change_cwd(tmpdir):
            spawn(tar_cmd)
            spawn(gzip_cmd)

        self.assertTrue(os.path.exists(tarball2))
        # let's compare both tarballs
        self.assertEqual(self._tarinfo(tarball), self._created_files)
        self.assertEqual(self._tarinfo(tarball2), self._created_files)

        # trying an uncompressed one
        with change_cwd(tmpdir):
            make_tarball(base_name, 'dist', compress=None)
        tarball = base_name + '.tar'
        self.assertTrue(os.path.exists(tarball))

        # now for a dry_run; the tarball written by the previous step is
        # still in place, so this only checks that it has not gone away
        with change_cwd(tmpdir):
            make_tarball(base_name, 'dist', compress=None, dry_run=True)
        self.assertTrue(os.path.exists(tarball))

    @unittest.skipUnless(find_executable('compress'),
                         'The compress program is required')
    def test_compress_deprecated(self):
        tmpdir = self._create_files()
        base_name = os.path.join(self.mkdtemp(), 'archive')

        # using compress and testing the PendingDeprecationWarning
        with change_cwd(tmpdir):
            with check_warnings() as w:
                warnings.simplefilter("always")
                make_tarball(base_name, 'dist', compress='compress')
        tarball = base_name + '.tar.Z'
        self.assertTrue(os.path.exists(tarball))
        self.assertEqual(len(w.warnings), 1)

        # same test with dry_run
        os.remove(tarball)
        with change_cwd(tmpdir):
            with check_warnings() as w:
                warnings.simplefilter("always")
                make_tarball(base_name, 'dist', compress='compress',
                             dry_run=True)
        self.assertFalse(os.path.exists(tarball))
        self.assertEqual(len(w.warnings), 1)

    @unittest.skipUnless(ZLIB_SUPPORT, "Requires zlib")
    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    def test_tarfile_root_owner(self):
        tmpdir = self._create_files()
        base_name = os.path.join(self.mkdtemp(), 'archive')
        # Look the names up before changing directory, so a failing lookup
        # cannot leave the process in tmpdir.
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
        with change_cwd(tmpdir):
            archive_name = make_tarball(base_name, 'dist', compress=None,
                                        owner=owner, group=group)

        # check if the tarball was created
        self.assertTrue(os.path.exists(archive_name))

        # now check the ownership: every member must belong to uid/gid 0
        with tarfile.open(archive_name) as archive:
            for member in archive.getmembers():
                self.assertEqual(member.uid, 0)
                self.assertEqual(member.gid, 0)

    # -- make_zipfile ------------------------------------------------------

    @unittest.skipUnless(ZIP_SUPPORT and ZLIB_SUPPORT,
                         'Need zip and zlib support to run')
    def test_make_zipfile(self):
        # creating something to zip
        tmpdir = self._create_files()
        base_name = os.path.join(self.mkdtemp(), 'archive')
        with change_cwd(tmpdir):
            make_zipfile(base_name, 'dist')

        # check if the zip archive was created
        zip_path = base_name + '.zip'
        self.assertTrue(os.path.exists(zip_path))
        with zipfile.ZipFile(zip_path) as zf:
            self.assertEqual(sorted(zf.namelist()),
                             ['dist/file1', 'dist/file2', 'dist/sub/file3'])

    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
    def test_make_zipfile_no_zlib(self):
        patch(self, archive_util.zipfile, 'zlib', None)  # force zlib ImportError

        # Record every ZipFile() call that asks for ZIP_STORED, while still
        # delegating to the real class.
        called = []
        zipfile_class = zipfile.ZipFile
        def fake_zipfile(*a, **kw):
            if kw.get('compression') == zipfile.ZIP_STORED:
                called.append((a, kw))
            return zipfile_class(*a, **kw)

        patch(self, archive_util.zipfile, 'ZipFile', fake_zipfile)

        # create something to zip
        tmpdir = self._create_files()
        base_name = os.path.join(self.mkdtemp(), 'archive')
        with change_cwd(tmpdir):
            make_zipfile(base_name, 'dist')

        zip_path = base_name + '.zip'
        self.assertEqual(called,
                         [((zip_path, "w"), {'compression': zipfile.ZIP_STORED})])
        self.assertTrue(os.path.exists(zip_path))
        with zipfile.ZipFile(zip_path) as zf:
            self.assertEqual(sorted(zf.namelist()),
                             ['dist/file1', 'dist/file2', 'dist/sub/file3'])

    # -- check_archive_formats / make_archive ------------------------------

    def test_check_archive_formats(self):
        # the first unknown format is returned, None when all are known
        self.assertEqual(check_archive_formats(['gztar', 'xxx', 'zip']),
                         'xxx')
        self.assertIsNone(check_archive_formats(['gztar', 'bztar', 'xztar',
                                                 'ztar', 'tar', 'zip']))

    def test_make_archive(self):
        # an unknown format name must be rejected
        tmpdir = self.mkdtemp()
        base_name = os.path.join(tmpdir, 'archive')
        self.assertRaises(ValueError, make_archive, base_name, 'xxx')

    def test_make_archive_cwd(self):
        # the working directory must be unchanged after a failing archiver
        current_dir = os.getcwd()
        def _breaks(*args, **kw):
            raise RuntimeError()
        ARCHIVE_FORMATS['xxx'] = (_breaks, [], 'xxx file')
        try:
            try:
                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
            except RuntimeError:
                # Only the failure injected by _breaks() is expected here;
                # anything else should surface as a test error.
                pass
            self.assertEqual(os.getcwd(), current_dir)
        finally:
            del ARCHIVE_FORMATS['xxx']

    def test_make_archive_tar(self):
        self._check_make_archive('tar', 'archive.tar')

    @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
    def test_make_archive_gztar(self):
        self._check_make_archive('gztar', 'archive.tar.gz')

    @unittest.skipUnless(bz2, 'Need bz2 support to run')
    def test_make_archive_bztar(self):
        self._check_make_archive('bztar', 'archive.tar.bz2')

    @unittest.skipUnless(lzma, 'Need xz support to run')
    def test_make_archive_xztar(self):
        self._check_make_archive('xztar', 'archive.tar.xz')

    def test_make_archive_owner_group(self):
        # testing make_archive with owner and group, with various combinations
        # this works even if there's not gid/uid support
        if UID_GID_SUPPORT:
            group = grp.getgrgid(0)[0]
            owner = pwd.getpwuid(0)[0]
        else:
            group = owner = 'root'

        base_dir = self._create_files()
        root_dir = self.mkdtemp()
        base_name = os.path.join(self.mkdtemp(), 'archive')
        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
                           group=group)
        self.assertTrue(os.path.exists(res))

        res = make_archive(base_name, 'zip', root_dir, base_dir)
        self.assertTrue(os.path.exists(res))

        res = make_archive(base_name, 'tar', root_dir, base_dir,
                           owner=owner, group=group)
        self.assertTrue(os.path.exists(res))

        # owner/group names that are not expected to exist on the system
        res = make_archive(base_name, 'tar', root_dir, base_dir,
                           owner='kjhkjhkjg', group='oihohoh')
        self.assertTrue(os.path.exists(res))
390
391def test_suite():
392    return unittest.makeSuite(ArchiveUtilTestCase)
393
if __name__ == "__main__":
    # Executed as a script: build the suite and hand it to run_unittest().
    suite = test_suite()
    run_unittest(suite)
396