1"""
2Test lldb Python event APIs.
3"""
4
5import os, time
6import re
7import unittest2
8import lldb, lldbutil
9from lldbtest import *
10
class EventAPITestCase(TestBase):
    """Exercise the SBEvent, SBListener and SBBroadcaster Python APIs.

    Every scenario has a dsym flavour (skipped unless sys.platform starts
    with "darwin") and a dwarf flavour.  The test_* methods only build the
    inferior; the shared do_* methods carry the actual scenario.
    """

    mydir = os.path.join("python_api", "event")

    @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
    @python_api_test
    @dsym_test
    def test_listen_for_and_print_event_with_dsym(self):
        """Exercise SBEvent API."""
        self.buildDsym()
        self.do_listen_for_and_print_event()

    @python_api_test
    @dwarf_test
    def test_listen_for_and_print_event_with_dwarf(self):
        """Exercise SBEvent API."""
        self.buildDwarf()
        self.do_listen_for_and_print_event()

    @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
    @python_api_test
    @dsym_test
    def test_wait_for_event_with_dsym(self):
        """Exercise SBListener.WaitForEvent() API."""
        self.buildDsym()
        self.do_wait_for_event()

    @python_api_test
    @dwarf_test
    def test_wait_for_event_with_dwarf(self):
        """Exercise SBListener.WaitForEvent() API."""
        self.buildDwarf()
        self.do_wait_for_event()

    @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
    @python_api_test
    @dsym_test
    def test_add_listener_to_broadcaster_with_dsym(self):
        """Exercise some SBBroadcaster APIs."""
        self.buildDsym()
        self.do_add_listener_to_broadcaster()

    @python_api_test
    @dwarf_test
    def test_add_listener_to_broadcaster_with_dwarf(self):
        """Exercise some SBBroadcaster APIs."""
        self.buildDwarf()
        self.do_add_listener_to_broadcaster()

    def setUp(self):
        """Run the base-class setUp and record the marker line in main.c."""
        # Call super's setUp().
        TestBase.setUp(self)
        # Find the line number of function 'c'.
        self.line = line_number('main.c', '// Find the line number of function "c" here.')

    def _create_target_with_breakpoint(self):
        """Create a target for ./a.out with a breakpoint on function 'c'.

        Asserts that the target is valid and that the breakpoint resolved to
        exactly one location, so that every scenario fails early with
        VALID_TARGET / VALID_BREAKPOINT instead of a misleading later error.

        Returns the created target.
        """
        exe = os.path.join(os.getcwd(), "a.out")

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Now create a breakpoint on main.c by name 'c'.
        bkpt = target.BreakpointCreateByName('c', 'a.out')
        self.assertTrue(bkpt and
                        bkpt.GetNumLocations() == 1,
                        VALID_BREAKPOINT)
        return target

    def do_listen_for_and_print_event(self):
        """Create a listener and use SBEvent API to print the events received.

        A helper thread polls the process broadcaster for state-changed
        events while the main thread continues and then kills the process.
        Event details are printed only when tracing is on; the scenario
        makes no assertion about which events arrive.
        """
        target = self._create_target_with_breakpoint()

        # Now launch the process, and do not stop at the entry point.
        process = target.LaunchSimple(None, None, os.getcwd())
        self.assertTrue(process.GetState() == lldb.eStateStopped,
                        PROCESS_STOPPED)

        # Get a handle on the process's broadcaster.
        broadcaster = process.GetBroadcaster()

        # Create an empty event object; the listening thread fills it in.
        event = lldb.SBEvent()

        # Create a listener object and register with the broadcaster.
        listener = lldb.SBListener("my listener")
        rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
        self.assertTrue(rc, "AddListener successfully retruns")

        traceOn = self.TraceOn()
        if traceOn:
            lldbutil.print_stacktraces(process)

        # Create MyListeningThread class to wait for any kind of event.
        import threading
        class MyListeningThread(threading.Thread):
            def run(self):
                # Make exactly 4 attempts to retrieve a state-changed event,
                # whether or not an attempt succeeds; then the thread exits.
                for _ in range(4):
                    if traceOn:
                        print("Try wait for event...")
                    got_event = listener.WaitForEventForBroadcasterWithType(
                        5,
                        broadcaster,
                        lldb.SBProcess.eBroadcastBitStateChanged,
                        event)
                    if not traceOn:
                        continue
                    if got_event:
                        desc = lldbutil.get_description(event)
                        print("Event description: %s" % (desc,))
                        print("Event data flavor: %s" % (event.GetDataFlavor(),))
                        print("Process state: %s" %
                              (lldbutil.state_type_to_str(process.GetState()),))
                        print("")
                    else:
                        print("timeout occurred waiting for event...")

        # Let's start the listening thread to retrieve the events.
        my_thread = MyListeningThread()
        my_thread.start()

        # Use Python API to continue the process.  The listening thread should be
        # able to receive the state changed events.
        process.Continue()

        # Use Python API to kill the process.  The listening thread should be
        # able to receive the state changed event, too.
        process.Kill()

        # Wait until the 'MyListeningThread' terminates.
        my_thread.join()

    def do_wait_for_event(self):
        """Get the listener associated with the debugger and exercise WaitForEvent API.

        The process is launched with the debugger's own listener and then
        killed; a helper thread must subsequently obtain a valid event from
        that listener.
        """
        target = self._create_target_with_breakpoint()

        # Get the debugger listener.
        listener = self.dbg.GetListener()

        # Now launch the process, and do not stop at entry point.
        error = lldb.SBError()
        process = target.Launch (listener, None, None, None, None, None, None, 0, False, error)
        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)

        # Get a handle on the process's broadcaster.
        broadcaster = process.GetBroadcaster()
        self.assertTrue(broadcaster, "Process with valid broadcaster")

        # Create an empty event object.
        event = lldb.SBEvent()
        self.assertFalse(event, "Event should not be valid initially")

        # Create MyListeningThread to wait for any kind of event.
        import threading
        class MyListeningThread(threading.Thread):
            def run(self):
                # Try at most 4 times to retrieve any kind of event and stop
                # at the first success.
                for _ in range(4):
                    if listener.WaitForEvent(5, event):
                        return
                    print("Timeout: listener.WaitForEvent")

        # Use Python API to kill the process.  The listening thread should be
        # able to receive a state changed event.
        process.Kill()

        # Let's start the listening thread to retrieve the event.
        my_thread = MyListeningThread()
        my_thread.start()

        # Wait until the 'MyListeningThread' terminates.
        my_thread.join()

        self.assertTrue(event,
                        "My listening thread successfully received an event")

    def do_add_listener_to_broadcaster(self):
        """Get the broadcaster associated with the process and wait for broadcaster events.

        After the process is continued, a helper thread must observe a
        "running" state-changed event followed by a "stopped" one; progress
        is tracked in self.state (0 -> 1 -> 2).
        """
        target = self._create_target_with_breakpoint()

        # Now launch the process, and do not stop at the entry point.
        process = target.LaunchSimple(None, None, os.getcwd())
        self.assertTrue(process.GetState() == lldb.eStateStopped,
                        PROCESS_STOPPED)

        # Get a handle on the process's broadcaster.
        broadcaster = process.GetBroadcaster()
        self.assertTrue(broadcaster, "Process with valid broadcaster")

        # Create an empty event object.
        event = lldb.SBEvent()
        self.assertFalse(event, "Event should not be valid initially")

        # Create a listener object and register with the broadcaster.
        listener = lldb.SBListener("TestEvents.listener")
        rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
        self.assertTrue(rc, "AddListener successfully retruns")

        # The finite state machine for our custom listening thread, with an
        # initial state of 0, which means a "running" event is expected.
        # It changes to 1 after "running" is received.
        # It changes to 2 after "stopped" is received.
        # 2 will be our final state and the test is complete.
        self.state = 0

        # Create MyListeningThread to wait for state changed events.
        # By design, a "running" event is expected followed by a "stopped" event.
        import threading
        class MyListeningThread(threading.Thread):
            def run(self):
                # Regular expression pattern for the event description.
                pattern = re.compile("data = {.*, state = (.*)}$")

                # 'count' tracks timeouts only; received events do not consume
                # an attempt.  The thread gives up on the 7th timeout.
                count = 0
                while True:
                    if listener.WaitForEventForBroadcasterWithType(5,
                                                                   broadcaster,
                                                                   lldb.SBProcess.eBroadcastBitStateChanged,
                                                                   event):
                        desc = lldbutil.get_description(event)
                        match = pattern.search(desc)
                        if not match:
                            break
                        state_name = match.group(1)
                        if self.context.state == 0 and state_name == 'running':
                            self.context.state = 1
                            continue
                        if self.context.state == 1 and state_name == 'stopped':
                            # Whoopee, both events have been received!
                            self.context.state = 2
                        # Either we are done or the event was unexpected.
                        break
                    print("Timeout: listener.WaitForEvent")
                    count = count + 1
                    if count > 6:
                        break

        # Use Python API to continue the process.  The listening thread should be
        # able to receive the state changed events.
        process.Continue()

        # Start the listening thread to receive the "running" followed by the
        # "stopped" events.
        my_thread = MyListeningThread()
        # Supply the enclosing context so that our listening thread can access
        # the 'state' variable.
        my_thread.context = self
        my_thread.start()

        # Wait until the 'MyListeningThread' terminates.
        my_thread.join()

        # We are no longer interested in receiving state changed events.
        # Remove our custom listener before the inferior is killed.
        broadcaster.RemoveListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)

        # The final judgement. :-)
        self.assertTrue(self.state == 2,
                        "Both expected state changed events received")
301
if __name__ == '__main__':
    import atexit

    def _terminate_debugger():
        """Exit hook: shut the LLDB debugger down again."""
        lldb.SBDebugger.Terminate()

    # Initialize LLDB before handing control to the test runner, and make
    # sure it is terminated when the interpreter exits.
    lldb.SBDebugger.Initialize()
    atexit.register(_terminate_debugger)
    unittest2.main()
307