test_asynchat.py revision fe9ebe4f4ff848cda1d4a5e398354b3be3fb984e
1# test asynchat
2
3import errno
4import asyncore
5import asynchat
6import socket
7import time
8import unittest
9import sys
10from test import test_support
11try:
12    import threading
13except ImportError:
14    threading = None
15
16HOST = test_support.HOST
17SERVER_QUIT = 'QUIT\n'
18
19if threading:
20    class echo_server(threading.Thread):
21        # parameter to determine the number of bytes passed back to the
22        # client each send
23        chunk_size = 1
24
25        def __init__(self, event):
26            threading.Thread.__init__(self)
27            self.event = event
28            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
29            self.port = test_support.bind_port(self.sock)
30            # This will be set if the client wants us to wait before echoing data
31            # back.
32            self.start_resend_event = None
33
34        def run(self):
35            self.sock.listen(1)
36            self.event.set()
37            conn, client = self.sock.accept()
38            self.buffer = ""
39            # collect data until quit message is seen
40            while SERVER_QUIT not in self.buffer:
41                data = conn.recv(1)
42                if not data:
43                    break
44                self.buffer = self.buffer + data
45
46            # remove the SERVER_QUIT message
47            self.buffer = self.buffer.replace(SERVER_QUIT, '')
48
49            if self.start_resend_event:
50                self.start_resend_event.wait()
51
52            # re-send entire set of collected data
53            try:
54                # this may fail on some tests, such as test_close_when_done, since
55                # the client closes the channel when it's done sending
56                while self.buffer:
57                    n = conn.send(self.buffer[:self.chunk_size])
58                    time.sleep(0.001)
59                    self.buffer = self.buffer[n:]
60            except:
61                pass
62
63            conn.close()
64            self.sock.close()
65
66    class echo_client(asynchat.async_chat):
67
68        def __init__(self, terminator, server_port):
69            asynchat.async_chat.__init__(self)
70            self.contents = []
71            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
72            self.connect((HOST, server_port))
73            self.set_terminator(terminator)
74            self.buffer = ''
75
76        def handle_connect(self):
77            pass
78
79        if sys.platform == 'darwin':
80            # select.poll returns a select.POLLHUP at the end of the tests
81            # on darwin, so just ignore it
82            def handle_expt(self):
83                pass
84
85        def collect_incoming_data(self, data):
86            self.buffer += data
87
88        def found_terminator(self):
89            self.contents.append(self.buffer)
90            self.buffer = ""
91
92
93    def start_echo_server():
94        event = threading.Event()
95        s = echo_server(event)
96        s.start()
97        event.wait()
98        event.clear()
99        time.sleep(0.01) # Give server time to start accepting.
100        return s, event
101
102
103@unittest.skipUnless(threading, 'Threading required for this test.')
104class TestAsynchat(unittest.TestCase):
105    usepoll = False
106
107    def setUp (self):
108        self._threads = test_support.threading_setup()
109
110    def tearDown (self):
111        test_support.threading_cleanup(*self._threads)
112
113    def line_terminator_check(self, term, server_chunk):
114        event = threading.Event()
115        s = echo_server(event)
116        s.chunk_size = server_chunk
117        s.start()
118        event.wait()
119        event.clear()
120        time.sleep(0.01) # Give server time to start accepting.
121        c = echo_client(term, s.port)
122        c.push("hello ")
123        c.push("world%s" % term)
124        c.push("I'm not dead yet!%s" % term)
125        c.push(SERVER_QUIT)
126        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
127        s.join()
128
129        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
130
131    # the line terminator tests below check receiving variously-sized
132    # chunks back from the server in order to exercise all branches of
133    # async_chat.handle_read
134
135    def test_line_terminator1(self):
136        # test one-character terminator
137        for l in (1,2,3):
138            self.line_terminator_check('\n', l)
139
140    def test_line_terminator2(self):
141        # test two-character terminator
142        for l in (1,2,3):
143            self.line_terminator_check('\r\n', l)
144
145    def test_line_terminator3(self):
146        # test three-character terminator
147        for l in (1,2,3):
148            self.line_terminator_check('qqq', l)
149
150    def numeric_terminator_check(self, termlen):
151        # Try reading a fixed number of bytes
152        s, event = start_echo_server()
153        c = echo_client(termlen, s.port)
154        data = "hello world, I'm not dead yet!\n"
155        c.push(data)
156        c.push(SERVER_QUIT)
157        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
158        s.join()
159
160        self.assertEqual(c.contents, [data[:termlen]])
161
162    def test_numeric_terminator1(self):
163        # check that ints & longs both work (since type is
164        # explicitly checked in async_chat.handle_read)
165        self.numeric_terminator_check(1)
166        self.numeric_terminator_check(1L)
167
168    def test_numeric_terminator2(self):
169        self.numeric_terminator_check(6L)
170
171    def test_none_terminator(self):
172        # Try reading a fixed number of bytes
173        s, event = start_echo_server()
174        c = echo_client(None, s.port)
175        data = "hello world, I'm not dead yet!\n"
176        c.push(data)
177        c.push(SERVER_QUIT)
178        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
179        s.join()
180
181        self.assertEqual(c.contents, [])
182        self.assertEqual(c.buffer, data)
183
184    def test_simple_producer(self):
185        s, event = start_echo_server()
186        c = echo_client('\n', s.port)
187        data = "hello world\nI'm not dead yet!\n"
188        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
189        c.push_with_producer(p)
190        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
191        s.join()
192
193        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
194
195    def test_string_producer(self):
196        s, event = start_echo_server()
197        c = echo_client('\n', s.port)
198        data = "hello world\nI'm not dead yet!\n"
199        c.push_with_producer(data+SERVER_QUIT)
200        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
201        s.join()
202
203        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
204
205    def test_empty_line(self):
206        # checks that empty lines are handled correctly
207        s, event = start_echo_server()
208        c = echo_client('\n', s.port)
209        c.push("hello world\n\nI'm not dead yet!\n")
210        c.push(SERVER_QUIT)
211        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
212        s.join()
213
214        self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
215
216    def test_close_when_done(self):
217        s, event = start_echo_server()
218        s.start_resend_event = threading.Event()
219        c = echo_client('\n', s.port)
220        c.push("hello world\nI'm not dead yet!\n")
221        c.push(SERVER_QUIT)
222        c.close_when_done()
223        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
224
225        # Only allow the server to start echoing data back to the client after
226        # the client has closed its connection.  This prevents a race condition
227        # where the server echoes all of its data before we can check that it
228        # got any down below.
229        s.start_resend_event.set()
230        s.join()
231
232        self.assertEqual(c.contents, [])
233        # the server might have been able to send a byte or two back, but this
234        # at least checks that it received something and didn't just fail
235        # (which could still result in the client not having received anything)
236        self.assertTrue(len(s.buffer) > 0)
237
238
class TestAsynchat_WithPoll(TestAsynchat):
    """Re-run every TestAsynchat test with ``use_poll=True``."""

    # Read by the inherited tests and passed to asyncore.loop() as use_poll.
    usepoll = True
241
242
class TestAsynchatMocked(unittest.TestCase):
    """async_chat tests that use a stub object instead of a real socket."""

    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore blocking I/O errors like
        # EAGAIN
        class eagain_socket:
            # Stub socket: recv() always fails with EAGAIN.
            def fileno(self):
                return 0

            def recv(self, size):
                raise socket.error(errno.EAGAIN, "EAGAIN")

        class strict_chat(asynchat.async_chat):
            # Turn any call to handle_error() into a test failure.
            def handle_error(self):
                raise Exception("error")

        chat = strict_chat()
        chat.set_socket(eagain_socket())
        # set_socket() registered the channel; unregister it afterwards.
        self.addCleanup(chat.del_channel)

        # must not call handle_error()
        chat.handle_read()
265
266
class TestHelperFunctions(unittest.TestCase):
    """Tests for the module-level helper functions of asynchat."""

    def test_find_prefix_at_end(self):
        # (haystack, expected result) pairs, all checked against "\r\n"
        terminator = "\r\n"
        cases = (
            ("qwerty\r", 1),
            ("qwertydkjf", 0),
        )
        for haystack, expected in cases:
            found = asynchat.find_prefix_at_end(haystack, terminator)
            self.assertEqual(found, expected)
271
272class TestFifo(unittest.TestCase):
273    def test_basic(self):
274        f = asynchat.fifo()
275        f.push(7)
276        f.push('a')
277        self.assertEqual(len(f), 2)
278        self.assertEqual(f.first(), 7)
279        self.assertEqual(f.pop(), (1, 7))
280        self.assertEqual(len(f), 1)
281        self.assertEqual(f.first(), 'a')
282        self.assertEqual(f.is_empty(), False)
283        self.assertEqual(f.pop(), (1, 'a'))
284        self.assertEqual(len(f), 0)
285        self.assertEqual(f.is_empty(), True)
286        self.assertEqual(f.pop(), (0, None))
287
288    def test_given_list(self):
289        f = asynchat.fifo(['x', 17, 3])
290        self.assertEqual(len(f), 3)
291        self.assertEqual(f.pop(), (1, 'x'))
292        self.assertEqual(f.pop(), (1, 17))
293        self.assertEqual(f.pop(), (1, 3))
294        self.assertEqual(f.pop(), (0, None))
295
296
def test_main(verbose=None):
    """Run every test case class of this module via run_unittest().

    The ``verbose`` argument is accepted but not used here.
    """
    test_classes = (
        TestAsynchat,
        TestAsynchat_WithPoll,
        TestAsynchatMocked,
        TestHelperFunctions,
        TestFifo,
    )
    test_support.run_unittest(*test_classes)

# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main(verbose=True)
304