1# tempfile.py unit tests. 2import tempfile 3import errno 4import io 5import os 6import signal 7import sys 8import re 9import warnings 10import contextlib 11import weakref 12from unittest import mock 13 14import unittest 15from test import support 16from test.support import script_helper 17 18 19if hasattr(os, 'stat'): 20 import stat 21 has_stat = 1 22else: 23 has_stat = 0 24 25has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 26has_spawnl = hasattr(os, 'spawnl') 27 28# TEST_FILES may need to be tweaked for systems depending on the maximum 29# number of files that can be opened at one time (see ulimit -n) 30if sys.platform.startswith('openbsd'): 31 TEST_FILES = 48 32else: 33 TEST_FILES = 100 34 35# This is organized as one test for each chunk of code in tempfile.py, 36# in order of their appearance in the file. Testing which requires 37# threads is not done here. 38 39class TestLowLevelInternals(unittest.TestCase): 40 def test_infer_return_type_singles(self): 41 self.assertIs(str, tempfile._infer_return_type('')) 42 self.assertIs(bytes, tempfile._infer_return_type(b'')) 43 self.assertIs(str, tempfile._infer_return_type(None)) 44 45 def test_infer_return_type_multiples(self): 46 self.assertIs(str, tempfile._infer_return_type('', '')) 47 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 48 with self.assertRaises(TypeError): 49 tempfile._infer_return_type('', b'') 50 with self.assertRaises(TypeError): 51 tempfile._infer_return_type(b'', '') 52 53 def test_infer_return_type_multiples_and_none(self): 54 self.assertIs(str, tempfile._infer_return_type(None, '')) 55 self.assertIs(str, tempfile._infer_return_type('', None)) 56 self.assertIs(str, tempfile._infer_return_type(None, None)) 57 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 58 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 59 with self.assertRaises(TypeError): 60 tempfile._infer_return_type('', None, b'') 61 with self.assertRaises(TypeError): 62 tempfile._infer_return_type(b'', None, '') 63 64 65# Common functionality. 66 67class BaseTestCase(unittest.TestCase): 68 69 str_check = re.compile(r"^[a-z0-9_-]{8}$") 70 b_check = re.compile(br"^[a-z0-9_-]{8}$") 71 72 def setUp(self): 73 self._warnings_manager = support.check_warnings() 74 self._warnings_manager.__enter__() 75 warnings.filterwarnings("ignore", category=RuntimeWarning, 76 message="mktemp", module=__name__) 77 78 def tearDown(self): 79 self._warnings_manager.__exit__(None, None, None) 80 81 82 def nameCheck(self, name, dir, pre, suf): 83 (ndir, nbase) = os.path.split(name) 84 npre = nbase[:len(pre)] 85 nsuf = nbase[len(nbase)-len(suf):] 86 87 if dir is not None: 88 self.assertIs(type(name), str if type(dir) is str else bytes, 89 "unexpected return type") 90 if pre is not None: 91 self.assertIs(type(name), str if type(pre) is str else bytes, 92 "unexpected return type") 93 if suf is not None: 94 self.assertIs(type(name), str if type(suf) is str else bytes, 95 "unexpected return type") 96 if (dir, pre, suf) == (None, None, None): 97 self.assertIs(type(name), str, "default return type must be str") 98 99 # check for equality of the absolute paths! 100 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 101 "file %r not in directory %r" % (name, dir)) 102 self.assertEqual(npre, pre, 103 "file %r does not begin with %r" % (nbase, pre)) 104 self.assertEqual(nsuf, suf, 105 "file %r does not end with %r" % (nbase, suf)) 106 107 nbase = nbase[len(pre):len(nbase)-len(suf)] 108 check = self.str_check if isinstance(nbase, str) else self.b_check 109 self.assertTrue(check.match(nbase), 110 "random characters %r do not match %r" 111 % (nbase, check.pattern)) 112 113 114class TestExports(BaseTestCase): 115 def test_exports(self): 116 # There are no surprising symbols in the tempfile module 117 dict = tempfile.__dict__ 118 119 expected = { 120 "NamedTemporaryFile" : 1, 121 "TemporaryFile" : 1, 122 "mkstemp" : 1, 123 "mkdtemp" : 1, 124 "mktemp" : 1, 125 "TMP_MAX" : 1, 126 "gettempprefix" : 1, 127 "gettempprefixb" : 1, 128 "gettempdir" : 1, 129 "gettempdirb" : 1, 130 "tempdir" : 1, 131 "template" : 1, 132 "SpooledTemporaryFile" : 1, 133 "TemporaryDirectory" : 1, 134 } 135 136 unexp = [] 137 for key in dict: 138 if key[0] != '_' and key not in expected: 139 unexp.append(key) 140 self.assertTrue(len(unexp) == 0, 141 "unexpected keys: %s" % unexp) 142 143 144class TestRandomNameSequence(BaseTestCase): 145 """Test the internal iterator object _RandomNameSequence.""" 146 147 def setUp(self): 148 self.r = tempfile._RandomNameSequence() 149 super().setUp() 150 151 def test_get_six_char_str(self): 152 # _RandomNameSequence returns a six-character string 153 s = next(self.r) 154 self.nameCheck(s, '', '', '') 155 156 def test_many(self): 157 # _RandomNameSequence returns no duplicate strings (stochastic) 158 159 dict = {} 160 r = self.r 161 for i in range(TEST_FILES): 162 s = next(r) 163 self.nameCheck(s, '', '', '') 164 self.assertNotIn(s, dict) 165 dict[s] = 1 166 167 def supports_iter(self): 168 # _RandomNameSequence supports the iterator protocol 169 170 i = 0 171 r = self.r 172 for s in r: 173 i += 1 174 if i == 20: 175 break 176 177 @unittest.skipUnless(hasattr(os, 'fork'), 178 "os.fork is required for this test") 179 def test_process_awareness(self): 180 # ensure that the random source differs between 181 # child and parent. 182 read_fd, write_fd = os.pipe() 183 pid = None 184 try: 185 pid = os.fork() 186 if not pid: 187 os.close(read_fd) 188 os.write(write_fd, next(self.r).encode("ascii")) 189 os.close(write_fd) 190 # bypass the normal exit handlers- leave those to 191 # the parent. 192 os._exit(0) 193 parent_value = next(self.r) 194 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 195 finally: 196 if pid: 197 # best effort to ensure the process can't bleed out 198 # via any bugs above 199 try: 200 os.kill(pid, signal.SIGKILL) 201 except OSError: 202 pass 203 os.close(read_fd) 204 os.close(write_fd) 205 self.assertNotEqual(child_value, parent_value) 206 207 208 209class TestCandidateTempdirList(BaseTestCase): 210 """Test the internal function _candidate_tempdir_list.""" 211 212 def test_nonempty_list(self): 213 # _candidate_tempdir_list returns a nonempty list of strings 214 215 cand = tempfile._candidate_tempdir_list() 216 217 self.assertFalse(len(cand) == 0) 218 for c in cand: 219 self.assertIsInstance(c, str) 220 221 def test_wanted_dirs(self): 222 # _candidate_tempdir_list contains the expected directories 223 224 # Make sure the interesting environment variables are all set. 225 with support.EnvironmentVarGuard() as env: 226 for envname in 'TMPDIR', 'TEMP', 'TMP': 227 dirname = os.getenv(envname) 228 if not dirname: 229 env[envname] = os.path.abspath(envname) 230 231 cand = tempfile._candidate_tempdir_list() 232 233 for envname in 'TMPDIR', 'TEMP', 'TMP': 234 dirname = os.getenv(envname) 235 if not dirname: raise ValueError 236 self.assertIn(dirname, cand) 237 238 try: 239 dirname = os.getcwd() 240 except (AttributeError, OSError): 241 dirname = os.curdir 242 243 self.assertIn(dirname, cand) 244 245 # Not practical to try to verify the presence of OS-specific 246 # paths in this list. 247 248 249# We test _get_default_tempdir some more by testing gettempdir. 250 251class TestGetDefaultTempdir(BaseTestCase): 252 """Test _get_default_tempdir().""" 253 254 def test_no_files_left_behind(self): 255 # use a private empty directory 256 with tempfile.TemporaryDirectory() as our_temp_directory: 257 # force _get_default_tempdir() to consider our empty directory 258 def our_candidate_list(): 259 return [our_temp_directory] 260 261 with support.swap_attr(tempfile, "_candidate_tempdir_list", 262 our_candidate_list): 263 # verify our directory is empty after _get_default_tempdir() 264 tempfile._get_default_tempdir() 265 self.assertEqual(os.listdir(our_temp_directory), []) 266 267 def raise_OSError(*args, **kwargs): 268 raise OSError() 269 270 with support.swap_attr(io, "open", raise_OSError): 271 # test again with failing io.open() 272 with self.assertRaises(FileNotFoundError): 273 tempfile._get_default_tempdir() 274 self.assertEqual(os.listdir(our_temp_directory), []) 275 276 open = io.open 277 def bad_writer(*args, **kwargs): 278 fp = open(*args, **kwargs) 279 fp.write = raise_OSError 280 return fp 281 282 with support.swap_attr(io, "open", bad_writer): 283 # test again with failing write() 284 with self.assertRaises(FileNotFoundError): 285 tempfile._get_default_tempdir() 286 self.assertEqual(os.listdir(our_temp_directory), []) 287 288 289class TestGetCandidateNames(BaseTestCase): 290 """Test the internal function _get_candidate_names.""" 291 292 def test_retval(self): 293 # _get_candidate_names returns a _RandomNameSequence object 294 obj = tempfile._get_candidate_names() 295 self.assertIsInstance(obj, tempfile._RandomNameSequence) 296 297 def test_same_thing(self): 298 # _get_candidate_names always returns the same object 299 a = tempfile._get_candidate_names() 300 b = tempfile._get_candidate_names() 301 302 self.assertTrue(a is b) 303 304 305@contextlib.contextmanager 306def _inside_empty_temp_dir(): 307 dir = tempfile.mkdtemp() 308 try: 309 with support.swap_attr(tempfile, 'tempdir', dir): 310 yield 311 finally: 312 support.rmtree(dir) 313 314 315def _mock_candidate_names(*names): 316 return support.swap_attr(tempfile, 317 '_get_candidate_names', 318 lambda: iter(names)) 319 320 321class TestBadTempdir: 322 323 def test_read_only_directory(self): 324 with _inside_empty_temp_dir(): 325 oldmode = mode = os.stat(tempfile.tempdir).st_mode 326 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 327 os.chmod(tempfile.tempdir, mode) 328 try: 329 if os.access(tempfile.tempdir, os.W_OK): 330 self.skipTest("can't set the directory read-only") 331 with self.assertRaises(PermissionError): 332 self.make_temp() 333 self.assertEqual(os.listdir(tempfile.tempdir), []) 334 finally: 335 os.chmod(tempfile.tempdir, oldmode) 336 337 def test_nonexisting_directory(self): 338 with _inside_empty_temp_dir(): 339 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 340 with support.swap_attr(tempfile, 'tempdir', tempdir): 341 with self.assertRaises(FileNotFoundError): 342 self.make_temp() 343 344 def test_non_directory(self): 345 with _inside_empty_temp_dir(): 346 tempdir = os.path.join(tempfile.tempdir, 'file') 347 open(tempdir, 'wb').close() 348 with support.swap_attr(tempfile, 'tempdir', tempdir): 349 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 350 self.make_temp() 351 352 353class TestMkstempInner(TestBadTempdir, BaseTestCase): 354 """Test the internal function _mkstemp_inner.""" 355 356 class mkstemped: 357 _bflags = tempfile._bin_openflags 358 _tflags = tempfile._text_openflags 359 _close = os.close 360 _unlink = os.unlink 361 362 def __init__(self, dir, pre, suf, bin): 363 if bin: flags = self._bflags 364 else: flags = self._tflags 365 366 output_type = tempfile._infer_return_type(dir, pre, suf) 367 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 368 369 def write(self, str): 370 os.write(self.fd, str) 371 372 def __del__(self): 373 self._close(self.fd) 374 self._unlink(self.name) 375 376 def do_create(self, dir=None, pre=None, suf=None, bin=1): 377 output_type = tempfile._infer_return_type(dir, pre, suf) 378 if dir is None: 379 if output_type is str: 380 dir = tempfile.gettempdir() 381 else: 382 dir = tempfile.gettempdirb() 383 if pre is None: 384 pre = output_type() 385 if suf is None: 386 suf = output_type() 387 file = self.mkstemped(dir, pre, suf, bin) 388 389 self.nameCheck(file.name, dir, pre, suf) 390 return file 391 392 def test_basic(self): 393 # _mkstemp_inner can create files 394 self.do_create().write(b"blat") 395 self.do_create(pre="a").write(b"blat") 396 self.do_create(suf="b").write(b"blat") 397 self.do_create(pre="a", suf="b").write(b"blat") 398 self.do_create(pre="aa", suf=".txt").write(b"blat") 399 400 def test_basic_with_bytes_names(self): 401 # _mkstemp_inner can create files when given name parts all 402 # specified as bytes. 403 dir_b = tempfile.gettempdirb() 404 self.do_create(dir=dir_b, suf=b"").write(b"blat") 405 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 406 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 407 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 408 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 409 # Can't mix str & binary types in the args. 410 with self.assertRaises(TypeError): 411 self.do_create(dir="", suf=b"").write(b"blat") 412 with self.assertRaises(TypeError): 413 self.do_create(dir=dir_b, pre="").write(b"blat") 414 with self.assertRaises(TypeError): 415 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 416 417 def test_basic_many(self): 418 # _mkstemp_inner can create many files (stochastic) 419 extant = list(range(TEST_FILES)) 420 for i in extant: 421 extant[i] = self.do_create(pre="aa") 422 423 def test_choose_directory(self): 424 # _mkstemp_inner can create files in a user-selected directory 425 dir = tempfile.mkdtemp() 426 try: 427 self.do_create(dir=dir).write(b"blat") 428 finally: 429 os.rmdir(dir) 430 431 @unittest.skipUnless(has_stat, 'os.stat not available') 432 def test_file_mode(self): 433 # _mkstemp_inner creates files with the proper mode 434 435 file = self.do_create() 436 mode = stat.S_IMODE(os.stat(file.name).st_mode) 437 expected = 0o600 438 if sys.platform == 'win32': 439 # There's no distinction among 'user', 'group' and 'world'; 440 # replicate the 'user' bits. 441 user = expected >> 6 442 expected = user * (1 + 8 + 64) 443 self.assertEqual(mode, expected) 444 445 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 446 def test_noinherit(self): 447 # _mkstemp_inner file handles are not inherited by child processes 448 449 if support.verbose: 450 v="v" 451 else: 452 v="q" 453 454 file = self.do_create() 455 self.assertEqual(os.get_inheritable(file.fd), False) 456 fd = "%d" % file.fd 457 458 try: 459 me = __file__ 460 except NameError: 461 me = sys.argv[0] 462 463 # We have to exec something, so that FD_CLOEXEC will take 464 # effect. The core of this test is therefore in 465 # tf_inherit_check.py, which see. 466 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 467 "tf_inherit_check.py") 468 469 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 470 # but an arg with embedded spaces should be decorated with double 471 # quotes on each end 472 if sys.platform == 'win32': 473 decorated = '"%s"' % sys.executable 474 tester = '"%s"' % tester 475 else: 476 decorated = sys.executable 477 478 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 479 self.assertFalse(retval < 0, 480 "child process caught fatal signal %d" % -retval) 481 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 482 483 @unittest.skipUnless(has_textmode, "text mode not available") 484 def test_textmode(self): 485 # _mkstemp_inner can create files in text mode 486 487 # A text file is truncated at the first Ctrl+Z byte 488 f = self.do_create(bin=0) 489 f.write(b"blat\x1a") 490 f.write(b"extra\n") 491 os.lseek(f.fd, 0, os.SEEK_SET) 492 self.assertEqual(os.read(f.fd, 20), b"blat") 493 494 def make_temp(self): 495 return tempfile._mkstemp_inner(tempfile.gettempdir(), 496 tempfile.gettempprefix(), 497 '', 498 tempfile._bin_openflags, 499 str) 500 501 def test_collision_with_existing_file(self): 502 # _mkstemp_inner tries another name when a file with 503 # the chosen name already exists 504 with _inside_empty_temp_dir(), \ 505 _mock_candidate_names('aaa', 'aaa', 'bbb'): 506 (fd1, name1) = self.make_temp() 507 os.close(fd1) 508 self.assertTrue(name1.endswith('aaa')) 509 510 (fd2, name2) = self.make_temp() 511 os.close(fd2) 512 self.assertTrue(name2.endswith('bbb')) 513 514 def test_collision_with_existing_directory(self): 515 # _mkstemp_inner tries another name when a directory with 516 # the chosen name already exists 517 with _inside_empty_temp_dir(), \ 518 _mock_candidate_names('aaa', 'aaa', 'bbb'): 519 dir = tempfile.mkdtemp() 520 self.assertTrue(dir.endswith('aaa')) 521 522 (fd, name) = self.make_temp() 523 os.close(fd) 524 self.assertTrue(name.endswith('bbb')) 525 526 527class TestGetTempPrefix(BaseTestCase): 528 """Test gettempprefix().""" 529 530 def test_sane_template(self): 531 # gettempprefix returns a nonempty prefix string 532 p = tempfile.gettempprefix() 533 534 self.assertIsInstance(p, str) 535 self.assertGreater(len(p), 0) 536 537 pb = tempfile.gettempprefixb() 538 539 self.assertIsInstance(pb, bytes) 540 self.assertGreater(len(pb), 0) 541 542 def test_usable_template(self): 543 # gettempprefix returns a usable prefix string 544 545 # Create a temp directory, avoiding use of the prefix. 546 # Then attempt to create a file whose name is 547 # prefix + 'xxxxxx.xxx' in that directory. 548 p = tempfile.gettempprefix() + "xxxxxx.xxx" 549 d = tempfile.mkdtemp(prefix="") 550 try: 551 p = os.path.join(d, p) 552 fd = os.open(p, os.O_RDWR | os.O_CREAT) 553 os.close(fd) 554 os.unlink(p) 555 finally: 556 os.rmdir(d) 557 558 559class TestGetTempDir(BaseTestCase): 560 """Test gettempdir().""" 561 562 def test_directory_exists(self): 563 # gettempdir returns a directory which exists 564 565 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 566 self.assertTrue(os.path.isabs(d) or d == os.curdir, 567 "%r is not an absolute path" % d) 568 self.assertTrue(os.path.isdir(d), 569 "%r is not a directory" % d) 570 571 def test_directory_writable(self): 572 # gettempdir returns a directory writable by the user 573 574 # sneaky: just instantiate a NamedTemporaryFile, which 575 # defaults to writing into the directory returned by 576 # gettempdir. 577 file = tempfile.NamedTemporaryFile() 578 file.write(b"blat") 579 file.close() 580 581 def test_same_thing(self): 582 # gettempdir always returns the same object 583 a = tempfile.gettempdir() 584 b = tempfile.gettempdir() 585 c = tempfile.gettempdirb() 586 587 self.assertTrue(a is b) 588 self.assertNotEqual(type(a), type(c)) 589 self.assertEqual(a, os.fsdecode(c)) 590 591 def test_case_sensitive(self): 592 # gettempdir should not flatten its case 593 # even on a case-insensitive file system 594 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 595 _tempdir, tempfile.tempdir = tempfile.tempdir, None 596 try: 597 with support.EnvironmentVarGuard() as env: 598 # Fake the first env var which is checked as a candidate 599 env["TMPDIR"] = case_sensitive_tempdir 600 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 601 finally: 602 tempfile.tempdir = _tempdir 603 support.rmdir(case_sensitive_tempdir) 604 605 606class TestMkstemp(BaseTestCase): 607 """Test mkstemp().""" 608 609 def do_create(self, dir=None, pre=None, suf=None): 610 output_type = tempfile._infer_return_type(dir, pre, suf) 611 if dir is None: 612 if output_type is str: 613 dir = tempfile.gettempdir() 614 else: 615 dir = tempfile.gettempdirb() 616 if pre is None: 617 pre = output_type() 618 if suf is None: 619 suf = output_type() 620 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 621 (ndir, nbase) = os.path.split(name) 622 adir = os.path.abspath(dir) 623 self.assertEqual(adir, ndir, 624 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 625 626 try: 627 self.nameCheck(name, dir, pre, suf) 628 finally: 629 os.close(fd) 630 os.unlink(name) 631 632 def test_basic(self): 633 # mkstemp can create files 634 self.do_create() 635 self.do_create(pre="a") 636 self.do_create(suf="b") 637 self.do_create(pre="a", suf="b") 638 self.do_create(pre="aa", suf=".txt") 639 self.do_create(dir=".") 640 641 def test_basic_with_bytes_names(self): 642 # mkstemp can create files when given name parts all 643 # specified as bytes. 644 d = tempfile.gettempdirb() 645 self.do_create(dir=d, suf=b"") 646 self.do_create(dir=d, pre=b"a") 647 self.do_create(dir=d, suf=b"b") 648 self.do_create(dir=d, pre=b"a", suf=b"b") 649 self.do_create(dir=d, pre=b"aa", suf=b".txt") 650 self.do_create(dir=b".") 651 with self.assertRaises(TypeError): 652 self.do_create(dir=".", pre=b"aa", suf=b".txt") 653 with self.assertRaises(TypeError): 654 self.do_create(dir=b".", pre="aa", suf=b".txt") 655 with self.assertRaises(TypeError): 656 self.do_create(dir=b".", pre=b"aa", suf=".txt") 657 658 659 def test_choose_directory(self): 660 # mkstemp can create directories in a user-selected directory 661 dir = tempfile.mkdtemp() 662 try: 663 self.do_create(dir=dir) 664 finally: 665 os.rmdir(dir) 666 667 668class TestMkdtemp(TestBadTempdir, BaseTestCase): 669 """Test mkdtemp().""" 670 671 def make_temp(self): 672 return tempfile.mkdtemp() 673 674 def do_create(self, dir=None, pre=None, suf=None): 675 output_type = tempfile._infer_return_type(dir, pre, suf) 676 if dir is None: 677 if output_type is str: 678 dir = tempfile.gettempdir() 679 else: 680 dir = tempfile.gettempdirb() 681 if pre is None: 682 pre = output_type() 683 if suf is None: 684 suf = output_type() 685 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 686 687 try: 688 self.nameCheck(name, dir, pre, suf) 689 return name 690 except: 691 os.rmdir(name) 692 raise 693 694 def test_basic(self): 695 # mkdtemp can create directories 696 os.rmdir(self.do_create()) 697 os.rmdir(self.do_create(pre="a")) 698 os.rmdir(self.do_create(suf="b")) 699 os.rmdir(self.do_create(pre="a", suf="b")) 700 os.rmdir(self.do_create(pre="aa", suf=".txt")) 701 702 def test_basic_with_bytes_names(self): 703 # mkdtemp can create directories when given all binary parts 704 d = tempfile.gettempdirb() 705 os.rmdir(self.do_create(dir=d)) 706 os.rmdir(self.do_create(dir=d, pre=b"a")) 707 os.rmdir(self.do_create(dir=d, suf=b"b")) 708 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 709 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 710 with self.assertRaises(TypeError): 711 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 712 with self.assertRaises(TypeError): 713 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 714 with self.assertRaises(TypeError): 715 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 716 717 def test_basic_many(self): 718 # mkdtemp can create many directories (stochastic) 719 extant = list(range(TEST_FILES)) 720 try: 721 for i in extant: 722 extant[i] = self.do_create(pre="aa") 723 finally: 724 for i in extant: 725 if(isinstance(i, str)): 726 os.rmdir(i) 727 728 def test_choose_directory(self): 729 # mkdtemp can create directories in a user-selected directory 730 dir = tempfile.mkdtemp() 731 try: 732 os.rmdir(self.do_create(dir=dir)) 733 finally: 734 os.rmdir(dir) 735 736 @unittest.skipUnless(has_stat, 'os.stat not available') 737 def test_mode(self): 738 # mkdtemp creates directories with the proper mode 739 740 dir = self.do_create() 741 try: 742 mode = stat.S_IMODE(os.stat(dir).st_mode) 743 mode &= 0o777 # Mask off sticky bits inherited from /tmp 744 expected = 0o700 745 if sys.platform == 'win32': 746 # There's no distinction among 'user', 'group' and 'world'; 747 # replicate the 'user' bits. 748 user = expected >> 6 749 expected = user * (1 + 8 + 64) 750 self.assertEqual(mode, expected) 751 finally: 752 os.rmdir(dir) 753 754 def test_collision_with_existing_file(self): 755 # mkdtemp tries another name when a file with 756 # the chosen name already exists 757 with _inside_empty_temp_dir(), \ 758 _mock_candidate_names('aaa', 'aaa', 'bbb'): 759 file = tempfile.NamedTemporaryFile(delete=False) 760 file.close() 761 self.assertTrue(file.name.endswith('aaa')) 762 dir = tempfile.mkdtemp() 763 self.assertTrue(dir.endswith('bbb')) 764 765 def test_collision_with_existing_directory(self): 766 # mkdtemp tries another name when a directory with 767 # the chosen name already exists 768 with _inside_empty_temp_dir(), \ 769 _mock_candidate_names('aaa', 'aaa', 'bbb'): 770 dir1 = tempfile.mkdtemp() 771 self.assertTrue(dir1.endswith('aaa')) 772 dir2 = tempfile.mkdtemp() 773 self.assertTrue(dir2.endswith('bbb')) 774 775 776class TestMktemp(BaseTestCase): 777 """Test mktemp().""" 778 779 # For safety, all use of mktemp must occur in a private directory. 780 # We must also suppress the RuntimeWarning it generates. 781 def setUp(self): 782 self.dir = tempfile.mkdtemp() 783 super().setUp() 784 785 def tearDown(self): 786 if self.dir: 787 os.rmdir(self.dir) 788 self.dir = None 789 super().tearDown() 790 791 class mktemped: 792 _unlink = os.unlink 793 _bflags = tempfile._bin_openflags 794 795 def __init__(self, dir, pre, suf): 796 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 797 # Create the file. This will raise an exception if it's 798 # mysteriously appeared in the meanwhile. 799 os.close(os.open(self.name, self._bflags, 0o600)) 800 801 def __del__(self): 802 self._unlink(self.name) 803 804 def do_create(self, pre="", suf=""): 805 file = self.mktemped(self.dir, pre, suf) 806 807 self.nameCheck(file.name, self.dir, pre, suf) 808 return file 809 810 def test_basic(self): 811 # mktemp can choose usable file names 812 self.do_create() 813 self.do_create(pre="a") 814 self.do_create(suf="b") 815 self.do_create(pre="a", suf="b") 816 self.do_create(pre="aa", suf=".txt") 817 818 def test_many(self): 819 # mktemp can choose many usable file names (stochastic) 820 extant = list(range(TEST_FILES)) 821 for i in extant: 822 extant[i] = self.do_create(pre="aa") 823 824## def test_warning(self): 825## # mktemp issues a warning when used 826## warnings.filterwarnings("error", 827## category=RuntimeWarning, 828## message="mktemp") 829## self.assertRaises(RuntimeWarning, 830## tempfile.mktemp, dir=self.dir) 831 832 833# We test _TemporaryFileWrapper by testing NamedTemporaryFile. 834 835 836class TestNamedTemporaryFile(BaseTestCase): 837 """Test NamedTemporaryFile().""" 838 839 def do_create(self, dir=None, pre="", suf="", delete=True): 840 if dir is None: 841 dir = tempfile.gettempdir() 842 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 843 delete=delete) 844 845 self.nameCheck(file.name, dir, pre, suf) 846 return file 847 848 849 def test_basic(self): 850 # NamedTemporaryFile can create files 851 self.do_create() 852 self.do_create(pre="a") 853 self.do_create(suf="b") 854 self.do_create(pre="a", suf="b") 855 self.do_create(pre="aa", suf=".txt") 856 857 def test_method_lookup(self): 858 # Issue #18879: Looking up a temporary file method should keep it 859 # alive long enough. 860 f = self.do_create() 861 wr = weakref.ref(f) 862 write = f.write 863 write2 = f.write 864 del f 865 write(b'foo') 866 del write 867 write2(b'bar') 868 del write2 869 if support.check_impl_detail(cpython=True): 870 # No reference cycle was created. 871 self.assertIsNone(wr()) 872 873 def test_iter(self): 874 # Issue #23700: getting iterator from a temporary file should keep 875 # it alive as long as it's being iterated over 876 lines = [b'spam\n', b'eggs\n', b'beans\n'] 877 def make_file(): 878 f = tempfile.NamedTemporaryFile(mode='w+b') 879 f.write(b''.join(lines)) 880 f.seek(0) 881 return f 882 for i, l in enumerate(make_file()): 883 self.assertEqual(l, lines[i]) 884 self.assertEqual(i, len(lines) - 1) 885 886 def test_creates_named(self): 887 # NamedTemporaryFile creates files with names 888 f = tempfile.NamedTemporaryFile() 889 self.assertTrue(os.path.exists(f.name), 890 "NamedTemporaryFile %s does not exist" % f.name) 891 892 def test_del_on_close(self): 893 # A NamedTemporaryFile is deleted when closed 894 dir = tempfile.mkdtemp() 895 try: 896 f = tempfile.NamedTemporaryFile(dir=dir) 897 f.write(b'blat') 898 f.close() 899 self.assertFalse(os.path.exists(f.name), 900 "NamedTemporaryFile %s exists after close" % f.name) 901 finally: 902 os.rmdir(dir) 903 904 def test_dis_del_on_close(self): 905 # Tests that delete-on-close can be disabled 906 dir = tempfile.mkdtemp() 907 tmp = None 908 try: 909 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 910 tmp = f.name 911 f.write(b'blat') 912 f.close() 913 self.assertTrue(os.path.exists(f.name), 914 "NamedTemporaryFile %s missing after close" % f.name) 915 finally: 916 if tmp is not None: 917 os.unlink(tmp) 918 os.rmdir(dir) 919 920 def test_multiple_close(self): 921 # A NamedTemporaryFile can be closed many times without error 922 f = tempfile.NamedTemporaryFile() 923 f.write(b'abc\n') 924 f.close() 925 f.close() 926 f.close() 927 928 def test_context_manager(self): 929 # A NamedTemporaryFile can be used as a context manager 930 with tempfile.NamedTemporaryFile() as f: 931 self.assertTrue(os.path.exists(f.name)) 932 self.assertFalse(os.path.exists(f.name)) 933 def use_closed(): 934 with f: 935 pass 936 self.assertRaises(ValueError, use_closed) 937 938 def test_no_leak_fd(self): 939 # Issue #21058: don't leak file descriptor when io.open() fails 940 closed = [] 941 os_close = os.close 942 def close(fd): 943 closed.append(fd) 944 os_close(fd) 945 946 with mock.patch('os.close', side_effect=close): 947 with mock.patch('io.open', side_effect=ValueError): 948 self.assertRaises(ValueError, tempfile.NamedTemporaryFile) 949 self.assertEqual(len(closed), 1) 950 951 def test_bad_mode(self): 952 dir = tempfile.mkdtemp() 953 self.addCleanup(support.rmtree, dir) 954 with self.assertRaises(ValueError): 955 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 956 with self.assertRaises(TypeError): 957 tempfile.NamedTemporaryFile(mode=2, dir=dir) 958 self.assertEqual(os.listdir(dir), []) 959 960 # How to test the mode and bufsize parameters? 961 962class TestSpooledTemporaryFile(BaseTestCase): 963 """Test SpooledTemporaryFile().""" 964 965 def do_create(self, max_size=0, dir=None, pre="", suf=""): 966 if dir is None: 967 dir = tempfile.gettempdir() 968 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 969 970 return file 971 972 973 def test_basic(self): 974 # SpooledTemporaryFile can create files 975 f = self.do_create() 976 self.assertFalse(f._rolled) 977 f = self.do_create(max_size=100, pre="a", suf=".txt") 978 self.assertFalse(f._rolled) 979 980 def test_del_on_close(self): 981 # A SpooledTemporaryFile is deleted when closed 982 dir = tempfile.mkdtemp() 983 try: 984 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 985 self.assertFalse(f._rolled) 986 f.write(b'blat ' * 5) 987 self.assertTrue(f._rolled) 988 filename = f.name 989 f.close() 990 self.assertFalse(isinstance(filename, str) and os.path.exists(filename), 991 "SpooledTemporaryFile %s exists after close" % filename) 992 finally: 993 os.rmdir(dir) 994 995 def test_rewrite_small(self): 996 # A SpooledTemporaryFile can be written to multiple within the max_size 997 f = self.do_create(max_size=30) 998 self.assertFalse(f._rolled) 999 for i in range(5): 1000 f.seek(0, 0) 1001 f.write(b'x' * 20) 1002 self.assertFalse(f._rolled) 1003 1004 def test_write_sequential(self): 1005 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1006 # over afterward 1007 f = self.do_create(max_size=30) 1008 self.assertFalse(f._rolled) 1009 f.write(b'x' * 20) 1010 self.assertFalse(f._rolled) 1011 f.write(b'x' * 10) 1012 self.assertFalse(f._rolled) 1013 f.write(b'x') 1014 self.assertTrue(f._rolled) 1015 1016 def test_writelines(self): 1017 # Verify writelines with a SpooledTemporaryFile 1018 f = self.do_create() 1019 f.writelines((b'x', b'y', b'z')) 1020 f.seek(0) 1021 buf = f.read() 1022 self.assertEqual(buf, b'xyz') 1023 1024 def test_writelines_sequential(self): 1025 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1026 # over afterward 1027 f = self.do_create(max_size=35) 1028 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1029 self.assertFalse(f._rolled) 1030 f.write(b'x') 1031 self.assertTrue(f._rolled) 1032 1033 def test_sparse(self): 1034 # A SpooledTemporaryFile that is written late in the file will extend 1035 # when that occurs 1036 f = self.do_create(max_size=30) 1037 self.assertFalse(f._rolled) 1038 f.seek(100, 0) 1039 self.assertFalse(f._rolled) 1040 f.write(b'x') 1041 self.assertTrue(f._rolled) 1042 1043 def test_fileno(self): 1044 # A SpooledTemporaryFile should roll over to a real file on fileno() 1045 f = self.do_create(max_size=30) 1046 self.assertFalse(f._rolled) 1047 self.assertTrue(f.fileno() > 0) 1048 self.assertTrue(f._rolled) 1049 1050 def test_multiple_close_before_rollover(self): 1051 # A SpooledTemporaryFile can be closed many times without error 1052 f = tempfile.SpooledTemporaryFile() 1053 f.write(b'abc\n') 1054 self.assertFalse(f._rolled) 1055 f.close() 1056 f.close() 1057 f.close() 1058 1059 def test_multiple_close_after_rollover(self): 1060 # A SpooledTemporaryFile can be closed many times without error 1061 f = tempfile.SpooledTemporaryFile(max_size=1) 1062 f.write(b'abc\n') 1063 self.assertTrue(f._rolled) 1064 f.close() 1065 f.close() 1066 f.close() 1067 1068 def test_bound_methods(self): 1069 # It should be OK to steal a bound method from a SpooledTemporaryFile 1070 # and use it independently; when the file rolls over, those bound 1071 # methods should continue to function 1072 f = self.do_create(max_size=30) 1073 read = f.read 1074 write = f.write 1075 seek = f.seek 1076 1077 write(b"a" * 35) 1078 write(b"b" * 35) 1079 seek(0, 0) 1080 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1081 1082 def test_properties(self): 1083 f = tempfile.SpooledTemporaryFile(max_size=10) 1084 f.write(b'x' * 10) 1085 self.assertFalse(f._rolled) 1086 self.assertEqual(f.mode, 'w+b') 1087 self.assertIsNone(f.name) 1088 with self.assertRaises(AttributeError): 1089 f.newlines 1090 with self.assertRaises(AttributeError): 1091 f.encoding 1092 1093 f.write(b'x') 1094 self.assertTrue(f._rolled) 1095 self.assertEqual(f.mode, 'rb+') 1096 self.assertIsNotNone(f.name) 1097 with self.assertRaises(AttributeError): 1098 f.newlines 1099 with self.assertRaises(AttributeError): 1100 f.encoding 1101 1102 def test_text_mode(self): 1103 # Creating a SpooledTemporaryFile with a text mode should produce 1104 # a file object reading and writing (Unicode) text strings. 1105 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10) 1106 f.write("abc\n") 1107 f.seek(0) 1108 self.assertEqual(f.read(), "abc\n") 1109 f.write("def\n") 1110 f.seek(0) 1111 self.assertEqual(f.read(), "abc\ndef\n") 1112 self.assertFalse(f._rolled) 1113 self.assertEqual(f.mode, 'w+') 1114 self.assertIsNone(f.name) 1115 self.assertIsNone(f.newlines) 1116 self.assertIsNone(f.encoding) 1117 1118 f.write("xyzzy\n") 1119 f.seek(0) 1120 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1121 # Check that Ctrl+Z doesn't truncate the file 1122 f.write("foo\x1abar\n") 1123 f.seek(0) 1124 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1125 self.assertTrue(f._rolled) 1126 self.assertEqual(f.mode, 'w+') 1127 self.assertIsNotNone(f.name) 1128 self.assertEqual(f.newlines, os.linesep) 1129 self.assertIsNotNone(f.encoding) 1130 1131 def test_text_newline_and_encoding(self): 1132 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1133 newline='', encoding='utf-8') 1134 f.write("\u039B\r\n") 1135 f.seek(0) 1136 self.assertEqual(f.read(), "\u039B\r\n") 1137 self.assertFalse(f._rolled) 1138 self.assertEqual(f.mode, 'w+') 1139 self.assertIsNone(f.name) 1140 self.assertIsNone(f.newlines) 1141 self.assertIsNone(f.encoding) 1142 1143 f.write("\u039B" * 20 + "\r\n") 1144 f.seek(0) 1145 self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n") 1146 self.assertTrue(f._rolled) 1147 self.assertEqual(f.mode, 'w+') 1148 self.assertIsNotNone(f.name) 1149 self.assertIsNotNone(f.newlines) 1150 self.assertEqual(f.encoding, 'utf-8') 1151 1152 def test_context_manager_before_rollover(self): 1153 # A SpooledTemporaryFile can be used as a context manager 1154 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1155 self.assertFalse(f._rolled) 1156 self.assertFalse(f.closed) 1157 self.assertTrue(f.closed) 1158 def use_closed(): 1159 with f: 1160 pass 1161 self.assertRaises(ValueError, use_closed) 1162 1163 def test_context_manager_during_rollover(self): 1164 # A SpooledTemporaryFile can be used as a context manager 1165 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1166 self.assertFalse(f._rolled) 1167 f.write(b'abc\n') 1168 f.flush() 1169 self.assertTrue(f._rolled) 1170 self.assertFalse(f.closed) 1171 self.assertTrue(f.closed) 1172 def use_closed(): 1173 with f: 1174 pass 1175 self.assertRaises(ValueError, use_closed) 1176 1177 def test_context_manager_after_rollover(self): 1178 # A SpooledTemporaryFile can be used as a context manager 1179 f = tempfile.SpooledTemporaryFile(max_size=1) 1180 f.write(b'abc\n') 1181 f.flush() 1182 self.assertTrue(f._rolled) 1183 with f: 1184 self.assertFalse(f.closed) 1185 self.assertTrue(f.closed) 1186 def use_closed(): 1187 with f: 1188 pass 1189 self.assertRaises(ValueError, use_closed) 1190 1191 def test_truncate_with_size_parameter(self): 1192 # A SpooledTemporaryFile can be truncated to zero size 1193 f = tempfile.SpooledTemporaryFile(max_size=10) 1194 f.write(b'abcdefg\n') 1195 f.seek(0) 1196 f.truncate() 1197 self.assertFalse(f._rolled) 1198 self.assertEqual(f._file.getvalue(), b'') 1199 # A SpooledTemporaryFile can be truncated to a specific size 1200 f = tempfile.SpooledTemporaryFile(max_size=10) 1201 f.write(b'abcdefg\n') 1202 f.truncate(4) 1203 self.assertFalse(f._rolled) 1204 self.assertEqual(f._file.getvalue(), b'abcd') 1205 # A SpooledTemporaryFile rolls over if truncated to large size 1206 f = tempfile.SpooledTemporaryFile(max_size=10) 1207 f.write(b'abcdefg\n') 1208 f.truncate(20) 1209 self.assertTrue(f._rolled) 1210 if has_stat: 1211 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1212 1213 1214if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1215 1216 class TestTemporaryFile(BaseTestCase): 1217 """Test TemporaryFile().""" 1218 1219 def test_basic(self): 1220 # TemporaryFile can create files 1221 # No point in testing the name params - the file has no name. 1222 tempfile.TemporaryFile() 1223 1224 def test_has_no_name(self): 1225 # TemporaryFile creates files with no names (on this system) 1226 dir = tempfile.mkdtemp() 1227 f = tempfile.TemporaryFile(dir=dir) 1228 f.write(b'blat') 1229 1230 # Sneaky: because this file has no name, it should not prevent 1231 # us from removing the directory it was created in. 1232 try: 1233 os.rmdir(dir) 1234 except: 1235 # cleanup 1236 f.close() 1237 os.rmdir(dir) 1238 raise 1239 1240 def test_multiple_close(self): 1241 # A TemporaryFile can be closed many times without error 1242 f = tempfile.TemporaryFile() 1243 f.write(b'abc\n') 1244 f.close() 1245 f.close() 1246 f.close() 1247 1248 # How to test the mode and bufsize parameters? 1249 def test_mode_and_encoding(self): 1250 1251 def roundtrip(input, *args, **kwargs): 1252 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1253 fileobj.write(input) 1254 fileobj.seek(0) 1255 self.assertEqual(input, fileobj.read()) 1256 1257 roundtrip(b"1234", "w+b") 1258 roundtrip("abdc\n", "w+") 1259 roundtrip("\u039B", "w+", encoding="utf-16") 1260 roundtrip("foo\r\n", "w+", newline="") 1261 1262 def test_no_leak_fd(self): 1263 # Issue #21058: don't leak file descriptor when io.open() fails 1264 closed = [] 1265 os_close = os.close 1266 def close(fd): 1267 closed.append(fd) 1268 os_close(fd) 1269 1270 with mock.patch('os.close', side_effect=close): 1271 with mock.patch('io.open', side_effect=ValueError): 1272 self.assertRaises(ValueError, tempfile.TemporaryFile) 1273 self.assertEqual(len(closed), 1) 1274 1275 1276 1277# Helper for test_del_on_shutdown 1278class NulledModules: 1279 def __init__(self, *modules): 1280 self.refs = [mod.__dict__ for mod in modules] 1281 self.contents = [ref.copy() for ref in self.refs] 1282 1283 def __enter__(self): 1284 for d in self.refs: 1285 for key in d: 1286 d[key] = None 1287 1288 def __exit__(self, *exc_info): 1289 for d, c in zip(self.refs, self.contents): 1290 d.clear() 1291 d.update(c) 1292 1293class TestTemporaryDirectory(BaseTestCase): 1294 """Test TemporaryDirectory().""" 1295 1296 def do_create(self, dir=None, pre="", suf="", recurse=1): 1297 if dir is None: 1298 dir = tempfile.gettempdir() 1299 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) 1300 self.nameCheck(tmp.name, dir, pre, suf) 1301 # Create a subdirectory and some files 1302 if recurse: 1303 d1 = self.do_create(tmp.name, pre, suf, recurse-1) 1304 d1.name = None 1305 with open(os.path.join(tmp.name, "test.txt"), "wb") as f: 1306 f.write(b"Hello world!") 1307 return tmp 1308 1309 def test_mkdtemp_failure(self): 1310 # Check no additional exception if mkdtemp fails 1311 # Previously would raise AttributeError instead 1312 # (noted as part of Issue #10188) 1313 with tempfile.TemporaryDirectory() as nonexistent: 1314 pass 1315 with self.assertRaises(FileNotFoundError) as cm: 1316 tempfile.TemporaryDirectory(dir=nonexistent) 1317 self.assertEqual(cm.exception.errno, errno.ENOENT) 1318 1319 def test_explicit_cleanup(self): 1320 # A TemporaryDirectory is deleted when cleaned up 1321 dir = tempfile.mkdtemp() 1322 try: 1323 d = self.do_create(dir=dir) 1324 self.assertTrue(os.path.exists(d.name), 1325 "TemporaryDirectory %s does not exist" % d.name) 1326 d.cleanup() 1327 self.assertFalse(os.path.exists(d.name), 1328 "TemporaryDirectory %s exists after cleanup" % d.name) 1329 finally: 1330 os.rmdir(dir) 1331 1332 @support.skip_unless_symlink 1333 def test_cleanup_with_symlink_to_a_directory(self): 1334 # cleanup() should not follow symlinks to directories (issue #12464) 1335 d1 = self.do_create() 1336 d2 = self.do_create(recurse=0) 1337 1338 # Symlink d1/foo -> d2 1339 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1340 1341 # This call to cleanup() should not follow the "foo" symlink 1342 d1.cleanup() 1343 1344 self.assertFalse(os.path.exists(d1.name), 1345 "TemporaryDirectory %s exists after cleanup" % d1.name) 1346 self.assertTrue(os.path.exists(d2.name), 1347 "Directory pointed to by a symlink was deleted") 1348 self.assertEqual(os.listdir(d2.name), ['test.txt'], 1349 "Contents of the directory pointed to by a symlink " 1350 "were deleted") 1351 d2.cleanup() 1352 1353 @support.cpython_only 1354 def test_del_on_collection(self): 1355 # A TemporaryDirectory is deleted when garbage collected 1356 dir = tempfile.mkdtemp() 1357 try: 1358 d = self.do_create(dir=dir) 1359 name = d.name 1360 del d # Rely on refcounting to invoke __del__ 1361 self.assertFalse(os.path.exists(name), 1362 "TemporaryDirectory %s exists after __del__" % name) 1363 finally: 1364 os.rmdir(dir) 1365 1366 def test_del_on_shutdown(self): 1367 # A TemporaryDirectory may be cleaned up during shutdown 1368 with self.do_create() as dir: 1369 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1370 code = """if True: 1371 import builtins 1372 import os 1373 import shutil 1374 import sys 1375 import tempfile 1376 import warnings 1377 1378 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1379 sys.stdout.buffer.write(tmp.name.encode()) 1380 1381 tmp2 = os.path.join(tmp.name, 'test_dir') 1382 os.mkdir(tmp2) 1383 with open(os.path.join(tmp2, "test.txt"), "w") as f: 1384 f.write("Hello world!") 1385 1386 {mod}.tmp = tmp 1387 1388 warnings.filterwarnings("always", category=ResourceWarning) 1389 """.format(dir=dir, mod=mod) 1390 rc, out, err = script_helper.assert_python_ok("-c", code) 1391 tmp_name = out.decode().strip() 1392 self.assertFalse(os.path.exists(tmp_name), 1393 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1394 err = err.decode('utf-8', 'backslashreplace') 1395 self.assertNotIn("Exception ", err) 1396 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1397 1398 def test_exit_on_shutdown(self): 1399 # Issue #22427 1400 with self.do_create() as dir: 1401 code = """if True: 1402 import sys 1403 import tempfile 1404 import warnings 1405 1406 def generator(): 1407 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1408 yield tmp 1409 g = generator() 1410 sys.stdout.buffer.write(next(g).encode()) 1411 1412 warnings.filterwarnings("always", category=ResourceWarning) 1413 """.format(dir=dir) 1414 rc, out, err = script_helper.assert_python_ok("-c", code) 1415 tmp_name = out.decode().strip() 1416 self.assertFalse(os.path.exists(tmp_name), 1417 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1418 err = err.decode('utf-8', 'backslashreplace') 1419 self.assertNotIn("Exception ", err) 1420 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1421 1422 def test_warnings_on_cleanup(self): 1423 # ResourceWarning will be triggered by __del__ 1424 with self.do_create() as dir: 1425 d = self.do_create(dir=dir, recurse=3) 1426 name = d.name 1427 1428 # Check for the resource warning 1429 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): 1430 warnings.filterwarnings("always", category=ResourceWarning) 1431 del d 1432 support.gc_collect() 1433 self.assertFalse(os.path.exists(name), 1434 "TemporaryDirectory %s exists after __del__" % name) 1435 1436 def test_multiple_close(self): 1437 # Can be cleaned-up many times without error 1438 d = self.do_create() 1439 d.cleanup() 1440 d.cleanup() 1441 d.cleanup() 1442 1443 def test_context_manager(self): 1444 # Can be used as a context manager 1445 d = self.do_create() 1446 with d as name: 1447 self.assertTrue(os.path.exists(name)) 1448 self.assertEqual(name, d.name) 1449 self.assertFalse(os.path.exists(name)) 1450 1451 1452if __name__ == "__main__": 1453 unittest.main() 1454