"""PyUnit testing that threads honor our signal semantics"""

import os
import signal
import sys
import time
import unittest
from test.support import run_unittest, import_module

thread = import_module('_thread')

if sys.platform.startswith('win'):
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)

process_pid = os.getpid()
# Held by the main thread while the signalling thread runs; released by
# send_signals() once both signals have been sent.
signalled_all = thread.allocate_lock()

# True when sys.thread_info reports pthread locks built on a mutex plus a
# condition variable; the interruption tests below are skipped in that case.
USING_PTHREAD_COND = (sys.thread_info.name == 'pthread'
                      and sys.thread_info.lock == 'mutex+cond')


def registerSignals(for_usr1, for_usr2, for_alrm):
    """Install handlers for SIGUSR1, SIGUSR2 and SIGALRM.

    Returns the previously installed handlers as a
    (usr1, usr2, alrm) tuple, so that they can be restored later with
    registerSignals(*previous).
    """
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm


# The signal handler. Just note that the signal occurred and
# by whom it was handled.
def handle_signals(sig, frame):
    """Record in signal_blackboard that *sig* fired and which thread ran us."""
    signal_blackboard[sig]['tripped'] += 1
    signal_blackboard[sig]['tripped_by'] = thread.get_ident()


# A function that will be spawned as a separate thread.
def send_signals():
    """Send SIGUSR1 and SIGUSR2 to this process, then release signalled_all."""
    os.kill(process_pid, signal.SIGUSR1)
    os.kill(process_pid, signal.SIGUSR2)
    signalled_all.release()


class ThreadSignals(unittest.TestCase):

    def test_signals(self):
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        try:
            # The signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (It might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # the signals yet, send yet another signal and
            # wait for it to return.
            if (signal_blackboard[signal.SIGUSR1]['tripped'] == 0
                    or signal_blackboard[signal.SIGUSR2]['tripped'] == 0):
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    # Never leave the alarm armed, even if pause() raised.
                    signal.alarm(0)

            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped_by'],
                             thread.get_ident())
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped_by'],
                             thread.get_ident())
        finally:
            # Release even when an assertion fails; otherwise the module-level
            # lock stays held and a rerun of this test would block forever on
            # the first acquire() above.
            signalled_all.release()

    def spawnSignallingThread(self):
        """Start send_signals() in a new thread."""
        thread.start_new_thread(send_signals, ())

    def alarm_interrupt(self, sig, frame):
        """Signal handler that mimics Ctrl-C by raising KeyboardInterrupt."""
        raise KeyboardInterrupt

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_lock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            lock = thread.allocate_lock()
            lock.acquire()
            signal.alarm(1)
            # Use a monotonic clock: wall-clock adjustments must not be able
            # to distort the measured duration.
            t1 = time.monotonic()
            self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
            dt = time.monotonic() - t1
            # Checking that KeyboardInterrupt was raised is not sufficient.
            # We want to assert that lock.acquire() was interrupted because
            # of the signal, not that the signal handler was called immediately
            # after timeout return of lock.acquire() (which can fool
            # assertRaises).
            self.assertLess(dt, 3.0)
        finally:
            # Cancel any alarm that is still pending before the previous
            # handler is put back.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_rlock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            rlock = thread.RLock()

            # For reentrant locks, the initial acquisition must be in another
            # thread.
            def other_thread():
                rlock.acquire()

            thread.start_new_thread(other_thread, ())
            # Wait until we can't acquire it without blocking...
            while rlock.acquire(blocking=False):
                rlock.release()
                time.sleep(0.01)
            signal.alarm(1)
            t1 = time.monotonic()
            self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
            dt = time.monotonic() - t1
            # See rationale above in test_lock_acquire_interruption
            self.assertLess(dt, 3.0)
        finally:
            # Cancel any alarm that is still pending before the previous
            # handler is put back.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    def acquire_retries_on_intr(self, lock):
        """Check that a blocking lock.acquire() survives a handled signal.

        *lock* is acquired by a helper thread, which then sends SIGUSR1 to
        the process while the main thread is blocked in lock.acquire(), and
        finally releases the lock.  The main thread's acquire() must both
        have run the handler and have returned a true value.
        """
        self.sig_recvd = False

        # The first parameter is named signum so that it does not shadow the
        # signal module.
        def my_handler(signum, frame):
            self.sig_recvd = True

        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def other_thread():
                # Acquire the lock in a non-main thread, so this test works for
                # RLocks.
                lock.acquire()
                # Wait until the main thread is blocked in the lock acquire, and
                # then wake it up with this.
                time.sleep(0.5)
                os.kill(process_pid, signal.SIGUSR1)
                # Let the main thread take the interrupt, handle it, and retry
                # the lock acquisition.  Then we'll let it run.
                time.sleep(0.5)
                lock.release()

            thread.start_new_thread(other_thread, ())
            # Wait until we can't acquire it without blocking...
            while lock.acquire(blocking=False):
                lock.release()
                time.sleep(0.01)
            result = lock.acquire()  # Block while we receive a signal.
            self.assertTrue(self.sig_recvd)
            self.assertTrue(result)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)

    def test_lock_acquire_retries_on_intr(self):
        self.acquire_retries_on_intr(thread.allocate_lock())

    def test_rlock_acquire_retries_on_intr(self):
        self.acquire_retries_on_intr(thread.RLock())

    def test_interrupted_timed_acquire(self):
        # Test to make sure we recompute lock acquisition timeouts when we
        # receive a signal.  Check this by repeatedly interrupting a lock
        # acquire in the main thread, and make sure that the lock acquire times
        # out after the right amount of time.
        # NOTE: this test only behaves as expected if C signals get delivered
        # to the main thread.  Otherwise lock.acquire() itself doesn't get
        # interrupted and the test trivially succeeds.
        self.start = None
        self.end = None
        self.sigs_recvd = 0
        done = thread.allocate_lock()
        done.acquire()
        lock = thread.allocate_lock()
        lock.acquire()

        def my_handler(signum, frame):
            self.sigs_recvd += 1

        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def timed_acquire():
                # Monotonic clock: the bounds asserted below must not be
                # affected by wall-clock adjustments.
                self.start = time.monotonic()
                lock.acquire(timeout=0.5)
                self.end = time.monotonic()

            # Local helper; intentionally distinct from the module-level
            # send_signals(), which it shadows inside this test only.
            def send_signals():
                for _ in range(40):
                    time.sleep(0.02)
                    os.kill(process_pid, signal.SIGUSR1)
                done.release()

            # Send the signals from the non-main thread, since the main thread
            # is the only one that can process signals.
            thread.start_new_thread(send_signals, ())
            timed_acquire()
            # Wait for thread to finish
            done.acquire()
            # This allows for some timing and scheduling imprecision
            self.assertLess(self.end - self.start, 2.0)
            self.assertGreater(self.end - self.start, 0.3)
            # If the signal is received several times before PyErr_CheckSignals()
            # is called, the handler will get called less than 40 times. Just
            # check it's been called at least once.
            self.assertGreater(self.sigs_recvd, 0)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)


def test_main():
    """Run ThreadSignals with the recording signal handlers installed.

    Creates the module-global signal_blackboard that handle_signals() and
    test_signals() rely on, installs handle_signals for SIGUSR1, SIGUSR2 and
    SIGALRM, runs the test case, and always restores the previous handlers.
    """
    global signal_blackboard

    signal_blackboard = {signal.SIGUSR1: {'tripped': 0, 'tripped_by': 0},
                         signal.SIGUSR2: {'tripped': 0, 'tripped_by': 0},
                         signal.SIGALRM: {'tripped': 0, 'tripped_by': 0}}

    oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
    try:
        run_unittest(ThreadSignals)
    finally:
        registerSignals(*oldsigs)


if __name__ == '__main__':
    test_main()