# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.

import os
import errno
import unittest
import warnings
import sys
import signal
import subprocess
import time

from test import test_support
import mmap
import uuid

warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)
warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__)

# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests that create test_support.TESTFN as a plain file."""

    def setUp(self):
        # Start (and, via the tearDown alias below, finish) every test
        # without a leftover TESTFN file.
        if os.path.exists(test_support.TESTFN):
            os.unlink(test_support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A freshly created file must be reported as writable.
        f = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(test_support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            # Only 'first' is left open, so the range closed below covers
            # one open descriptor and one that is already closed.
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, "a")

    @test_support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must leave the reference count of
        # its path argument unchanged.
        path = unicode(test_support.TESTFN)
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)


class TemporaryFileTests(unittest.TestCase):
    """Tests for os.tempnam(), os.tmpfile() and os.tmpnam().

    setUp creates test_support.TESTFN as a directory; tearDown removes
    every file recorded in self.files and then the directory itself.
    """

    def setUp(self):
        self.files = []
        os.mkdir(test_support.TESTFN)

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
        os.rmdir(test_support.TESTFN)

    def check_tempfile(self, name):
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                         "file already exists for temporary file")
        # make sure we can create the file; close it straight away so the
        # handle is not leaked and tearDown can unlink the file.
        open(name, "w").close()
        self.files.append(name)

    def test_tempnam(self):
        if not hasattr(os, "tempnam"):
            self.skipTest("test needs os.tempnam()")
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                    r"test_os$")
            warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
            self.check_tempfile(os.tempnam())

            name = os.tempnam(test_support.TESTFN)
            self.check_tempfile(name)

            name = os.tempnam(test_support.TESTFN, "pfx")
            self.assertEqual(os.path.basename(name)[:3], "pfx")
            self.check_tempfile(name)

    def test_tmpfile(self):
        if not hasattr(os, "tmpfile"):
            self.skipTest("test needs os.tmpfile()")
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)

            if sys.platform == 'win32':
                name = '\\python_test_os_test_tmpfile.txt'
                if os.path.exists(name):
                    os.remove(name)
                try:
                    fp = open(name, 'w')
                except IOError as first:
                    # open() failed, assert tmpfile() fails in the same way.
                    # Although open() raises an IOError and os.tmpfile() raises an
                    # OSError(), 'args' will be (13, 'Permission denied') in both
                    # cases.
                    try:
                        fp = os.tmpfile()
                    except OSError as second:
                        self.assertEqual(first.args, second.args)
                    else:
                        self.fail("expected os.tmpfile() to raise OSError")
                    # The failure mode has been verified; nothing more to do.
                    return
                else:
                    # open() worked, therefore, tmpfile() should work.  Close our
                    # dummy file and proceed with the test as normal.
                    fp.close()
                    os.remove(name)

            fp = os.tmpfile()
            fp.write("foobar")
            fp.seek(0, 0)
            s = fp.read()
            fp.close()
            self.assertEqual(s, "foobar")

    def test_tmpnam(self):
        if not hasattr(os, "tmpnam"):
            self.skipTest("test needs os.tmpnam()")
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                    r"test_os$")
            warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning)

            name = os.tmpnam()
            if sys.platform in ("win32",):
                # The Windows tmpnam() seems useless.  From the MS docs:
                #
                #     The character string that tmpnam creates consists of
                #     the path prefix, defined by the entry P_tmpdir in the
                #     file STDIO.H, followed by a sequence consisting of the
                #     digit characters '0' through '9'; the numerical value
                #     of this string is in the range 1 - 65,535.  Changing the
                #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
                #     change the operation of tmpnam.
                #
                # The really bizarre part is that, at least under MSVC6,
                # P_tmpdir is "\\".  That is, the path returned refers to
                # the root of the current drive.  That's a terrible place to
                # put temp files, and, depending on privileges, the user
                # may not even be able to open a file in the root directory.
                self.assertFalse(os.path.exists(name),
                                 "file already exists for temporary file")
            else:
                self.check_tempfile(name)

# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the result objects of os.stat() and os.statvfs().

    setUp creates the directory test_support.TESTFN holding a three-byte
    file "f1"; tearDown removes both.
    """

    def setUp(self):
        os.mkdir(test_support.TESTFN)
        self.fname = os.path.join(test_support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write("ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(test_support.TESTFN)

    def test_stat_attributes(self):
        if not hasattr(os, "stat"):
            self.skipTest("test needs os.stat()")

        import stat
        result = os.stat(self.fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # Timestamps are compared after truncating the
                    # attribute value to an int.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Indexing past the end must fail.
        self.assertRaises(IndexError, result.__getitem__, 200)

        # Make sure that assignment fails
        self.assertRaises((AttributeError, TypeError),
                          setattr, result, 'st_mode', 1)
        self.assertRaises((AttributeError, TypeError),
                          setattr, result, 'st_rdev', 1)
        self.assertRaises(AttributeError, setattr, result, 'parrot', 1)

        # Use the stat_result constructor with a too-short tuple.
        self.assertRaises(TypeError, os.stat_result, (10,))

        # Use the constructor with a too-long tuple.  Either outcome is
        # accepted here: a TypeError is tolerated but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass


    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            self.skipTest("test needs os.statvfs()")

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() failed with ENOSYS")
            # Any other error is a genuine failure.  Swallowing it would
            # leave 'result' unbound and surface as a confusing NameError.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        self.assertRaises(TypeError, setattr, result, 'f_bfree', 1)
        self.assertRaises(AttributeError, setattr, result, 'parrot', 1)

        # Use the constructor with a too-short tuple.
        self.assertRaises(TypeError, os.statvfs_result, (10,))

        # Use the constructor with a too-long tuple.  Either outcome is
        # accepted here: a TypeError is tolerated but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(test_support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(test_support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Called while the class body executes (not as a method).
            # Returns the file system name reported for the drive that
            # holds *path*, or None when the API call reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_string_buffer("", 100)
            if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(test_support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                try:
                    #Note fail if time_t is 32 bit
                    os.utime(self.fname, (t1, t1))
                except OverflowError:
                    self.skipTest("requires at least 64-bit time_t")
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == errno.ENOENT:
                    # file does not exist; cannot run test
                    self.skipTest(r"c:\pagefile.sys does not exist")
                self.fail("Could not stat pagefile.sys")

from test import mapping_tests

class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None
    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
    def _empty_mapping(self):
        os.environ.clear()
        return os.environ
    def setUp(self):
        # Keep a copy of the real environment so tearDown can restore it.
        self.__save = dict(os.environ)
        os.environ.clear()
    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)

    # Bug 1110478
    def test_update2(self):
        # A value set through os.environ.update() must be visible to a
        # child shell.
        if os.path.exists("/bin/sh"):
            os.environ.update(HELLO="World")
            with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
                value = popen.read().strip()
                self.assertEqual(value, "World")

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @unittest.skipIf(sys.platform.startswith(('freebsd', 'darwin')),
                     "due to known OS bug: see issue #13415")
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)

class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(test_support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(test_support.TESTFN, "TEST2")
        tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if hasattr(os, "symlink"):
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if hasattr(os, "symlink"):
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    # A symlink to a directory is removed like a file.
                    os.remove(dirname)
        os.rmdir(test_support.TESTFN)

class MakedirTests (unittest.TestCase):
    """Tests for os.makedirs() below the test_support.TESTFN directory."""

    def setUp(self):
        os.mkdir(test_support.TESTFN)

    def test_makedir(self):
        base = test_support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)




    def tearDown(self):
        path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != test_support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)

class DevNullTests (unittest.TestCase):
    """Checks that os.devnull accepts writes and reads back as empty."""

    def test_devnull(self):
        with open(os.devnull, 'w') as f:
            f.write('hello')
        with open(os.devnull, 'r') as f:
            self.assertEqual(f.read(), '')

class URandomTests (unittest.TestCase):
    """Tests for os.urandom(), in-process and in a child interpreter."""

    def test_urandom_length(self):
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        # Return the *count* bytes produced by os.urandom() in a child
        # interpreter.
        # We need to use repr() and eval() to avoid line ending conversions
        # under Windows.
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.write(repr(data))',
            'sys.stdout.flush()',
            'print >> sys.stderr, (len(data), data)'))
        cmd_line = [sys.executable, '-c', code]
        p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate()
        self.assertEqual(p.wait(), 0, (p.wait(), err))
        # NOTE: eval() is applied only to the repr() written by the child
        # script built above, never to external input.
        out = eval(out)
        self.assertEqual(len(out), count, err)
        return out

    def test_urandom_subprocess(self):
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)

    def test_execvpe_with_bad_arglist(self):
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)

class Win32ErrorTests(unittest.TestCase):
    # Each os call below is made while TESTFN does not exist (or, for
    # mkdir, while it exists as a file) and must raise WindowsError.
    # This class is replaced by an empty one further down when the
    # platform is not win32.
    def test_rename(self):
        self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(WindowsError, os.remove, test_support.TESTFN)

    def test_chdir(self):
        self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)

    def test_mkdir(self):
        f = open(test_support.TESTFN, "w")
        try:
            self.assertRaises(WindowsError, os.mkdir, test_support.TESTFN)
        finally:
            f.close()
            os.unlink(test_support.TESTFN)

    def test_utime(self):
        self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(WindowsError, os.chmod, test_support.TESTFN, 0)

class TestInvalidFD(unittest.TestCase):
    # os functions called with test_support.make_bad_fd() are expected
    # to raise OSError with errno EBADF (see check() below).
    singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build a test method for the single-argument os function named f;
        # the generated test does nothing when os lacks that function.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Install one generated test_<name> method per entry in singles while
    # the class body is being executed.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError carrying EBADF.
        try:
            f(test_support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    def test_isatty(self):
        if hasattr(os, "isatty"):
            self.assertEqual(os.isatty(test_support.make_bad_fd()), False)

    def test_closerange(self):
        if hasattr(os, "closerange"):
            fd = test_support.make_bad_fd()
            # Make sure none of the descriptors we are about to close are
            # currently valid (issue 6542).
            for i in range(10):
                try: os.fstat(fd+i)
                except OSError:
                    pass
                else:
                    break
            if i < 2:
                raise unittest.SkipTest(
                    "Unable to acquire a range of invalid file descriptors")
            self.assertEqual(os.closerange(fd, fd + i-1), None)

    def test_dup2(self):
        if hasattr(os, "dup2"):
            self.check(os.dup2, 20)

    def test_fchmod(self):
        if hasattr(os, "fchmod"):
            self.check(os.fchmod, 0)

    def test_fchown(self):
        if hasattr(os, "fchown"):
            self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if hasattr(os, "fpathconf"):
            self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if hasattr(os, "ftruncate"):
            self.check(os.ftruncate, 0)

    def test_lseek(self):
        if hasattr(os, "lseek"):
            self.check(os.lseek, 0, 0)

    def test_read(self):
        if hasattr(os, "read"):
            self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if hasattr(os, "tcsetpgrp"):
            self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if hasattr(os, "write"):
            self.check(os.write, " ")

if sys.platform != 'win32':
    # Off Windows the WindowsError tests cannot run; replace the class
    # with an empty one so test_main() can still list it.
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        # Each test is only defined when os provides the function.  When
        # not running as uid 0, switching to id 0 must raise os.error;
        # an id of 1<<32 must raise OverflowError.
        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setuid, 0)
                self.assertRaises(OverflowError, os.setuid, 1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setgid, 0)
                self.assertRaises(OverflowError, os.setgid, 1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.seteuid, 0)
                self.assertRaises(OverflowError, os.seteuid, 1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setegid, 0)
                self.assertRaises(OverflowError, os.setegid, 1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setreuid, 0, 0)
                self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
                self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                subprocess.check_call([
                        sys.executable, '-c',
                        'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setregid, 0, 0)
                self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
                self.assertRaises(OverflowError, os.setregid, 0, 1<<32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                subprocess.check_call([
                        sys.executable, '-c',
                        'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
else:
    class PosixUidGidTests(unittest.TestCase):
        pass

@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value)
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached only when the loop ends without 'break': the retry
            # budget ran out or the subprocess exited before reporting.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Send the console control *event* to a helper subprocess and fail
        # (naming the event as *name*) if the subprocess is still running
        # afterwards.  A one-byte mmap tagged *tagname* is polled until
        # it reads '1', which is taken as the helper's ready signal.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test finishes.
        self.addCleanup(m.close)
        m[0] = '0'
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 20
        while count < max_tries and proc.poll() is None:
            if m[0] == '1':
                break
            time.sleep(0.5)
            count += 1
        else:
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the process is still running; an
        # exit status of 0 also means it has stopped, so compare against
        # None rather than relying on truthiness.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")


def test_main():
    # Entry point: run every test class defined in this module.
    test_support.run_unittest(
        FileTests,
        TemporaryFileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Win32KillTests
    )

if __name__ == "__main__":
    test_main()