1"""PyUnit testing that threads honor our signal semantics"""
2
3import unittest
4import signal
5import os
6import sys
7from test.support import run_unittest, import_module
8thread = import_module('_thread')
9import time
10
11if (sys.platform[:3] == 'win'):
12    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
13
14process_pid = os.getpid()
15signalled_all=thread.allocate_lock()
16
17USING_PTHREAD_COND = (sys.thread_info.name == 'pthread'
18                      and sys.thread_info.lock == 'mutex+cond')
19
20def registerSignals(for_usr1, for_usr2, for_alrm):
21    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
22    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
23    alrm = signal.signal(signal.SIGALRM, for_alrm)
24    return usr1, usr2, alrm
25
26
27# The signal handler. Just note that the signal occurred and
28# from who.
29def handle_signals(sig,frame):
30    signal_blackboard[sig]['tripped'] += 1
31    signal_blackboard[sig]['tripped_by'] = thread.get_ident()
32
33# a function that will be spawned as a separate thread.
34def send_signals():
35    os.kill(process_pid, signal.SIGUSR1)
36    os.kill(process_pid, signal.SIGUSR2)
37    signalled_all.release()
38
39class ThreadSignals(unittest.TestCase):
40
41    def test_signals(self):
42        # Test signal handling semantics of threads.
43        # We spawn a thread, have the thread send two signals, and
44        # wait for it to finish. Check that we got both signals
45        # and that they were run by the main thread.
46        signalled_all.acquire()
47        self.spawnSignallingThread()
48        signalled_all.acquire()
49        # the signals that we asked the kernel to send
50        # will come back, but we don't know when.
51        # (it might even be after the thread exits
52        # and might be out of order.)  If we haven't seen
53        # the signals yet, send yet another signal and
54        # wait for it return.
55        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
56           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
57            signal.alarm(1)
58            signal.pause()
59            signal.alarm(0)
60
61        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
62        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
63                           thread.get_ident())
64        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
65        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
66                           thread.get_ident())
67        signalled_all.release()
68
69    def spawnSignallingThread(self):
70        thread.start_new_thread(send_signals, ())
71
72    def alarm_interrupt(self, sig, frame):
73        raise KeyboardInterrupt
74
75    @unittest.skipIf(USING_PTHREAD_COND,
76                     'POSIX condition variables cannot be interrupted')
77    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
78    @unittest.skipIf(sys.platform.startswith('openbsd'),
79                     'lock cannot be interrupted on OpenBSD')
80    def test_lock_acquire_interruption(self):
81        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
82        # in a deadlock.
83        # XXX this test can fail when the legacy (non-semaphore) implementation
84        # of locks is used in thread_pthread.h, see issue #11223.
85        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
86        try:
87            lock = thread.allocate_lock()
88            lock.acquire()
89            signal.alarm(1)
90            t1 = time.time()
91            self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
92            dt = time.time() - t1
93            # Checking that KeyboardInterrupt was raised is not sufficient.
94            # We want to assert that lock.acquire() was interrupted because
95            # of the signal, not that the signal handler was called immediately
96            # after timeout return of lock.acquire() (which can fool assertRaises).
97            self.assertLess(dt, 3.0)
98        finally:
99            signal.signal(signal.SIGALRM, oldalrm)
100
101    @unittest.skipIf(USING_PTHREAD_COND,
102                     'POSIX condition variables cannot be interrupted')
103    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
104    @unittest.skipIf(sys.platform.startswith('openbsd'),
105                     'lock cannot be interrupted on OpenBSD')
106    def test_rlock_acquire_interruption(self):
107        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
108        # in a deadlock.
109        # XXX this test can fail when the legacy (non-semaphore) implementation
110        # of locks is used in thread_pthread.h, see issue #11223.
111        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
112        try:
113            rlock = thread.RLock()
114            # For reentrant locks, the initial acquisition must be in another
115            # thread.
116            def other_thread():
117                rlock.acquire()
118            thread.start_new_thread(other_thread, ())
119            # Wait until we can't acquire it without blocking...
120            while rlock.acquire(blocking=False):
121                rlock.release()
122                time.sleep(0.01)
123            signal.alarm(1)
124            t1 = time.time()
125            self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
126            dt = time.time() - t1
127            # See rationale above in test_lock_acquire_interruption
128            self.assertLess(dt, 3.0)
129        finally:
130            signal.signal(signal.SIGALRM, oldalrm)
131
132    def acquire_retries_on_intr(self, lock):
133        self.sig_recvd = False
134        def my_handler(signal, frame):
135            self.sig_recvd = True
136        old_handler = signal.signal(signal.SIGUSR1, my_handler)
137        try:
138            def other_thread():
139                # Acquire the lock in a non-main thread, so this test works for
140                # RLocks.
141                lock.acquire()
142                # Wait until the main thread is blocked in the lock acquire, and
143                # then wake it up with this.
144                time.sleep(0.5)
145                os.kill(process_pid, signal.SIGUSR1)
146                # Let the main thread take the interrupt, handle it, and retry
147                # the lock acquisition.  Then we'll let it run.
148                time.sleep(0.5)
149                lock.release()
150            thread.start_new_thread(other_thread, ())
151            # Wait until we can't acquire it without blocking...
152            while lock.acquire(blocking=False):
153                lock.release()
154                time.sleep(0.01)
155            result = lock.acquire()  # Block while we receive a signal.
156            self.assertTrue(self.sig_recvd)
157            self.assertTrue(result)
158        finally:
159            signal.signal(signal.SIGUSR1, old_handler)
160
161    def test_lock_acquire_retries_on_intr(self):
162        self.acquire_retries_on_intr(thread.allocate_lock())
163
164    def test_rlock_acquire_retries_on_intr(self):
165        self.acquire_retries_on_intr(thread.RLock())
166
167    def test_interrupted_timed_acquire(self):
168        # Test to make sure we recompute lock acquisition timeouts when we
169        # receive a signal.  Check this by repeatedly interrupting a lock
170        # acquire in the main thread, and make sure that the lock acquire times
171        # out after the right amount of time.
172        # NOTE: this test only behaves as expected if C signals get delivered
173        # to the main thread.  Otherwise lock.acquire() itself doesn't get
174        # interrupted and the test trivially succeeds.
175        self.start = None
176        self.end = None
177        self.sigs_recvd = 0
178        done = thread.allocate_lock()
179        done.acquire()
180        lock = thread.allocate_lock()
181        lock.acquire()
182        def my_handler(signum, frame):
183            self.sigs_recvd += 1
184        old_handler = signal.signal(signal.SIGUSR1, my_handler)
185        try:
186            def timed_acquire():
187                self.start = time.time()
188                lock.acquire(timeout=0.5)
189                self.end = time.time()
190            def send_signals():
191                for _ in range(40):
192                    time.sleep(0.02)
193                    os.kill(process_pid, signal.SIGUSR1)
194                done.release()
195
196            # Send the signals from the non-main thread, since the main thread
197            # is the only one that can process signals.
198            thread.start_new_thread(send_signals, ())
199            timed_acquire()
200            # Wait for thread to finish
201            done.acquire()
202            # This allows for some timing and scheduling imprecision
203            self.assertLess(self.end - self.start, 2.0)
204            self.assertGreater(self.end - self.start, 0.3)
205            # If the signal is received several times before PyErr_CheckSignals()
206            # is called, the handler will get called less than 40 times. Just
207            # check it's been called at least once.
208            self.assertGreater(self.sigs_recvd, 0)
209        finally:
210            signal.signal(signal.SIGUSR1, old_handler)
211
212
213def test_main():
214    global signal_blackboard
215
216    signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
217                          signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
218                          signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }
219
220    oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
221    try:
222        run_unittest(ThreadSignals)
223    finally:
224        registerSignals(*oldsigs)
225
226if __name__ == '__main__':
227    test_main()
228