1from test import support
2import unittest
3import dummy_threading as _threading
4import time
5
class DummyThreadingTestCase(unittest.TestCase):
    """Run a batch of semaphore-limited tasks through the ``_threading`` module.

    The worker threads coordinate through three module-level globals that
    ``setUp`` recreates before every test:

    * ``sema``    -- bounded semaphore (value 3) held for the whole task body.
    * ``mutex``   -- re-entrant lock guarding updates to ``running``.
    * ``running`` -- number of tasks currently inside the task body.
    """

    class TestThread(_threading.Thread):
        """Thread that counts itself in and out of the shared ``running`` total."""

        def run(self):
            """Enter the semaphore, bump ``running``, sleep, then undo both.

            The locks are taken with ``with`` so they are released even if
            something in the body raises; otherwise a failing task would
            leave ``sema``/``mutex`` held for every later task.
            """
            # Only ``running`` is rebound here; ``sema`` and ``mutex`` are
            # merely read, so they need no ``global`` declaration.
            global running
            # Kept at 0 here.  When testing another module, such as the real
            # 'threading' module, use a non-zero delay instead
            # (e.g. ``random.random() * 2``, which needs ``import random``).
            delay = 0
            if support.verbose:
                print('task', self.name, 'will run for', delay, 'sec')
            with sema:
                with mutex:
                    running += 1
                    if support.verbose:
                        print(running, 'tasks are running')
                time.sleep(delay)
                if support.verbose:
                    print('task', self.name, 'done')
                with mutex:
                    running -= 1
                    if support.verbose:
                        print(self.name, 'is finished.', running,
                              'tasks are running')

    def setUp(self):
        """Create fresh synchronisation globals and an empty thread list."""
        global sema, mutex, running
        self.numtasks = 10
        sema = _threading.BoundedSemaphore(value=3)
        mutex = _threading.RLock()
        running = 0
        self.threads = []

    def test_tasks(self):
        """Start ``numtasks`` threads, join them all, and check none is left running."""
        for i in range(self.numtasks):
            t = self.TestThread(name="<thread %d>"%i)
            self.threads.append(t)
            t.start()

        if support.verbose:
            print('waiting for all tasks to complete')
        for t in self.threads:
            t.join()
        if support.verbose:
            print('all tasks done')
        # Every task increments and then decrements the counter, so once all
        # threads have been joined it must be back at its initial value.
        self.assertEqual(running, 0)
58
# Allow running this test module directly as a script: unittest.main()
# discovers and runs the TestCase classes defined in this module.
if __name__ == '__main__':
    unittest.main()
61