test_asynchat.py revision 6e070814b232d663e222dec7252d8d187d7195ab
1# test asynchat -- requires threading
2
3import thread # If this fails, we can't test this module
4import asyncore, asynchat, socket, threading, time
5import unittest
6import sys
7from test import test_support
8
# Address the echo server binds to and the echo client connects to.
HOST = '127.0.0.1'
# Requested port; echo_server.run() rebinds this global to whatever
# test_support.bind_port() returns.
PORT = 54322
# Marker the client sends to tell the echo server to stop collecting
# input and start echoing it back.
SERVER_QUIT = "QUIT\n"
12
class echo_server(threading.Thread):
    """One-shot echo server that runs in its own thread.

    It accepts a single connection, collects data until SERVER_QUIT is
    seen (or the peer closes), strips the SERVER_QUIT marker and then
    sends the remainder back ``chunk_size`` bytes per send() call.  Any
    data that could not be sent back is left in ``self.buffer``.
    """
    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        """Create the server thread.

        *event* is a threading.Event that run() sets once the listening
        socket has been set up (or once setting it up has failed).
        """
        threading.Thread.__init__(self)
        self.event = event
        # Defined here so the attribute exists even if run() dies before
        # a client has connected.
        self.buffer = ""

    def run(self):
        """Bind, accept one client, and echo its data back."""
        global PORT
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                PORT = test_support.bind_port(sock, HOST, PORT)
                sock.listen(1)
            finally:
                # Always wake whoever waits on the event: if setup failed
                # the waiter would otherwise block forever.
                self.event.set()
            conn, _addr = sock.accept()
            try:
                self._echo(conn)
            finally:
                conn.close()
        finally:
            sock.close()

    def _echo(self, conn):
        """Collect data from *conn* up to SERVER_QUIT, then send it back."""
        # collect data until quit message is seen
        while SERVER_QUIT not in self.buffer:
            data = conn.recv(1)
            if not data:
                # peer closed the connection before sending SERVER_QUIT
                break
            self.buffer = self.buffer + data

        # remove the SERVER_QUIT message
        self.buffer = self.buffer.replace(SERVER_QUIT, '')

        # re-send entire set of collected data
        try:
            while self.buffer:
                n = conn.send(self.buffer[:self.chunk_size])
                time.sleep(0.001)
                self.buffer = self.buffer[n:]
        except socket.error:
            # this may fail on some tests, such as test_close_when_done,
            # since the client closes the channel when it's done sending;
            # only socket errors are expected here, so nothing else is
            # silenced.
            pass
54
class echo_client(asynchat.async_chat):
    """async_chat channel that connects to (HOST, PORT) and records input.

    Incoming data is accumulated in ``buffer``; each time the terminator
    is found the accumulated text is appended to ``contents`` and the
    buffer is emptied.
    """

    def __init__(self, terminator):
        asynchat.async_chat.__init__(self)
        # finished messages, and the message currently being assembled
        self.contents = []
        self.buffer = ''
        self.set_terminator(terminator)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, PORT))

    def handle_connect(self):
        """Nothing to do when the connection is established."""

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            """Ignore exceptional conditions on the socket."""

    def collect_incoming_data(self, data):
        """Append *data* to the message being assembled."""
        self.buffer = self.buffer + data

    def found_terminator(self):
        """Store the assembled message and start a fresh one."""
        message, self.buffer = self.buffer, ""
        self.contents.append(message)
80
81
def start_echo_server():
    """Start an echo_server thread and return ``(server, event)``.

    Blocks until the server thread has set the event, clears the event
    again, and then pauses briefly before returning.
    """
    ready = threading.Event()
    server = echo_server(ready)
    server.start()
    # Wait for the server thread to signal, then reset the event.
    ready.wait()
    ready.clear()
    # Give server time to start accepting.
    time.sleep(0.01)
    return server, ready
90
91
92class TestAsynchat(unittest.TestCase):
93    usepoll = False
94
95    def setUp (self):
96        pass
97
98    def tearDown (self):
99        pass
100
101    def line_terminator_check(self, term, server_chunk):
102        event = threading.Event()
103        s = echo_server(event)
104        s.chunk_size = server_chunk
105        s.start()
106        event.wait()
107        event.clear()
108        time.sleep(0.01) # Give server time to start accepting.
109        c = echo_client(term)
110        c.push("hello ")
111        c.push("world%s" % term)
112        c.push("I'm not dead yet!%s" % term)
113        c.push(SERVER_QUIT)
114        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
115        s.join()
116
117        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
118
119    # the line terminator tests below check receiving variously-sized
120    # chunks back from the server in order to exercise all branches of
121    # async_chat.handle_read
122
123    def test_line_terminator1(self):
124        # test one-character terminator
125        for l in (1,2,3):
126            self.line_terminator_check('\n', l)
127
128    def test_line_terminator2(self):
129        # test two-character terminator
130        for l in (1,2,3):
131            self.line_terminator_check('\r\n', l)
132
133    def test_line_terminator3(self):
134        # test three-character terminator
135        for l in (1,2,3):
136            self.line_terminator_check('qqq', l)
137
138    def numeric_terminator_check(self, termlen):
139        # Try reading a fixed number of bytes
140        s, event = start_echo_server()
141        c = echo_client(termlen)
142        data = "hello world, I'm not dead yet!\n"
143        c.push(data)
144        c.push(SERVER_QUIT)
145        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
146        s.join()
147
148        self.assertEqual(c.contents, [data[:termlen]])
149
150    def test_numeric_terminator1(self):
151        # check that ints & longs both work (since type is
152        # explicitly checked in async_chat.handle_read)
153        self.numeric_terminator_check(1)
154        self.numeric_terminator_check(1L)
155
156    def test_numeric_terminator2(self):
157        self.numeric_terminator_check(6L)
158
159    def test_none_terminator(self):
160        # Try reading a fixed number of bytes
161        s, event = start_echo_server()
162        c = echo_client(None)
163        data = "hello world, I'm not dead yet!\n"
164        c.push(data)
165        c.push(SERVER_QUIT)
166        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
167        s.join()
168
169        self.assertEqual(c.contents, [])
170        self.assertEqual(c.buffer, data)
171
172    def test_simple_producer(self):
173        s, event = start_echo_server()
174        c = echo_client('\n')
175        data = "hello world\nI'm not dead yet!\n"
176        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
177        c.push_with_producer(p)
178        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
179        s.join()
180
181        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
182
183    def test_string_producer(self):
184        s, event = start_echo_server()
185        c = echo_client('\n')
186        data = "hello world\nI'm not dead yet!\n"
187        c.push_with_producer(data+SERVER_QUIT)
188        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
189        s.join()
190
191        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
192
193    def test_empty_line(self):
194        # checks that empty lines are handled correctly
195        s, event = start_echo_server()
196        c = echo_client('\n')
197        c.push("hello world\n\nI'm not dead yet!\n")
198        c.push(SERVER_QUIT)
199        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
200        s.join()
201
202        self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
203
204    def test_close_when_done(self):
205        s, event = start_echo_server()
206        c = echo_client('\n')
207        c.push("hello world\nI'm not dead yet!\n")
208        c.push(SERVER_QUIT)
209        c.close_when_done()
210        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
211        s.join()
212
213        self.assertEqual(c.contents, [])
214        # the server might have been able to send a byte or two back, but this
215        # at least checks that it received something and didn't just fail
216        # (which could still result in the client not having received anything)
217        self.assertTrue(len(s.buffer) > 0)
218
219
class TestAsynchat_WithPoll(TestAsynchat):
    """Re-run every TestAsynchat test with ``usepoll`` switched on."""
    usepoll = True
222
class TestHelperFunctions(unittest.TestCase):
    """Tests for module-level helper functions of asynchat."""

    def test_find_prefix_at_end(self):
        find = asynchat.find_prefix_at_end
        # the trailing "\r" is a one-character prefix of "\r\n"
        self.assertEqual(find("qwerty\r", "\r\n"), 1)
        # no part of "\r\n" appears at the end of the haystack
        self.assertEqual(find("qwertydkjf", "\r\n"), 0)
227
228class TestFifo(unittest.TestCase):
229    def test_basic(self):
230        f = asynchat.fifo()
231        f.push(7)
232        f.push('a')
233        self.assertEqual(len(f), 2)
234        self.assertEqual(f.first(), 7)
235        self.assertEqual(f.pop(), (1, 7))
236        self.assertEqual(len(f), 1)
237        self.assertEqual(f.first(), 'a')
238        self.assertEqual(f.is_empty(), False)
239        self.assertEqual(f.pop(), (1, 'a'))
240        self.assertEqual(len(f), 0)
241        self.assertEqual(f.is_empty(), True)
242        self.assertEqual(f.pop(), (0, None))
243
244    def test_given_list(self):
245        f = asynchat.fifo(['x', 17, 3])
246        self.assertEqual(len(f), 3)
247        self.assertEqual(f.pop(), (1, 'x'))
248        self.assertEqual(f.pop(), (1, 17))
249        self.assertEqual(f.pop(), (1, 3))
250        self.assertEqual(f.pop(), (0, None))
251
252
def test_main(verbose=None):
    """Run all test classes of this module through test_support.

    *verbose* is accepted but not used.
    """
    test_classes = (
        TestAsynchat,
        TestAsynchat_WithPoll,
        TestHelperFunctions,
        TestFifo,
    )
    test_support.run_unittest(*test_classes)
256
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main(verbose=True)
259