"""Tests for unittest2's SIGINT (control-C) handling support."""

import gc
import os
import weakref

try:
    from cStringIO import StringIO
except ImportError:
    # cStringIO only exists on Python 2; io.StringIO is the Python 3 stand-in.
    from io import StringIO

try:
    import signal
except ImportError:
    signal = None

import unittest2


class TestBreak(unittest2.TestCase):
    """Exercise installHandler, registerResult, removeResult and removeHandler.

    Most tests deliver a real SIGINT to the current process with os.kill, so
    the class is wrapped in a skip decorator at the bottom of this module for
    platforms that lack os.kill or the signal module.
    """

    def setUp(self):
        """Remember the SIGINT handler that is in place before the test."""
        self._default_handler = signal.getsignal(signal.SIGINT)

    def tearDown(self):
        """Reinstall the saved SIGINT handler and reset unittest2.signals."""
        signal.signal(signal.SIGINT, self._default_handler)
        unittest2.signals._results = weakref.WeakKeyDictionary()
        unittest2.signals._interrupt_handler = None

    def testInstallHandler(self):
        # installHandler() must replace the SIGINT handler, and the first
        # SIGINT must be absorbed by it instead of raising KeyboardInterrupt.
        default_handler = signal.getsignal(signal.SIGINT)
        unittest2.installHandler()
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        try:
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(unittest2.signals._interrupt_handler.called)

    def testRegisterResult(self):
        # After registerResult() the result must be the only key tracked in
        # unittest2.signals._results.
        result = unittest2.TestResult()
        unittest2.registerResult(result)

        for ref in unittest2.signals._results:
            if ref is result:
                break
            # Any other key means something unexpected was registered.
            self.fail("odd object in result set")
        else:
            self.fail("result not found")

    def testInterruptCaught(self):
        # A single SIGINT must set shouldStop on the registered result and
        # let the running code carry on.
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest2.TestResult()
        unittest2.installHandler()
        unittest2.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            # Only reached if the signal did not abort execution.
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)

    def testSecondInterrupt(self):
        # The first SIGINT is absorbed; a second one must raise
        # KeyboardInterrupt.
        result = unittest2.TestResult()
        unittest2.installHandler()
        unittest2.registerResult(result)

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            os.kill(pid, signal.SIGINT)
            self.fail("Second KeyboardInterrupt not raised")

        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)

    def testTwoResults(self):
        # Registering further results must not change the installed handler,
        # and one SIGINT must flag every registered result but leave an
        # unregistered one alone.
        unittest2.installHandler()

        result = unittest2.TestResult()
        unittest2.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        result2 = unittest2.TestResult()
        unittest2.registerResult(result2)
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        # Deliberately never registered.
        result3 = unittest2.TestResult()

        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)

    def testHandlerReplacedButCalled(self):
        # If our handler has been replaced (is no longer installed) but is
        # called by the *new* handler, then it isn't safe to delay the
        # SIGINT and we should immediately delegate to the default handler
        unittest2.installHandler()

        handler = signal.getsignal(signal.SIGINT)

        # Signal handlers are invoked as handler(signum, frame).
        def new_handler(signum, frame):
            handler(signum, frame)
        signal.signal(signal.SIGINT, new_handler)

        try:
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("replaced but delegated handler doesn't raise interrupt")

    def testRunner(self):
        # Creating a TextTestRunner with the appropriate argument should
        # register the TextTestResult it creates
        runner = unittest2.TextTestRunner(stream=StringIO())

        result = runner.run(unittest2.TestSuite())
        self.assertIn(result, unittest2.signals._results)

    def testWeakReferences(self):
        # Calling registerResult on a result should not keep it alive
        result = unittest2.TestResult()
        unittest2.registerResult(result)

        ref = weakref.ref(result)
        del result

        # For non-reference counting implementations
        gc.collect()
        gc.collect()
        self.assertIsNone(ref())

    def testRemoveResult(self):
        # removeResult() returns True for a registered result and False for
        # an unknown one; a removed result must not be flagged by SIGINT.
        result = unittest2.TestResult()
        unittest2.registerResult(result)

        unittest2.installHandler()
        self.assertTrue(unittest2.removeResult(result))

        # Should this raise an error instead?
        self.assertFalse(unittest2.removeResult(unittest2.TestResult()))

        try:
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        except KeyboardInterrupt:
            # Whether the interrupt propagates is not what is under test here.
            pass

        self.assertFalse(result.shouldStop)

    def testMainInstallsHandler(self):
        # TestProgram.runTests() must install the SIGINT handler only when
        # catchbreak is true, and must build and run the runner the same way
        # in both cases.
        failfast = object()
        test = object()
        verbosity = object()
        result = object()
        default_handler = signal.getsignal(signal.SIGINT)

        class FakeRunner(object):
            # Class-level lists: calls are recorded on the class because the
            # program, not the test, creates the runner instance.
            initArgs = []
            runArgs = []

            def __init__(self, *args, **kwargs):
                self.initArgs.append((args, kwargs))

            def run(self, test):
                self.runArgs.append(test)
                return result

        class Program(unittest2.TestProgram):
            # Does not call TestProgram.__init__; it only sets attributes
            # directly (presumably the ones runTests() reads - confirm
            # against unittest2.main).
            def __init__(self, catchbreak):
                self.exit = False
                self.verbosity = verbosity
                self.failfast = failfast
                self.catchbreak = catchbreak
                self.testRunner = FakeRunner
                self.test = test
                self.result = None

        expected_init_args = [((), {'verbosity': verbosity,
                                    'failfast': failfast,
                                    'buffer': None})]

        p = Program(False)
        p.runTests()

        self.assertEqual(FakeRunner.initArgs, expected_init_args)
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)

        # catchbreak=False: the SIGINT handler must be untouched.
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        FakeRunner.initArgs = []
        FakeRunner.runArgs = []
        p = Program(True)
        p.runTests()

        self.assertEqual(FakeRunner.initArgs, expected_init_args)
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)

        # catchbreak=True: a different SIGINT handler must now be installed.
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandler(self):
        # removeHandler() must put back the handler that was in place before
        # installHandler().
        default_handler = signal.getsignal(signal.SIGINT)
        unittest2.installHandler()
        unittest2.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        # check that calling removeHandler multiple times has no ill-effect
        unittest2.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandlerAsDecorator(self):
        # Used as a decorator, removeHandler must restore the original
        # handler only while the decorated function runs.
        default_handler = signal.getsignal(signal.SIGINT)
        unittest2.installHandler()

        @unittest2.removeHandler
        def test():
            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        test()
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)


# Should also skip some tests on Jython
skipper = unittest2.skipUnless(hasattr(os, 'kill') and signal is not None,
                               "test uses os.kill(...) and the signal module")
TestBreak = skipper(TestBreak)

if __name__ == '__main__':
    unittest2.main()