# Tests for the asynchat module.
2
3import asyncore, asynchat, socket, time
4import unittest
5import sys
6from test import test_support
7try:
8    import threading
9except ImportError:
10    threading = None
11
# Host the echo client connects to.
HOST = test_support.HOST
# Marker the client sends last; the echo server stops collecting data once
# this text appears in what it has received.
SERVER_QUIT = 'QUIT\n'
14
15if threading:
16    class echo_server(threading.Thread):
17        # parameter to determine the number of bytes passed back to the
18        # client each send
19        chunk_size = 1
20
21        def __init__(self, event):
22            threading.Thread.__init__(self)
23            self.event = event
24            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
25            self.port = test_support.bind_port(self.sock)
26            # This will be set if the client wants us to wait before echoing data
27            # back.
28            self.start_resend_event = None
29
30        def run(self):
31            self.sock.listen(1)
32            self.event.set()
33            conn, client = self.sock.accept()
34            self.buffer = ""
35            # collect data until quit message is seen
36            while SERVER_QUIT not in self.buffer:
37                data = conn.recv(1)
38                if not data:
39                    break
40                self.buffer = self.buffer + data
41
42            # remove the SERVER_QUIT message
43            self.buffer = self.buffer.replace(SERVER_QUIT, '')
44
45            if self.start_resend_event:
46                self.start_resend_event.wait()
47
48            # re-send entire set of collected data
49            try:
50                # this may fail on some tests, such as test_close_when_done, since
51                # the client closes the channel when it's done sending
52                while self.buffer:
53                    n = conn.send(self.buffer[:self.chunk_size])
54                    time.sleep(0.001)
55                    self.buffer = self.buffer[n:]
56            except:
57                pass
58
59            conn.close()
60            self.sock.close()
61
62    class echo_client(asynchat.async_chat):
63
64        def __init__(self, terminator, server_port):
65            asynchat.async_chat.__init__(self)
66            self.contents = []
67            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
68            self.connect((HOST, server_port))
69            self.set_terminator(terminator)
70            self.buffer = ''
71
72        def handle_connect(self):
73            pass
74
75        if sys.platform == 'darwin':
76            # select.poll returns a select.POLLHUP at the end of the tests
77            # on darwin, so just ignore it
78            def handle_expt(self):
79                pass
80
81        def collect_incoming_data(self, data):
82            self.buffer += data
83
84        def found_terminator(self):
85            self.contents.append(self.buffer)
86            self.buffer = ""
87
88
89    def start_echo_server():
90        event = threading.Event()
91        s = echo_server(event)
92        s.start()
93        event.wait()
94        event.clear()
95        time.sleep(0.01) # Give server time to start accepting.
96        return s, event
97
98
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestAsynchat(unittest.TestCase):
    """Exercise echo_client (an async_chat) against a threaded echo_server.

    Each test starts a fresh echo server, pushes data followed by SERVER_QUIT
    through an echo_client, runs the asyncore loop and then checks what the
    client collected.  asyncore.loop() is called with use_poll=usepoll.
    """
    usepoll = False

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        # Close any channel a test left registered so it cannot show up in
        # the next test's asyncore.loop().
        asyncore.close_all(ignore_all=True)
        test_support.threading_cleanup(*self._threads)

    def _run_loop(self):
        """Run the asyncore loop with this class's poll setting.

        count=300 caps the number of loop passes and timeout=.01 is the
        per-pass timeout handed to asyncore.loop().
        """
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

    def line_terminator_check(self, term, server_chunk):
        """Send two term-delimited lines and check both are echoed back.

        term         -- terminator string used by the client.
        server_chunk -- number of bytes the server echoes per send.
        """
        s, _event = start_echo_server()
        # Setting this after the thread has started is safe: the server only
        # reads chunk_size when echoing, which cannot begin before the client
        # created below has connected and sent SERVER_QUIT.
        s.chunk_size = server_chunk
        c = echo_client(term, s.port)
        c.push("hello ")
        c.push("world%s" % term)
        c.push("I'm not dead yet!%s" % term)
        c.push(SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check('\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check('\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check('qqq', chunk)

    def numeric_terminator_check(self, termlen):
        """Check that a numeric terminator collects exactly termlen bytes."""
        # Try reading a fixed number of bytes
        s, _event = start_echo_server()
        c = echo_client(termlen, s.port)
        data = "hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that ints & longs both work (since type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)
        # long(1) is the same value as the literal 1L, spelled so that the
        # file stays parseable by tools that reject the L suffix.
        self.numeric_terminator_check(long(1))

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(long(6))

    def test_none_terminator(self):
        # With a None terminator nothing is ever reported as a finished
        # message; everything echoed back stays in the client's buffer.
        s, _event = start_echo_server()
        c = echo_client(None, s.port)
        data = "hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        # data is fed to the channel through a simple_producer in 8-byte
        # pieces
        s, _event = start_echo_server()
        c = echo_client('\n', s.port)
        data = "hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])

    def test_string_producer(self):
        # a plain string is handed to push_with_producer instead of a
        # producer object
        s, _event = start_echo_server()
        c = echo_client('\n', s.port)
        data = "hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, _event = start_echo_server()
        c = echo_client('\n', s.port)
        c.push("hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])

    def test_close_when_done(self):
        s, _event = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client('\n', s.port)
        c.push("hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        self._run_loop()

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        s.join()

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)
233
234
class TestAsynchat_WithPoll(TestAsynchat):
    """Re-run every TestAsynchat test with use_poll=True for asyncore.loop()."""
    usepoll = True
237
class TestHelperFunctions(unittest.TestCase):
    """Tests for the module-level helper functions of asynchat."""

    def test_find_prefix_at_end(self):
        # The haystack ends with "\r", a 1-character prefix of "\r\n".
        self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
        # The haystack does not end with any prefix of "\r\n".
        self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
242
class TestFifo(unittest.TestCase):
    """Tests for asynchat.fifo.

    As the assertions below show, pop() returns a (flag, item) tuple:
    (1, item) while items remain and (0, None) once the fifo is empty.
    """

    def test_basic(self):
        # Items come back out in the order they were pushed.
        f = asynchat.fifo()
        f.push(7)
        f.push('a')
        self.assertEqual(len(f), 2)
        self.assertEqual(f.first(), 7)
        self.assertEqual(f.pop(), (1, 7))
        self.assertEqual(len(f), 1)
        self.assertEqual(f.first(), 'a')
        self.assertEqual(f.is_empty(), False)
        self.assertEqual(f.pop(), (1, 'a'))
        self.assertEqual(len(f), 0)
        self.assertEqual(f.is_empty(), True)
        # Popping from an empty fifo reports failure instead of raising.
        self.assertEqual(f.pop(), (0, None))

    def test_given_list(self):
        # A fifo can be seeded with an initial list of items.
        f = asynchat.fifo(['x', 17, 3])
        self.assertEqual(len(f), 3)
        self.assertEqual(f.pop(), (1, 'x'))
        self.assertEqual(f.pop(), (1, 17))
        self.assertEqual(f.pop(), (1, 3))
        self.assertEqual(f.pop(), (0, None))
266
267
def test_main(verbose=None):
    """Run all test classes of this module through test_support.run_unittest.

    verbose is accepted but not used by this function.
    """
    test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
                              TestHelperFunctions, TestFifo)


if __name__ == "__main__":
    test_main(verbose=True)
274