1"Test posix functions" 2 3from test import test_support 4 5# Skip these tests if there is no posix module. 6posix = test_support.import_module('posix') 7 8import errno 9import sys 10import time 11import os 12import platform 13import pwd 14import shutil 15import stat 16import sys 17import tempfile 18import unittest 19import warnings 20 21_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), 22 test_support.TESTFN + '-dummy-symlink') 23 24warnings.filterwarnings('ignore', '.* potential security risk .*', 25 RuntimeWarning) 26 27class PosixTester(unittest.TestCase): 28 29 def setUp(self): 30 # create empty file 31 fp = open(test_support.TESTFN, 'w+') 32 fp.close() 33 self.teardown_files = [ test_support.TESTFN ] 34 35 def tearDown(self): 36 for teardown_file in self.teardown_files: 37 os.unlink(teardown_file) 38 39 def testNoArgFunctions(self): 40 # test posix functions which take no arguments and have 41 # no side-effects which we need to cleanup (e.g., fork, wait, abort) 42 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdu", "uname", 43 "times", "getloadavg", "tmpnam", 44 "getegid", "geteuid", "getgid", "getgroups", 45 "getpid", "getpgrp", "getppid", "getuid", 46 ] 47 48 with warnings.catch_warnings(): 49 warnings.filterwarnings("ignore", "", DeprecationWarning) 50 for name in NO_ARG_FUNCTIONS: 51 posix_func = getattr(posix, name, None) 52 if posix_func is not None: 53 posix_func() 54 self.assertRaises(TypeError, posix_func, 1) 55 56 if hasattr(posix, 'getresuid'): 57 def test_getresuid(self): 58 user_ids = posix.getresuid() 59 self.assertEqual(len(user_ids), 3) 60 for val in user_ids: 61 self.assertGreaterEqual(val, 0) 62 63 if hasattr(posix, 'getresgid'): 64 def test_getresgid(self): 65 group_ids = posix.getresgid() 66 self.assertEqual(len(group_ids), 3) 67 for val in group_ids: 68 self.assertGreaterEqual(val, 0) 69 70 if hasattr(posix, 'setresuid'): 71 def test_setresuid(self): 72 current_user_ids = posix.getresuid() 73 self.assertIsNone(posix.setresuid(*current_user_ids)) 74 # -1 means don't change that value. 75 self.assertIsNone(posix.setresuid(-1, -1, -1)) 76 77 def test_setresuid_exception(self): 78 # Don't do this test if someone is silly enough to run us as root. 79 current_user_ids = posix.getresuid() 80 if 0 not in current_user_ids: 81 new_user_ids = (current_user_ids[0]+1, -1, -1) 82 self.assertRaises(OSError, posix.setresuid, *new_user_ids) 83 84 if hasattr(posix, 'setresgid'): 85 def test_setresgid(self): 86 current_group_ids = posix.getresgid() 87 self.assertIsNone(posix.setresgid(*current_group_ids)) 88 # -1 means don't change that value. 89 self.assertIsNone(posix.setresgid(-1, -1, -1)) 90 91 def test_setresgid_exception(self): 92 # Don't do this test if someone is silly enough to run us as root. 93 current_group_ids = posix.getresgid() 94 if 0 not in current_group_ids: 95 new_group_ids = (current_group_ids[0]+1, -1, -1) 96 self.assertRaises(OSError, posix.setresgid, *new_group_ids) 97 98 @unittest.skipUnless(hasattr(posix, 'initgroups'), 99 "test needs os.initgroups()") 100 def test_initgroups(self): 101 # It takes a string and an integer; check that it raises a TypeError 102 # for other argument lists. 103 self.assertRaises(TypeError, posix.initgroups) 104 self.assertRaises(TypeError, posix.initgroups, None) 105 self.assertRaises(TypeError, posix.initgroups, 3, "foo") 106 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) 107 108 # If a non-privileged user invokes it, it should fail with OSError 109 # EPERM. 110 if os.getuid() != 0: 111 try: 112 name = pwd.getpwuid(posix.getuid()).pw_name 113 except KeyError: 114 # the current UID may not have a pwd entry 115 raise unittest.SkipTest("need a pwd entry") 116 try: 117 posix.initgroups(name, 13) 118 except OSError as e: 119 self.assertEqual(e.errno, errno.EPERM) 120 else: 121 self.fail("Expected OSError to be raised by initgroups") 122 123 def test_statvfs(self): 124 if hasattr(posix, 'statvfs'): 125 self.assertTrue(posix.statvfs(os.curdir)) 126 127 def test_fstatvfs(self): 128 if hasattr(posix, 'fstatvfs'): 129 fp = open(test_support.TESTFN) 130 try: 131 self.assertTrue(posix.fstatvfs(fp.fileno())) 132 finally: 133 fp.close() 134 135 def test_ftruncate(self): 136 if hasattr(posix, 'ftruncate'): 137 fp = open(test_support.TESTFN, 'w+') 138 try: 139 # we need to have some data to truncate 140 fp.write('test') 141 fp.flush() 142 posix.ftruncate(fp.fileno(), 0) 143 finally: 144 fp.close() 145 146 def test_dup(self): 147 if hasattr(posix, 'dup'): 148 fp = open(test_support.TESTFN) 149 try: 150 fd = posix.dup(fp.fileno()) 151 self.assertIsInstance(fd, int) 152 os.close(fd) 153 finally: 154 fp.close() 155 156 def test_confstr(self): 157 if hasattr(posix, 'confstr'): 158 self.assertRaises(ValueError, posix.confstr, "CS_garbage") 159 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) 160 161 def test_dup2(self): 162 if hasattr(posix, 'dup2'): 163 fp1 = open(test_support.TESTFN) 164 fp2 = open(test_support.TESTFN) 165 try: 166 posix.dup2(fp1.fileno(), fp2.fileno()) 167 finally: 168 fp1.close() 169 fp2.close() 170 171 def fdopen_helper(self, *args): 172 fd = os.open(test_support.TESTFN, os.O_RDONLY) 173 fp2 = posix.fdopen(fd, *args) 174 fp2.close() 175 176 def test_fdopen(self): 177 if hasattr(posix, 'fdopen'): 178 self.fdopen_helper() 179 self.fdopen_helper('r') 180 self.fdopen_helper('r', 100) 181 182 def test_osexlock(self): 183 if hasattr(posix, "O_EXLOCK"): 184 fd = os.open(test_support.TESTFN, 185 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) 186 self.assertRaises(OSError, os.open, test_support.TESTFN, 187 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 188 os.close(fd) 189 190 if hasattr(posix, "O_SHLOCK"): 191 fd = os.open(test_support.TESTFN, 192 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 193 self.assertRaises(OSError, os.open, test_support.TESTFN, 194 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 195 os.close(fd) 196 197 def test_osshlock(self): 198 if hasattr(posix, "O_SHLOCK"): 199 fd1 = os.open(test_support.TESTFN, 200 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 201 fd2 = os.open(test_support.TESTFN, 202 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 203 os.close(fd2) 204 os.close(fd1) 205 206 if hasattr(posix, "O_EXLOCK"): 207 fd = os.open(test_support.TESTFN, 208 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 209 self.assertRaises(OSError, os.open, test_support.TESTFN, 210 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) 211 os.close(fd) 212 213 def test_fstat(self): 214 if hasattr(posix, 'fstat'): 215 fp = open(test_support.TESTFN) 216 try: 217 self.assertTrue(posix.fstat(fp.fileno())) 218 finally: 219 fp.close() 220 221 def test_stat(self): 222 if hasattr(posix, 'stat'): 223 self.assertTrue(posix.stat(test_support.TESTFN)) 224 225 def _test_all_chown_common(self, chown_func, first_param, stat_func): 226 """Common code for chown, fchown and lchown tests.""" 227 def check_stat(uid, gid): 228 if stat_func is not None: 229 stat = stat_func(first_param) 230 self.assertEqual(stat.st_uid, uid) 231 self.assertEqual(stat.st_gid, gid) 232 uid = os.getuid() 233 gid = os.getgid() 234 # test a successful chown call 235 chown_func(first_param, uid, gid) 236 check_stat(uid, gid) 237 chown_func(first_param, -1, gid) 238 check_stat(uid, gid) 239 chown_func(first_param, uid, -1) 240 check_stat(uid, gid) 241 242 if uid == 0: 243 # Try an amusingly large uid/gid to make sure we handle 244 # large unsigned values. (chown lets you use any 245 # uid/gid you like, even if they aren't defined.) 246 # 247 # This problem keeps coming up: 248 # http://bugs.python.org/issue1747858 249 # http://bugs.python.org/issue4591 250 # http://bugs.python.org/issue15301 251 # Hopefully the fix in 4591 fixes it for good! 252 # 253 # This part of the test only runs when run as root. 254 # Only scary people run their tests as root. 255 256 big_value = 2**31 257 chown_func(first_param, big_value, big_value) 258 check_stat(big_value, big_value) 259 chown_func(first_param, -1, -1) 260 check_stat(big_value, big_value) 261 chown_func(first_param, uid, gid) 262 check_stat(uid, gid) 263 elif platform.system() in ('HP-UX', 'SunOS'): 264 # HP-UX and Solaris can allow a non-root user to chown() to root 265 # (issue #5113) 266 raise unittest.SkipTest("Skipping because of non-standard chown() " 267 "behavior") 268 else: 269 # non-root cannot chown to root, raises OSError 270 self.assertRaises(OSError, chown_func, first_param, 0, 0) 271 check_stat(uid, gid) 272 self.assertRaises(OSError, chown_func, first_param, 0, -1) 273 check_stat(uid, gid) 274 if 0 not in os.getgroups(): 275 self.assertRaises(OSError, chown_func, first_param, -1, 0) 276 check_stat(uid, gid) 277 # test illegal types 278 for t in str, float: 279 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid) 280 check_stat(uid, gid) 281 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid)) 282 check_stat(uid, gid) 283 284 @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()") 285 def test_chown(self): 286 # raise an OSError if the file does not exist 287 os.unlink(test_support.TESTFN) 288 self.assertRaises(OSError, posix.chown, test_support.TESTFN, -1, -1) 289 290 # re-create the file 291 open(test_support.TESTFN, 'w').close() 292 self._test_all_chown_common(posix.chown, test_support.TESTFN, 293 getattr(posix, 'stat', None)) 294 295 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") 296 def test_fchown(self): 297 os.unlink(test_support.TESTFN) 298 299 # re-create the file 300 test_file = open(test_support.TESTFN, 'w') 301 try: 302 fd = test_file.fileno() 303 self._test_all_chown_common(posix.fchown, fd, 304 getattr(posix, 'fstat', None)) 305 finally: 306 test_file.close() 307 308 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") 309 def test_lchown(self): 310 os.unlink(test_support.TESTFN) 311 # create a symlink 312 os.symlink(_DUMMY_SYMLINK, test_support.TESTFN) 313 self._test_all_chown_common(posix.lchown, test_support.TESTFN, 314 getattr(posix, 'lstat', None)) 315 316 def test_chdir(self): 317 if hasattr(posix, 'chdir'): 318 posix.chdir(os.curdir) 319 self.assertRaises(OSError, posix.chdir, test_support.TESTFN) 320 321 def test_lsdir(self): 322 if hasattr(posix, 'lsdir'): 323 self.assertIn(test_support.TESTFN, posix.lsdir(os.curdir)) 324 325 def test_access(self): 326 if hasattr(posix, 'access'): 327 self.assertTrue(posix.access(test_support.TESTFN, os.R_OK)) 328 329 def test_umask(self): 330 if hasattr(posix, 'umask'): 331 old_mask = posix.umask(0) 332 self.assertIsInstance(old_mask, int) 333 posix.umask(old_mask) 334 335 def test_strerror(self): 336 if hasattr(posix, 'strerror'): 337 self.assertTrue(posix.strerror(0)) 338 339 def test_pipe(self): 340 if hasattr(posix, 'pipe'): 341 reader, writer = posix.pipe() 342 os.close(reader) 343 os.close(writer) 344 345 def test_tempnam(self): 346 if hasattr(posix, 'tempnam'): 347 with warnings.catch_warnings(): 348 warnings.filterwarnings("ignore", "tempnam", DeprecationWarning) 349 self.assertTrue(posix.tempnam()) 350 self.assertTrue(posix.tempnam(os.curdir)) 351 self.assertTrue(posix.tempnam(os.curdir, 'blah')) 352 353 def test_tmpfile(self): 354 if hasattr(posix, 'tmpfile'): 355 with warnings.catch_warnings(): 356 warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning) 357 fp = posix.tmpfile() 358 fp.close() 359 360 def test_utime(self): 361 if hasattr(posix, 'utime'): 362 now = time.time() 363 posix.utime(test_support.TESTFN, None) 364 self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, None)) 365 self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (now, None)) 366 self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, now)) 367 posix.utime(test_support.TESTFN, (int(now), int(now))) 368 posix.utime(test_support.TESTFN, (now, now)) 369 370 def _test_chflags_regular_file(self, chflags_func, target_file): 371 st = os.stat(target_file) 372 self.assertTrue(hasattr(st, 'st_flags')) 373 374 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 375 try: 376 chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE) 377 except OSError as err: 378 if err.errno != errno.EOPNOTSUPP: 379 raise 380 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 381 self.skipTest(msg) 382 383 try: 384 new_st = os.stat(target_file) 385 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) 386 try: 387 fd = open(target_file, 'w+') 388 except IOError as e: 389 self.assertEqual(e.errno, errno.EPERM) 390 finally: 391 posix.chflags(target_file, st.st_flags) 392 393 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()') 394 def test_chflags(self): 395 self._test_chflags_regular_file(posix.chflags, test_support.TESTFN) 396 397 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 398 def test_lchflags_regular_file(self): 399 self._test_chflags_regular_file(posix.lchflags, test_support.TESTFN) 400 401 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 402 def test_lchflags_symlink(self): 403 testfn_st = os.stat(test_support.TESTFN) 404 405 self.assertTrue(hasattr(testfn_st, 'st_flags')) 406 407 os.symlink(test_support.TESTFN, _DUMMY_SYMLINK) 408 self.teardown_files.append(_DUMMY_SYMLINK) 409 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 410 411 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 412 try: 413 posix.lchflags(_DUMMY_SYMLINK, 414 dummy_symlink_st.st_flags | stat.UF_IMMUTABLE) 415 except OSError as err: 416 if err.errno != errno.EOPNOTSUPP: 417 raise 418 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 419 self.skipTest(msg) 420 421 try: 422 new_testfn_st = os.stat(test_support.TESTFN) 423 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 424 425 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) 426 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, 427 new_dummy_symlink_st.st_flags) 428 finally: 429 posix.lchflags(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) 430 431 def test_getcwd_long_pathnames(self): 432 if hasattr(posix, 'getcwd'): 433 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' 434 curdir = os.getcwd() 435 base_path = os.path.abspath(test_support.TESTFN) + '.getcwd' 436 437 try: 438 os.mkdir(base_path) 439 os.chdir(base_path) 440 except: 441# Just returning nothing instead of the SkipTest exception, 442# because the test results in Error in that case. 443# Is that ok? 444# raise unittest.SkipTest, "cannot create directory for testing" 445 return 446 447 try: 448 def _create_and_do_getcwd(dirname, current_path_length = 0): 449 try: 450 os.mkdir(dirname) 451 except: 452 raise unittest.SkipTest, "mkdir cannot create directory sufficiently deep for getcwd test" 453 454 os.chdir(dirname) 455 try: 456 os.getcwd() 457 if current_path_length < 4099: 458 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) 459 except OSError as e: 460 expected_errno = errno.ENAMETOOLONG 461 # The following platforms have quirky getcwd() 462 # behaviour -- see issue 9185 and 15765 for 463 # more information. 464 quirky_platform = ( 465 'sunos' in sys.platform or 466 'netbsd' in sys.platform or 467 'openbsd' in sys.platform 468 ) 469 if quirky_platform: 470 expected_errno = errno.ERANGE 471 self.assertEqual(e.errno, expected_errno) 472 finally: 473 os.chdir('..') 474 os.rmdir(dirname) 475 476 _create_and_do_getcwd(dirname) 477 478 finally: 479 os.chdir(curdir) 480 shutil.rmtree(base_path) 481 482 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") 483 def test_getgroups(self): 484 with os.popen('id -G') as idg: 485 groups = idg.read().strip() 486 ret = idg.close() 487 488 if ret != None or not groups: 489 raise unittest.SkipTest("need working 'id -G'") 490 491 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups() 492 if sys.platform == 'darwin': 493 import sysconfig 494 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0' 495 if float(dt) < 10.6: 496 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6") 497 498 # 'id -G' and 'os.getgroups()' should return the same 499 # groups, ignoring order and duplicates. 500 # #10822 - it is implementation defined whether posix.getgroups() 501 # includes the effective gid so we include it anyway, since id -G does 502 self.assertEqual( 503 set([int(x) for x in groups.split()]), 504 set(posix.getgroups() + [posix.getegid()])) 505 506class PosixGroupsTester(unittest.TestCase): 507 508 def setUp(self): 509 if posix.getuid() != 0: 510 raise unittest.SkipTest("not enough privileges") 511 if not hasattr(posix, 'getgroups'): 512 raise unittest.SkipTest("need posix.getgroups") 513 if sys.platform == 'darwin': 514 raise unittest.SkipTest("getgroups(2) is broken on OSX") 515 self.saved_groups = posix.getgroups() 516 517 def tearDown(self): 518 if hasattr(posix, 'setgroups'): 519 posix.setgroups(self.saved_groups) 520 elif hasattr(posix, 'initgroups'): 521 name = pwd.getpwuid(posix.getuid()).pw_name 522 posix.initgroups(name, self.saved_groups[0]) 523 524 @unittest.skipUnless(hasattr(posix, 'initgroups'), 525 "test needs posix.initgroups()") 526 def test_initgroups(self): 527 # find missing group 528 529 g = max(self.saved_groups) + 1 530 name = pwd.getpwuid(posix.getuid()).pw_name 531 posix.initgroups(name, g) 532 self.assertIn(g, posix.getgroups()) 533 534 @unittest.skipUnless(hasattr(posix, 'setgroups'), 535 "test needs posix.setgroups()") 536 def test_setgroups(self): 537 for groups in [[0], range(16)]: 538 posix.setgroups(groups) 539 self.assertListEqual(groups, posix.getgroups()) 540 541 542def test_main(): 543 test_support.run_unittest(PosixTester, PosixGroupsTester) 544 545if __name__ == '__main__': 546 test_main() 547