1"Test posix functions"
2
3from test import test_support
4
5# Skip these tests if there is no posix module.
6posix = test_support.import_module('posix')
7
8import errno
9import sys
10import time
11import os
12import platform
13import pwd
14import shutil
15import stat
16import sys
17import tempfile
18import unittest
19import warnings
20
# Extra path in the temp directory used by the symlink tests in this file:
# test_lchown passes it as the target of a symlink created at TESTFN, and
# test_lchflags_symlink creates a symlink at this path pointing to TESTFN.
_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
                              test_support.TESTFN + '-dummy-symlink')

# Module-wide: ignore RuntimeWarnings whose message mentions a "potential
# security risk" (presumably the ones raised by tmpnam/tempnam/tmpfile --
# confirm against the posix module).
warnings.filterwarnings('ignore', '.* potential security risk .*',
                        RuntimeWarning)
26
27class PosixTester(unittest.TestCase):
28
29    def setUp(self):
30        # create empty file
31        fp = open(test_support.TESTFN, 'w+')
32        fp.close()
33        self.teardown_files = [ test_support.TESTFN ]
34
35    def tearDown(self):
36        for teardown_file in self.teardown_files:
37            os.unlink(teardown_file)
38
39    def testNoArgFunctions(self):
40        # test posix functions which take no arguments and have
41        # no side-effects which we need to cleanup (e.g., fork, wait, abort)
42        NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdu", "uname",
43                             "times", "getloadavg", "tmpnam",
44                             "getegid", "geteuid", "getgid", "getgroups",
45                             "getpid", "getpgrp", "getppid", "getuid",
46                           ]
47
48        with warnings.catch_warnings():
49            warnings.filterwarnings("ignore", "", DeprecationWarning)
50            for name in NO_ARG_FUNCTIONS:
51                posix_func = getattr(posix, name, None)
52                if posix_func is not None:
53                    posix_func()
54                    self.assertRaises(TypeError, posix_func, 1)
55
56    if hasattr(posix, 'getresuid'):
57        def test_getresuid(self):
58            user_ids = posix.getresuid()
59            self.assertEqual(len(user_ids), 3)
60            for val in user_ids:
61                self.assertGreaterEqual(val, 0)
62
63    if hasattr(posix, 'getresgid'):
64        def test_getresgid(self):
65            group_ids = posix.getresgid()
66            self.assertEqual(len(group_ids), 3)
67            for val in group_ids:
68                self.assertGreaterEqual(val, 0)
69
70    if hasattr(posix, 'setresuid'):
71        def test_setresuid(self):
72            current_user_ids = posix.getresuid()
73            self.assertIsNone(posix.setresuid(*current_user_ids))
74            # -1 means don't change that value.
75            self.assertIsNone(posix.setresuid(-1, -1, -1))
76
77        def test_setresuid_exception(self):
78            # Don't do this test if someone is silly enough to run us as root.
79            current_user_ids = posix.getresuid()
80            if 0 not in current_user_ids:
81                new_user_ids = (current_user_ids[0]+1, -1, -1)
82                self.assertRaises(OSError, posix.setresuid, *new_user_ids)
83
84    if hasattr(posix, 'setresgid'):
85        def test_setresgid(self):
86            current_group_ids = posix.getresgid()
87            self.assertIsNone(posix.setresgid(*current_group_ids))
88            # -1 means don't change that value.
89            self.assertIsNone(posix.setresgid(-1, -1, -1))
90
91        def test_setresgid_exception(self):
92            # Don't do this test if someone is silly enough to run us as root.
93            current_group_ids = posix.getresgid()
94            if 0 not in current_group_ids:
95                new_group_ids = (current_group_ids[0]+1, -1, -1)
96                self.assertRaises(OSError, posix.setresgid, *new_group_ids)
97
98    @unittest.skipUnless(hasattr(posix, 'initgroups'),
99                         "test needs os.initgroups()")
100    def test_initgroups(self):
101        # It takes a string and an integer; check that it raises a TypeError
102        # for other argument lists.
103        self.assertRaises(TypeError, posix.initgroups)
104        self.assertRaises(TypeError, posix.initgroups, None)
105        self.assertRaises(TypeError, posix.initgroups, 3, "foo")
106        self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
107
108        # If a non-privileged user invokes it, it should fail with OSError
109        # EPERM.
110        if os.getuid() != 0:
111            try:
112                name = pwd.getpwuid(posix.getuid()).pw_name
113            except KeyError:
114                # the current UID may not have a pwd entry
115                raise unittest.SkipTest("need a pwd entry")
116            try:
117                posix.initgroups(name, 13)
118            except OSError as e:
119                self.assertEqual(e.errno, errno.EPERM)
120            else:
121                self.fail("Expected OSError to be raised by initgroups")
122
123    def test_statvfs(self):
124        if hasattr(posix, 'statvfs'):
125            self.assertTrue(posix.statvfs(os.curdir))
126
127    def test_fstatvfs(self):
128        if hasattr(posix, 'fstatvfs'):
129            fp = open(test_support.TESTFN)
130            try:
131                self.assertTrue(posix.fstatvfs(fp.fileno()))
132            finally:
133                fp.close()
134
135    def test_ftruncate(self):
136        if hasattr(posix, 'ftruncate'):
137            fp = open(test_support.TESTFN, 'w+')
138            try:
139                # we need to have some data to truncate
140                fp.write('test')
141                fp.flush()
142                posix.ftruncate(fp.fileno(), 0)
143            finally:
144                fp.close()
145
146    def test_dup(self):
147        if hasattr(posix, 'dup'):
148            fp = open(test_support.TESTFN)
149            try:
150                fd = posix.dup(fp.fileno())
151                self.assertIsInstance(fd, int)
152                os.close(fd)
153            finally:
154                fp.close()
155
156    def test_confstr(self):
157        if hasattr(posix, 'confstr'):
158            self.assertRaises(ValueError, posix.confstr, "CS_garbage")
159            self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
160
161    def test_dup2(self):
162        if hasattr(posix, 'dup2'):
163            fp1 = open(test_support.TESTFN)
164            fp2 = open(test_support.TESTFN)
165            try:
166                posix.dup2(fp1.fileno(), fp2.fileno())
167            finally:
168                fp1.close()
169                fp2.close()
170
171    def fdopen_helper(self, *args):
172        fd = os.open(test_support.TESTFN, os.O_RDONLY)
173        fp2 = posix.fdopen(fd, *args)
174        fp2.close()
175
176    def test_fdopen(self):
177        if hasattr(posix, 'fdopen'):
178            self.fdopen_helper()
179            self.fdopen_helper('r')
180            self.fdopen_helper('r', 100)
181
182    def test_osexlock(self):
183        if hasattr(posix, "O_EXLOCK"):
184            fd = os.open(test_support.TESTFN,
185                         os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
186            self.assertRaises(OSError, os.open, test_support.TESTFN,
187                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
188            os.close(fd)
189
190            if hasattr(posix, "O_SHLOCK"):
191                fd = os.open(test_support.TESTFN,
192                             os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
193                self.assertRaises(OSError, os.open, test_support.TESTFN,
194                                  os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
195                os.close(fd)
196
197    def test_osshlock(self):
198        if hasattr(posix, "O_SHLOCK"):
199            fd1 = os.open(test_support.TESTFN,
200                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
201            fd2 = os.open(test_support.TESTFN,
202                          os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
203            os.close(fd2)
204            os.close(fd1)
205
206            if hasattr(posix, "O_EXLOCK"):
207                fd = os.open(test_support.TESTFN,
208                             os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
209                self.assertRaises(OSError, os.open, test_support.TESTFN,
210                                  os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
211                os.close(fd)
212
213    def test_fstat(self):
214        if hasattr(posix, 'fstat'):
215            fp = open(test_support.TESTFN)
216            try:
217                self.assertTrue(posix.fstat(fp.fileno()))
218            finally:
219                fp.close()
220
221    def test_stat(self):
222        if hasattr(posix, 'stat'):
223            self.assertTrue(posix.stat(test_support.TESTFN))
224
225    def _test_all_chown_common(self, chown_func, first_param, stat_func):
226        """Common code for chown, fchown and lchown tests."""
227        def check_stat(uid, gid):
228            if stat_func is not None:
229                stat = stat_func(first_param)
230                self.assertEqual(stat.st_uid, uid)
231                self.assertEqual(stat.st_gid, gid)
232        uid = os.getuid()
233        gid = os.getgid()
234        # test a successful chown call
235        chown_func(first_param, uid, gid)
236        check_stat(uid, gid)
237        chown_func(first_param, -1, gid)
238        check_stat(uid, gid)
239        chown_func(first_param, uid, -1)
240        check_stat(uid, gid)
241
242        if uid == 0:
243            # Try an amusingly large uid/gid to make sure we handle
244            # large unsigned values.  (chown lets you use any
245            # uid/gid you like, even if they aren't defined.)
246            #
247            # This problem keeps coming up:
248            #   http://bugs.python.org/issue1747858
249            #   http://bugs.python.org/issue4591
250            #   http://bugs.python.org/issue15301
251            # Hopefully the fix in 4591 fixes it for good!
252            #
253            # This part of the test only runs when run as root.
254            # Only scary people run their tests as root.
255
256            big_value = 2**31
257            chown_func(first_param, big_value, big_value)
258            check_stat(big_value, big_value)
259            chown_func(first_param, -1, -1)
260            check_stat(big_value, big_value)
261            chown_func(first_param, uid, gid)
262            check_stat(uid, gid)
263        elif platform.system() in ('HP-UX', 'SunOS'):
264            # HP-UX and Solaris can allow a non-root user to chown() to root
265            # (issue #5113)
266            raise unittest.SkipTest("Skipping because of non-standard chown() "
267                                    "behavior")
268        else:
269            # non-root cannot chown to root, raises OSError
270            self.assertRaises(OSError, chown_func, first_param, 0, 0)
271            check_stat(uid, gid)
272            self.assertRaises(OSError, chown_func, first_param, 0, -1)
273            check_stat(uid, gid)
274            if 0 not in os.getgroups():
275                self.assertRaises(OSError, chown_func, first_param, -1, 0)
276                check_stat(uid, gid)
277        # test illegal types
278        for t in str, float:
279            self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)
280            check_stat(uid, gid)
281            self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))
282            check_stat(uid, gid)
283
284    @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
285    def test_chown(self):
286        # raise an OSError if the file does not exist
287        os.unlink(test_support.TESTFN)
288        self.assertRaises(OSError, posix.chown, test_support.TESTFN, -1, -1)
289
290        # re-create the file
291        open(test_support.TESTFN, 'w').close()
292        self._test_all_chown_common(posix.chown, test_support.TESTFN,
293                                    getattr(posix, 'stat', None))
294
295    @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
296    def test_fchown(self):
297        os.unlink(test_support.TESTFN)
298
299        # re-create the file
300        test_file = open(test_support.TESTFN, 'w')
301        try:
302            fd = test_file.fileno()
303            self._test_all_chown_common(posix.fchown, fd,
304                                        getattr(posix, 'fstat', None))
305        finally:
306            test_file.close()
307
308    @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
309    def test_lchown(self):
310        os.unlink(test_support.TESTFN)
311        # create a symlink
312        os.symlink(_DUMMY_SYMLINK, test_support.TESTFN)
313        self._test_all_chown_common(posix.lchown, test_support.TESTFN,
314                                    getattr(posix, 'lstat', None))
315
316    def test_chdir(self):
317        if hasattr(posix, 'chdir'):
318            posix.chdir(os.curdir)
319            self.assertRaises(OSError, posix.chdir, test_support.TESTFN)
320
321    def test_lsdir(self):
322        if hasattr(posix, 'lsdir'):
323            self.assertIn(test_support.TESTFN, posix.lsdir(os.curdir))
324
325    def test_access(self):
326        if hasattr(posix, 'access'):
327            self.assertTrue(posix.access(test_support.TESTFN, os.R_OK))
328
329    def test_umask(self):
330        if hasattr(posix, 'umask'):
331            old_mask = posix.umask(0)
332            self.assertIsInstance(old_mask, int)
333            posix.umask(old_mask)
334
335    def test_strerror(self):
336        if hasattr(posix, 'strerror'):
337            self.assertTrue(posix.strerror(0))
338
339    def test_pipe(self):
340        if hasattr(posix, 'pipe'):
341            reader, writer = posix.pipe()
342            os.close(reader)
343            os.close(writer)
344
345    def test_tempnam(self):
346        if hasattr(posix, 'tempnam'):
347            with warnings.catch_warnings():
348                warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
349                self.assertTrue(posix.tempnam())
350                self.assertTrue(posix.tempnam(os.curdir))
351                self.assertTrue(posix.tempnam(os.curdir, 'blah'))
352
353    def test_tmpfile(self):
354        if hasattr(posix, 'tmpfile'):
355            with warnings.catch_warnings():
356                warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)
357                fp = posix.tmpfile()
358                fp.close()
359
360    def test_utime(self):
361        if hasattr(posix, 'utime'):
362            now = time.time()
363            posix.utime(test_support.TESTFN, None)
364            self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, None))
365            self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (now, None))
366            self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, now))
367            posix.utime(test_support.TESTFN, (int(now), int(now)))
368            posix.utime(test_support.TESTFN, (now, now))
369
370    def _test_chflags_regular_file(self, chflags_func, target_file):
371        st = os.stat(target_file)
372        self.assertTrue(hasattr(st, 'st_flags'))
373
374        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
375        try:
376            chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE)
377        except OSError as err:
378            if err.errno != errno.EOPNOTSUPP:
379                raise
380            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
381            self.skipTest(msg)
382
383        try:
384            new_st = os.stat(target_file)
385            self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
386            try:
387                fd = open(target_file, 'w+')
388            except IOError as e:
389                self.assertEqual(e.errno, errno.EPERM)
390        finally:
391            posix.chflags(target_file, st.st_flags)
392
393    @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
394    def test_chflags(self):
395        self._test_chflags_regular_file(posix.chflags, test_support.TESTFN)
396
397    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
398    def test_lchflags_regular_file(self):
399        self._test_chflags_regular_file(posix.lchflags, test_support.TESTFN)
400
401    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
402    def test_lchflags_symlink(self):
403        testfn_st = os.stat(test_support.TESTFN)
404
405        self.assertTrue(hasattr(testfn_st, 'st_flags'))
406
407        os.symlink(test_support.TESTFN, _DUMMY_SYMLINK)
408        self.teardown_files.append(_DUMMY_SYMLINK)
409        dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
410
411        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
412        try:
413            posix.lchflags(_DUMMY_SYMLINK,
414                           dummy_symlink_st.st_flags | stat.UF_IMMUTABLE)
415        except OSError as err:
416            if err.errno != errno.EOPNOTSUPP:
417                raise
418            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
419            self.skipTest(msg)
420
421        try:
422            new_testfn_st = os.stat(test_support.TESTFN)
423            new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
424
425            self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
426            self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
427                             new_dummy_symlink_st.st_flags)
428        finally:
429            posix.lchflags(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
430
431    def test_getcwd_long_pathnames(self):
432        if hasattr(posix, 'getcwd'):
433            dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
434            curdir = os.getcwd()
435            base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'
436
437            try:
438                os.mkdir(base_path)
439                os.chdir(base_path)
440            except:
441#               Just returning nothing instead of the SkipTest exception,
442#               because the test results in Error in that case.
443#               Is that ok?
444#                raise unittest.SkipTest, "cannot create directory for testing"
445                return
446
447            try:
448                def _create_and_do_getcwd(dirname, current_path_length = 0):
449                    try:
450                        os.mkdir(dirname)
451                    except:
452                        raise unittest.SkipTest, "mkdir cannot create directory sufficiently deep for getcwd test"
453
454                    os.chdir(dirname)
455                    try:
456                        os.getcwd()
457                        if current_path_length < 4099:
458                            _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
459                    except OSError as e:
460                        expected_errno = errno.ENAMETOOLONG
461                        # The following platforms have quirky getcwd()
462                        # behaviour -- see issue 9185 and 15765 for
463                        # more information.
464                        quirky_platform = (
465                            'sunos' in sys.platform or
466                            'netbsd' in sys.platform or
467                            'openbsd' in sys.platform
468                        )
469                        if quirky_platform:
470                            expected_errno = errno.ERANGE
471                        self.assertEqual(e.errno, expected_errno)
472                    finally:
473                        os.chdir('..')
474                        os.rmdir(dirname)
475
476                _create_and_do_getcwd(dirname)
477
478            finally:
479                os.chdir(curdir)
480                shutil.rmtree(base_path)
481
482    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
483    def test_getgroups(self):
484        with os.popen('id -G') as idg:
485            groups = idg.read().strip()
486            ret = idg.close()
487
488        if ret != None or not groups:
489            raise unittest.SkipTest("need working 'id -G'")
490
491        # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
492        if sys.platform == 'darwin':
493            import sysconfig
494            dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0'
495            if float(dt) < 10.6:
496                raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")
497
498        # 'id -G' and 'os.getgroups()' should return the same
499        # groups, ignoring order and duplicates.
500        # #10822 - it is implementation defined whether posix.getgroups()
501        # includes the effective gid so we include it anyway, since id -G does
502        self.assertEqual(
503                set([int(x) for x in groups.split()]),
504                set(posix.getgroups() + [posix.getegid()]))
505
class PosixGroupsTester(unittest.TestCase):
    # Tests that modify the supplementary group list of the process.  setUp
    # skips them unless running as uid 0; the original group list is saved
    # there and restored in tearDown.

    def setUp(self):
        # The checks are made in this order, so the first failing one
        # determines the skip reason.
        if posix.getuid() != 0:
            self.skipTest("not enough privileges")
        if not hasattr(posix, 'getgroups'):
            self.skipTest("need posix.getgroups")
        if sys.platform == 'darwin':
            self.skipTest("getgroups(2) is broken on OSX")
        self.saved_groups = posix.getgroups()

    def tearDown(self):
        # Prefer setgroups() to restore the saved list; otherwise fall back
        # to initgroups() with the first saved group.
        if hasattr(posix, 'setgroups'):
            posix.setgroups(self.saved_groups)
            return
        if hasattr(posix, 'initgroups'):
            login = pwd.getpwuid(posix.getuid()).pw_name
            posix.initgroups(login, self.saved_groups[0])

    @unittest.skipUnless(hasattr(posix, 'initgroups'),
                         "test needs posix.initgroups()")
    def test_initgroups(self):
        # find missing group: one past the largest saved gid cannot be in
        # the saved list
        missing_gid = max(self.saved_groups) + 1
        login = pwd.getpwuid(posix.getuid()).pw_name
        posix.initgroups(login, missing_gid)
        self.assertIn(missing_gid, posix.getgroups())

    @unittest.skipUnless(hasattr(posix, 'setgroups'),
                         "test needs posix.setgroups()")
    def test_setgroups(self):
        # getgroups() must report exactly the list handed to setgroups().
        for wanted in ([0], range(16)):
            posix.setgroups(wanted)
            self.assertListEqual(wanted, posix.getgroups())
540
541
def test_main():
    # Run both test classes of this module through test_support.
    test_classes = (PosixTester, PosixGroupsTester)
    test_support.run_unittest(*test_classes)
544
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
547