1import os
2import unittest
3import random
4from test import support
5thread = support.import_module('_thread')
6import time
7import sys
8import weakref
9
10from test import lock_tests
11
# Number of worker threads started by the thread-running and barrier tests.
NUMTASKS = 10
# Number of times each BarrierTest worker passes through the barrier.
NUMTRIPS = 3

# Serializes the output of verbose_print() across concurrently running threads.
_print_mutex = thread.allocate_lock()
16
def verbose_print(arg):
    """Print *arg* as debugging output, but only in verbose test runs.

    The print is done while holding _print_mutex so that messages emitted
    by different threads are not interleaved.
    """
    if not support.verbose:
        return
    with _print_mutex:
        print(arg)
22
class BasicThreadTest(unittest.TestCase):
    """Shared fixture: the locks and counters used by the thread tests."""

    def setUp(self):
        # Bookkeeping counters; worker threads update them under
        # running_mutex.
        self.created = 0
        self.running = 0
        self.next_ident = 0

        # running_mutex guards the counters above; random_mutex is held
        # around calls into the random module.
        self.running_mutex = thread.allocate_lock()
        self.random_mutex = thread.allocate_lock()

        # done_mutex starts out held: the main thread blocks on it until the
        # last worker releases it.
        completion_lock = thread.allocate_lock()
        completion_lock.acquire()
        self.done_mutex = completion_lock
33
34
class ThreadRunningTests(BasicThreadTest):
    """Tests that start real threads through the low-level thread module."""

    def newtask(self):
        """Start one worker thread running task() and record it.

        Everything happens under running_mutex, so the new worker cannot
        decrement self.running before it has been incremented here.
        """
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        """Worker body: sleep for a short random delay, then check out.

        The worker that brings self.running back to 0 once all NUMTASKS
        workers have been created releases done_mutex, which wakes up the
        main thread.
        """
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            if self.created == NUMTASKS and self.running == 0:
                self.done_mutex.release()

    def test_starting_threads(self):
        # Basic test for thread creation.
        for i in range(NUMTASKS):
            self.newtask()
        verbose_print("waiting for tasks to complete...")
        # Blocks until the last worker releases done_mutex.
        self.done_mutex.acquire()
        verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")

    @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
    def test_nt_and_posix_stack_size(self):
        # Probe whether the platform lets us change the stack size at all.
        # A ValueError for this small value is tolerated; thread.error means
        # the feature is unavailable and the test is skipped.
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                            "stack_size(4096)")
        except thread.error:
            self.skipTest("platform does not support changing thread stack "
                          "size")

        # From here on the stack size is modified, so always restore the
        # default (0) afterwards, even if an assertion fails, to avoid
        # leaking a non-default stack size into the tests that run later.
        try:
            fail_msg = "stack_size(%d) failed - should succeed"
            for tss in (262144, 0x100000, 0):
                thread.stack_size(tss)
                self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
                verbose_print("successfully set stack_size(%d)" % tss)

            for tss in (262144, 0x100000):
                verbose_print("trying stack_size = (%d)" % tss)
                # Actually apply the size under test; the previous loop left
                # the stack size at 0, so without this call the workers would
                # all run with the default stack.
                thread.stack_size(tss)
                # Reset the bookkeeping so task() can detect the end of this
                # round of NUMTASKS workers.
                self.next_ident = 0
                self.created = 0
                for i in range(NUMTASKS):
                    self.newtask()

                verbose_print("waiting for all tasks to complete")
                self.done_mutex.acquire()
                verbose_print("all tasks done")
        finally:
            thread.stack_size(0)

    def test__count(self):
        # Test the _count() function.
        orig = thread._count()
        mut = thread.allocate_lock()
        mut.acquire()
        started = []
        def task():
            started.append(None)
            # Block here until the main thread releases mut.
            mut.acquire()
            mut.release()
        thread.start_new_thread(task, ())
        while not started:
            time.sleep(0.01)
        self.assertEqual(thread._count(), orig + 1)
        # Allow the task to finish.
        mut.release()
        # The only reliable way to be sure that the thread ended from the
        # interpreter's point of view is to wait for the function object to be
        # destroyed.
        done = []
        # Keep the weak reference alive in a local: its callback only fires
        # while the weakref object itself still exists.
        wr = weakref.ref(task, lambda _: done.append(None))
        del task
        while not done:
            time.sleep(0.01)
        self.assertEqual(thread._count(), orig)

    def test_save_exception_state_on_error(self):
        # See issue #14474
        def task():
            started.release()
            raise SyntaxError
        def mywrite(self, *args):
            # Raise and handle an unrelated exception before delegating to
            # the real write; the traceback of the thread's own exception
            # must still reach stderr afterwards.
            try:
                raise ValueError
            except ValueError:
                pass
            real_write(self, *args)
        c = thread._count()
        started = thread.allocate_lock()
        with support.captured_output("stderr") as stderr:
            real_write = stderr.write
            # Assigned on the instance, so mywrite is not bound: its `self`
            # parameter receives the first positional argument of write().
            stderr.write = mywrite
            started.acquire()
            thread.start_new_thread(task, ())
            # Wait until the thread has started...
            started.acquire()
            # ...and then until it is gone again.
            while thread._count() > c:
                time.sleep(0.01)
        self.assertIn("Traceback", stderr.getvalue())
149
150
class Barrier:
    """Reusable rendezvous point for a fixed number of threads.

    Built from two locks: checkin_mutex admits arriving threads one at a
    time, and checkout_mutex (held from construction) keeps the threads
    that arrived early blocked until the last one shows up.

    num_threads is the number of enter() calls needed to open the barrier.
    """

    def __init__(self, num_threads):
        self.num_threads = num_threads
        # Number of threads currently checked in / still to be let out.
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        # Held until the last thread arrives and opens the way out.
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until num_threads threads have called enter()."""
        self.checkin_mutex.acquire()
        self.waiting += 1
        if self.waiting == self.num_threads:
            # Last thread to arrive: everyone else is blocked on
            # checkout_mutex.
            self.waiting = self.num_threads - 1
            if self.waiting:
                # Let the first waiter out.  checkin_mutex deliberately stays
                # held so no thread can re-enter before all waiters have
                # left; the last waiter to leave releases it.
                self.checkout_mutex.release()
            else:
                # Single-thread barrier: there is no waiter that could
                # release checkin_mutex later, so reopen check-in right away
                # instead of leaving the barrier locked forever.
                self.checkin_mutex.release()
            return
        self.checkin_mutex.release()

        # Wait for the last arrival (or the previous leaver) to let us out.
        self.checkout_mutex.acquire()
        self.waiting -= 1
        if self.waiting == 0:
            # Last one out: reopen check-in for the next round and leave
            # checkout_mutex held, as it was right after construction.
            self.checkin_mutex.release()
            return
        # Pass the baton to the next waiter.
        self.checkout_mutex.release()
174
175
class BarrierTest(BasicThreadTest):
    """Send NUMTASKS threads through a shared Barrier, NUMTRIPS times each."""

    def test_barrier(self):
        self.bar = Barrier(NUMTASKS)
        # All workers count as running up front; each one checks out at the
        # end of task2().
        self.running = NUMTASKS
        for ident in range(NUMTASKS):
            thread.start_new_thread(self.task2, (ident,))
        verbose_print("waiting for tasks to end")
        self.done_mutex.acquire()
        verbose_print("tasks done")

    def task2(self, ident):
        """Worker body: cross the barrier NUMTRIPS times, then check out."""
        for trip in range(NUMTRIPS):
            if ident != 0:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            else:
                # give it a good chance to enter the next
                # barrier before the others are all out
                # of the current one
                delay = 0
            microseconds = round(delay * 1e6)
            verbose_print("task %s will run for %sus" % (ident, microseconds))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, trip))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)

        with self.running_mutex:
            self.running -= 1
            # Must release mutex before releasing done, else the main thread can
            # exit and set mutex to None as part of global teardown; then
            # mutex.release() raises AttributeError.
            finished = self.running == 0
        if not finished:
            return
        self.done_mutex.release()
211
class LockTests(lock_tests.LockTests):
    # Inherit the test cases of lock_tests.LockTests and point them at the
    # lock factory of the thread module under test.
    locktype = thread.allocate_lock
214
215
class TestForkInThread(unittest.TestCase):
    """Check that os.fork() works when called from a non-main thread."""

    def setUp(self):
        # The forked child reports back to the test through this pipe.
        self.read_fd, self.write_fd = os.pipe()

    @unittest.skipIf(sys.platform.startswith('win'),
                     "This test is only appropriate for POSIX-like systems.")
    @support.reap_threads
    def test_forkinthread(self):
        def thread1():
            try:
                pid = os.fork() # fork in a thread
            except RuntimeError:
                os._exit(1) # exit the child

            if pid == 0: # child
                try:
                    os.close(self.read_fd)
                    os.write(self.write_fd, b"OK")
                finally:
                    # Never let the child fall back into the test runner.
                    os._exit(0)
            else: # parent
                os.close(self.write_fd)
                # Reap the child so it is not left behind as a zombie
                # process once the test has finished.
                try:
                    os.waitpid(pid, 0)
                except OSError:
                    # Best effort: the child may already have been reaped
                    # by someone else.
                    pass

        thread.start_new_thread(thread1, ())
        # Blocks until the child has written its two-byte answer, or until
        # every write end of the pipe is closed.
        self.assertEqual(os.read(self.read_fd, 2), b"OK",
                         "Unable to fork() in thread")

    def tearDown(self):
        try:
            os.close(self.read_fd)
        except OSError:
            pass

        # The forking thread normally has closed write_fd already, in which
        # case closing it again fails with OSError; that is ignored.
        try:
            os.close(self.write_fd)
        except OSError:
            pass
253
254
if __name__ == "__main__":
    # Run the tests of this module when the file is executed as a script.
    unittest.main()
257