1import gc
2import os
3import weakref
4
5from cStringIO import StringIO
6
7try:
8    import signal
9except ImportError:
10    signal = None
11
12import unittest2
13
14
class TestBreak(unittest2.TestCase):
    """Tests for unittest2's SIGINT ("control-C") handling helpers.

    The tests exercise ``installHandler``/``removeHandler``,
    ``registerResult``/``removeResult`` and the ``catchbreak`` attribute of
    ``TestProgram``.  Most of them deliver a real SIGINT to the current
    process, so the class is only meaningful where ``os.kill`` and the
    ``signal`` module are available.
    """

    def setUp(self):
        # Remember the handler that was active before the test so that
        # tearDown can restore it regardless of what the test installs.
        self._default_handler = signal.getsignal(signal.SIGINT)

    def tearDown(self):
        signal.signal(signal.SIGINT, self._default_handler)
        # Reset unittest2's module-level state so that registered results
        # and the installed handler do not leak into the next test.
        unittest2.signals._results = weakref.WeakKeyDictionary()
        unittest2.signals._interrupt_handler = None

    def _send_sigint(self):
        """Deliver SIGINT to the current process."""
        os.kill(os.getpid(), signal.SIGINT)

    def _skip_if_sigint_ignored(self):
        """Skip the running test when SIGINT is currently being ignored.

        Tests that expect a KeyboardInterrupt cannot pass when the process
        was started with SIGINT set to SIG_IGN.  This has to be a runtime
        check rather than a skip decorator because the handler may have been
        changed after the class was defined.
        """
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")

    def testInstallHandler(self):
        """installHandler replaces the SIGINT handler and absorbs a SIGINT."""
        default_handler = signal.getsignal(signal.SIGINT)
        unittest2.installHandler()
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        try:
            self._send_sigint()
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(unittest2.signals._interrupt_handler.called)

    def testRegisterResult(self):
        """A registered result is the only object in the result registry."""
        result = unittest2.TestResult()
        unittest2.registerResult(result)

        for ref in unittest2.signals._results:
            if ref is result:
                break
            else:
                self.fail("odd object in result set")
        else:
            # The loop body never ran: the registry is empty.
            self.fail("result not found")

    def testInterruptCaught(self):
        """A first SIGINT sets shouldStop instead of raising."""
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest2.TestResult()
        unittest2.installHandler()
        unittest2.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            self._send_sigint()
            # Only reached when the interrupt did not propagate.
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)

    def testSecondInterrupt(self):
        """A second SIGINT is not absorbed and raises KeyboardInterrupt."""
        self._skip_if_sigint_ignored()

        result = unittest2.TestResult()
        unittest2.installHandler()
        unittest2.registerResult(result)

        def test(result):
            self._send_sigint()
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            self._send_sigint()
            self.fail("Second KeyboardInterrupt not raised")

        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)

    def testTwoResults(self):
        """Every registered result is stopped; unregistered ones are not."""
        unittest2.installHandler()

        result = unittest2.TestResult()
        unittest2.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        result2 = unittest2.TestResult()
        unittest2.registerResult(result2)
        # Registering another result must not swap the handler.
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        # Deliberately never registered.
        result3 = unittest2.TestResult()

        try:
            self._send_sigint()
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)

    def testHandlerReplacedButCalled(self):
        """A replaced handler that is still called delegates immediately."""
        self._skip_if_sigint_ignored()

        # If our handler has been replaced (is no longer installed) but is
        # called by the *new* handler, then it isn't safe to delay the
        # SIGINT and we should immediately delegate to the default handler
        unittest2.installHandler()

        handler = signal.getsignal(signal.SIGINT)

        # Signal handlers are called with (signum, frame), in that order.
        def new_handler(signum, frame):
            handler(signum, frame)
        signal.signal(signal.SIGINT, new_handler)

        try:
            self._send_sigint()
        except KeyboardInterrupt:
            pass
        else:
            self.fail("replaced but delegated handler doesn't raise interrupt")

    def testRunner(self):
        """Running a TextTestRunner registers the result it returns."""
        runner = unittest2.TextTestRunner(stream=StringIO())

        result = runner.run(unittest2.TestSuite())
        self.assertIn(result, unittest2.signals._results)

    def testWeakReferences(self):
        """Calling registerResult on a result should not keep it alive."""
        result = unittest2.TestResult()
        unittest2.registerResult(result)

        ref = weakref.ref(result)
        del result

        # For non-reference counting implementations
        gc.collect()
        gc.collect()
        self.assertIsNone(ref())

    def testRemoveResult(self):
        """A removed result is not stopped by a later SIGINT."""
        result = unittest2.TestResult()
        unittest2.registerResult(result)

        unittest2.installHandler()
        self.assertTrue(unittest2.removeResult(result))

        # Should this raise an error instead?
        self.assertFalse(unittest2.removeResult(unittest2.TestResult()))

        try:
            self._send_sigint()
        except KeyboardInterrupt:
            # Whether the interrupt propagates is not what is being checked
            # here; only the state of the removed result matters.
            pass

        self.assertFalse(result.shouldStop)

    def testMainInstallsHandler(self):
        """TestProgram.runTests installs the handler only with catchbreak."""
        # Opaque sentinels: the test only checks that they are passed through.
        failfast = object()
        test = object()
        verbosity = object()
        result = object()
        default_handler = signal.getsignal(signal.SIGINT)

        class FakeRunner(object):
            # Class-level lists so the calls can be inspected without having
            # access to the instance created by runTests.
            initArgs = []
            runArgs = []

            def __init__(self, *args, **kwargs):
                self.initArgs.append((args, kwargs))

            def run(self, test):
                self.runArgs.append(test)
                return result

        class Program(unittest2.TestProgram):
            # Bypass the real __init__ and set just the attributes assigned
            # below before runTests is called.
            def __init__(self, catchbreak):
                self.exit = False
                self.verbosity = verbosity
                self.failfast = failfast
                self.catchbreak = catchbreak
                self.testRunner = FakeRunner
                self.test = test
                self.result = None

        expected_init = [((), {'verbosity': verbosity,
                               'failfast': failfast,
                               'buffer': None})]

        p = Program(False)
        p.runTests()

        self.assertEqual(FakeRunner.initArgs, expected_init)
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)

        # Without catchbreak the SIGINT handler must be left alone.
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        FakeRunner.initArgs = []
        FakeRunner.runArgs = []
        p = Program(True)
        p.runTests()

        self.assertEqual(FakeRunner.initArgs, expected_init)
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)

        # With catchbreak a different handler must now be installed.
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandler(self):
        """removeHandler restores the previous handler and is repeatable."""
        default_handler = signal.getsignal(signal.SIGINT)
        unittest2.installHandler()
        unittest2.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        # check that calling removeHandler multiple times has no ill-effect
        unittest2.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandlerAsDecorator(self):
        """As a decorator, removeHandler only applies during the call."""
        default_handler = signal.getsignal(signal.SIGINT)
        unittest2.installHandler()

        @unittest2.removeHandler
        def test():
            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        test()
        # After the decorated call returns, our handler is installed again.
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
253
# Should also skip some tests on Jython
# The whole class needs both a usable signal module (it is None when the
# import at the top of the file failed) and os.kill to deliver SIGINT.
skipper = unittest2.skipUnless(
    signal is not None and hasattr(os, 'kill'),
    "test uses os.kill(...) and the signal module",
)
TestBreak = skipper(TestBreak)

if __name__ == '__main__':
    unittest2.main()
261