1"""
2Create and delete FILES_PER_THREAD temp files (via tempfile.TemporaryFile)
3in each of NUM_THREADS threads, recording the number of successes and
4failures.  A failure is a bug in tempfile, and may be due to:
5
6+ Trying to create more than one tempfile with the same name.
7+ Trying to delete a tempfile that doesn't still exist.
8+ Something we've never seen before.
9
10By default, NUM_THREADS == 20 and FILES_PER_THREAD == 50.  This is enough to
11create about 150 failures per run under Win98SE in 2.0, and runs pretty
12quickly. Guido reports needing to boost FILES_PER_THREAD to 500 before
13provoking a 2.0 failure under Linux.
14"""
15
# Number of worker threads the test starts.
NUM_THREADS = 20
# Number of temporary files each worker thread creates and closes.
FILES_PER_THREAD = 50
18
19import tempfile
20
21from test.test_support import threading_setup, threading_cleanup, run_unittest, import_module
22threading = import_module('threading')
23import unittest
24import StringIO
25from traceback import print_exc
26
# Shared start signal: every worker thread waits on this event before it
# begins creating files, and the test sets it after starting the workers.
startEvent = threading.Event()
28
class TempFileGreedy(threading.Thread):
    """Worker thread that repeatedly creates and closes temporary files.

    Once the thread has run, ``ok_count`` and ``error_count`` hold the number
    of successful and failed attempts, and ``errors`` is a StringIO buffer
    containing the tracebacks of the failures.
    """

    # Class-level zeros serve as defaults; the augmented assignments in
    # run() rebind these names on the instance, so counts are per thread.
    error_count = 0
    ok_count = 0

    def run(self):
        """Wait for the start signal, then make FILES_PER_THREAD attempts."""
        self.errors = StringIO.StringIO()
        # Hold here until the shared event is set.
        startEvent.wait()
        attempts_left = FILES_PER_THREAD
        while attempts_left > 0:
            attempts_left -= 1
            try:
                tempfile.TemporaryFile("w+b").close()
            except:
                # Deliberately catch everything: any exception raised while
                # creating or closing the file is recorded as a failure.
                self.error_count += 1
                print_exc(file=self.errors)
            else:
                self.ok_count += 1
45
46
class ThreadedTempFileTest(unittest.TestCase):
    """Exercise tempfile.TemporaryFile from many threads at the same time."""

    def test_main(self):
        """Run NUM_THREADS workers and require that every attempt succeeded.

        Each worker is a TempFileGreedy thread that waits on startEvent
        before making FILES_PER_THREAD attempts.  The test fails if any
        worker recorded an error, or if the total number of successes is
        not NUM_THREADS * FILES_PER_THREAD.  The run is bracketed by the
        test-support threading_setup() / threading_cleanup() helpers.
        """
        threads = []
        thread_info = threading_setup()

        # startEvent is module-level and stays set after a run.  Clear it so
        # that a repeated run in the same process still holds every worker
        # back until all of them have been started.
        startEvent.clear()
        try:
            for i in range(NUM_THREADS):
                t = TempFileGreedy()
                t.start()
                # Record the thread only once it is running, so that the
                # join below never sees a thread that failed to start.
                threads.append(t)
        finally:
            # Release the workers even if starting one of them failed;
            # otherwise the threads already started would wait forever.
            startEvent.set()
            for t in threads:
                t.join()
            threading_cleanup(*thread_info)

        ok = sum(t.ok_count for t in threads)
        errors = [str(t.getName()) + str(t.errors.getvalue())
                  for t in threads if t.error_count]

        msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
            '\n'.join(errors))
        self.assertEqual(errors, [], msg)
        self.assertEqual(ok, NUM_THREADS * FILES_PER_THREAD)
73
def test_main():
    """Run ThreadedTempFileTest via the test-support run_unittest helper."""
    run_unittest(ThreadedTempFileTest)
76
# Run the test when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
79