1import socket
2import telnetlib
3import time
4import Queue
5
6import unittest
7from unittest import TestCase
8from test import test_support
9threading = test_support.import_module('threading')
10
11HOST = test_support.HOST
12EOF_sigil = object()
13
14def server(evt, serv, dataq=None):
15    """ Open a tcp server in three steps
16        1) set evt to true to let the parent know we are ready
17        2) [optional] if is not False, write the list of data from dataq.get()
18           to the socket.
19    """
20    serv.listen(5)
21    evt.set()
22    try:
23        conn, addr = serv.accept()
24        if dataq:
25            data = ''
26            new_data = dataq.get(True, 0.5)
27            dataq.task_done()
28            for item in new_data:
29                if item == EOF_sigil:
30                    break
31                if type(item) in [int, float]:
32                    time.sleep(item)
33                else:
34                    data += item
35                written = conn.send(data)
36                data = data[written:]
37        conn.close()
38    except socket.timeout:
39        pass
40    finally:
41        serv.close()
42
43class GeneralTests(TestCase):
44
45    def setUp(self):
46        self.evt = threading.Event()
47        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
48        self.sock.settimeout(60)  # Safety net. Look issue 11812
49        self.port = test_support.bind_port(self.sock)
50        self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
51        self.thread.setDaemon(True)
52        self.thread.start()
53        self.evt.wait()
54
55    def tearDown(self):
56        self.thread.join()
57
58    def testBasic(self):
59        # connects
60        telnet = telnetlib.Telnet(HOST, self.port)
61        telnet.sock.close()
62
63    def testTimeoutDefault(self):
64        self.assertTrue(socket.getdefaulttimeout() is None)
65        socket.setdefaulttimeout(30)
66        try:
67            telnet = telnetlib.Telnet(HOST, self.port)
68        finally:
69            socket.setdefaulttimeout(None)
70        self.assertEqual(telnet.sock.gettimeout(), 30)
71        telnet.sock.close()
72
73    def testTimeoutNone(self):
74        # None, having other default
75        self.assertTrue(socket.getdefaulttimeout() is None)
76        socket.setdefaulttimeout(30)
77        try:
78            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
79        finally:
80            socket.setdefaulttimeout(None)
81        self.assertTrue(telnet.sock.gettimeout() is None)
82        telnet.sock.close()
83
84    def testTimeoutValue(self):
85        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
86        self.assertEqual(telnet.sock.gettimeout(), 30)
87        telnet.sock.close()
88
89    def testTimeoutOpen(self):
90        telnet = telnetlib.Telnet()
91        telnet.open(HOST, self.port, timeout=30)
92        self.assertEqual(telnet.sock.gettimeout(), 30)
93        telnet.sock.close()
94
95def _read_setUp(self):
96    self.evt = threading.Event()
97    self.dataq = Queue.Queue()
98    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
99    self.sock.settimeout(10)
100    self.port = test_support.bind_port(self.sock)
101    self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
102    self.thread.start()
103    self.evt.wait()
104
105def _read_tearDown(self):
106    self.thread.join()
107
108class ReadTests(TestCase):
109    setUp = _read_setUp
110    tearDown = _read_tearDown
111
112    # use a similar approach to testing timeouts as test_timeout.py
113    # these will never pass 100% but make the fuzz big enough that it is rare
114    block_long = 0.6
115    block_short = 0.3
116    def test_read_until_A(self):
117        """
118        read_until(expected, [timeout])
119          Read until the expected string has been seen, or a timeout is
120          hit (default is no timeout); may block.
121        """
122        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
123        self.dataq.put(want)
124        telnet = telnetlib.Telnet(HOST, self.port)
125        self.dataq.join()
126        data = telnet.read_until('match')
127        self.assertEqual(data, ''.join(want[:-2]))
128
129    def test_read_until_B(self):
130        # test the timeout - it does NOT raise socket.timeout
131        want = ['hello', self.block_long, 'not seen', EOF_sigil]
132        self.dataq.put(want)
133        telnet = telnetlib.Telnet(HOST, self.port)
134        self.dataq.join()
135        data = telnet.read_until('not seen', self.block_short)
136        self.assertEqual(data, want[0])
137        self.assertEqual(telnet.read_all(), 'not seen')
138
139    def test_read_until_with_poll(self):
140        """Use select.poll() to implement telnet.read_until()."""
141        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
142        self.dataq.put(want)
143        telnet = telnetlib.Telnet(HOST, self.port)
144        if not telnet._has_poll:
145            raise unittest.SkipTest('select.poll() is required')
146        telnet._has_poll = True
147        self.dataq.join()
148        data = telnet.read_until('match')
149        self.assertEqual(data, ''.join(want[:-2]))
150
151    def test_read_until_with_select(self):
152        """Use select.select() to implement telnet.read_until()."""
153        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
154        self.dataq.put(want)
155        telnet = telnetlib.Telnet(HOST, self.port)
156        telnet._has_poll = False
157        self.dataq.join()
158        data = telnet.read_until('match')
159        self.assertEqual(data, ''.join(want[:-2]))
160
161    def test_read_all_A(self):
162        """
163        read_all()
164          Read all data until EOF; may block.
165        """
166        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
167        self.dataq.put(want)
168        telnet = telnetlib.Telnet(HOST, self.port)
169        self.dataq.join()
170        data = telnet.read_all()
171        self.assertEqual(data, ''.join(want[:-1]))
172        return
173
174    def _test_blocking(self, func):
175        self.dataq.put([self.block_long, EOF_sigil])
176        self.dataq.join()
177        start = time.time()
178        data = func()
179        self.assertTrue(self.block_short <= time.time() - start)
180
181    def test_read_all_B(self):
182        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
183
184    def test_read_all_C(self):
185        self.dataq.put([EOF_sigil])
186        telnet = telnetlib.Telnet(HOST, self.port)
187        self.dataq.join()
188        telnet.read_all()
189        telnet.read_all() # shouldn't raise
190
191    def test_read_some_A(self):
192        """
193        read_some()
194          Read at least one byte or EOF; may block.
195        """
196        # test 'at least one byte'
197        want = ['x' * 500, EOF_sigil]
198        self.dataq.put(want)
199        telnet = telnetlib.Telnet(HOST, self.port)
200        self.dataq.join()
201        data = telnet.read_all()
202        self.assertTrue(len(data) >= 1)
203
204    def test_read_some_B(self):
205        # test EOF
206        self.dataq.put([EOF_sigil])
207        telnet = telnetlib.Telnet(HOST, self.port)
208        self.dataq.join()
209        self.assertEqual('', telnet.read_some())
210
211    def test_read_some_C(self):
212        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
213
214    def _test_read_any_eager_A(self, func_name):
215        """
216        read_very_eager()
217          Read all data available already queued or on the socket,
218          without blocking.
219        """
220        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
221        expects = want[1] + want[2]
222        self.dataq.put(want)
223        telnet = telnetlib.Telnet(HOST, self.port)
224        self.dataq.join()
225        func = getattr(telnet, func_name)
226        data = ''
227        while True:
228            try:
229                data += func()
230                self.assertTrue(expects.startswith(data))
231            except EOFError:
232                break
233        self.assertEqual(expects, data)
234
235    def _test_read_any_eager_B(self, func_name):
236        # test EOF
237        self.dataq.put([EOF_sigil])
238        telnet = telnetlib.Telnet(HOST, self.port)
239        self.dataq.join()
240        time.sleep(self.block_short)
241        func = getattr(telnet, func_name)
242        self.assertRaises(EOFError, func)
243
244    # read_eager and read_very_eager make the same gaurantees
245    # (they behave differently but we only test the gaurantees)
246    def test_read_very_eager_A(self):
247        self._test_read_any_eager_A('read_very_eager')
248    def test_read_very_eager_B(self):
249        self._test_read_any_eager_B('read_very_eager')
250    def test_read_eager_A(self):
251        self._test_read_any_eager_A('read_eager')
252    def test_read_eager_B(self):
253        self._test_read_any_eager_B('read_eager')
254    # NB -- we need to test the IAC block which is mentioned in the docstring
255    # but not in the module docs
256
257    def _test_read_any_lazy_B(self, func_name):
258        self.dataq.put([EOF_sigil])
259        telnet = telnetlib.Telnet(HOST, self.port)
260        self.dataq.join()
261        func = getattr(telnet, func_name)
262        telnet.fill_rawq()
263        self.assertRaises(EOFError, func)
264
265    def test_read_lazy_A(self):
266        want = ['x' * 100, EOF_sigil]
267        self.dataq.put(want)
268        telnet = telnetlib.Telnet(HOST, self.port)
269        self.dataq.join()
270        time.sleep(self.block_short)
271        self.assertEqual('', telnet.read_lazy())
272        data = ''
273        while True:
274            try:
275                read_data = telnet.read_lazy()
276                data += read_data
277                if not read_data:
278                    telnet.fill_rawq()
279            except EOFError:
280                break
281            self.assertTrue(want[0].startswith(data))
282        self.assertEqual(data, want[0])
283
284    def test_read_lazy_B(self):
285        self._test_read_any_lazy_B('read_lazy')
286
287    def test_read_very_lazy_A(self):
288        want = ['x' * 100, EOF_sigil]
289        self.dataq.put(want)
290        telnet = telnetlib.Telnet(HOST, self.port)
291        self.dataq.join()
292        time.sleep(self.block_short)
293        self.assertEqual('', telnet.read_very_lazy())
294        data = ''
295        while True:
296            try:
297                read_data = telnet.read_very_lazy()
298            except EOFError:
299                break
300            data += read_data
301            if not read_data:
302                telnet.fill_rawq()
303                self.assertEqual('', telnet.cookedq)
304                telnet.process_rawq()
305            self.assertTrue(want[0].startswith(data))
306        self.assertEqual(data, want[0])
307
308    def test_read_very_lazy_B(self):
309        self._test_read_any_lazy_B('read_very_lazy')
310
311class nego_collector(object):
312    def __init__(self, sb_getter=None):
313        self.seen = ''
314        self.sb_getter = sb_getter
315        self.sb_seen = ''
316
317    def do_nego(self, sock, cmd, opt):
318        self.seen += cmd + opt
319        if cmd == tl.SE and self.sb_getter:
320            sb_data = self.sb_getter()
321            self.sb_seen += sb_data
322
323tl = telnetlib
324class OptionTests(TestCase):
325    setUp = _read_setUp
326    tearDown = _read_tearDown
327    # RFC 854 commands
328    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
329
330    def _test_command(self, data):
331        """ helper for testing IAC + cmd """
332        self.setUp()
333        self.dataq.put(data)
334        telnet = telnetlib.Telnet(HOST, self.port)
335        self.dataq.join()
336        nego = nego_collector()
337        telnet.set_option_negotiation_callback(nego.do_nego)
338        txt = telnet.read_all()
339        cmd = nego.seen
340        self.assertTrue(len(cmd) > 0) # we expect at least one command
341        self.assertIn(cmd[0], self.cmds)
342        self.assertEqual(cmd[1], tl.NOOPT)
343        self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
344        nego.sb_getter = None # break the nego => telnet cycle
345        self.tearDown()
346
347    def test_IAC_commands(self):
348        # reset our setup
349        self.dataq.put([EOF_sigil])
350        telnet = telnetlib.Telnet(HOST, self.port)
351        self.dataq.join()
352        self.tearDown()
353
354        for cmd in self.cmds:
355            self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
356            self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
357            self._test_command([tl.IAC + cmd, EOF_sigil])
358        # all at once
359        self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
360        self.assertEqual('', telnet.read_sb_data())
361
362    def test_SB_commands(self):
363        # RFC 855, subnegotiations portion
364        send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
365                tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
366                tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
367                tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
368                tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
369                EOF_sigil,
370               ]
371        self.dataq.put(send)
372        telnet = telnetlib.Telnet(HOST, self.port)
373        self.dataq.join()
374        nego = nego_collector(telnet.read_sb_data)
375        telnet.set_option_negotiation_callback(nego.do_nego)
376        txt = telnet.read_all()
377        self.assertEqual(txt, '')
378        want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
379        self.assertEqual(nego.sb_seen, want_sb_data)
380        self.assertEqual('', telnet.read_sb_data())
381        nego.sb_getter = None # break the nego => telnet cycle
382
383
384class ExpectTests(TestCase):
385    def setUp(self):
386        self.evt = threading.Event()
387        self.dataq = Queue.Queue()
388        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
389        self.sock.settimeout(10)
390        self.port = test_support.bind_port(self.sock)
391        self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
392                                                            self.dataq))
393        self.thread.start()
394        self.evt.wait()
395
396    def tearDown(self):
397        self.thread.join()
398
399    # use a similar approach to testing timeouts as test_timeout.py
400    # these will never pass 100% but make the fuzz big enough that it is rare
401    block_long = 0.6
402    block_short = 0.3
403    def test_expect_A(self):
404        """
405        expect(expected, [timeout])
406          Read until the expected string has been seen, or a timeout is
407          hit (default is no timeout); may block.
408        """
409        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
410        self.dataq.put(want)
411        telnet = telnetlib.Telnet(HOST, self.port)
412        self.dataq.join()
413        (_,_,data) = telnet.expect(['match'])
414        self.assertEqual(data, ''.join(want[:-2]))
415
416    def test_expect_B(self):
417        # test the timeout - it does NOT raise socket.timeout
418        want = ['hello', self.block_long, 'not seen', EOF_sigil]
419        self.dataq.put(want)
420        telnet = telnetlib.Telnet(HOST, self.port)
421        self.dataq.join()
422        (_,_,data) = telnet.expect(['not seen'], self.block_short)
423        self.assertEqual(data, want[0])
424        self.assertEqual(telnet.read_all(), 'not seen')
425
426    def test_expect_with_poll(self):
427        """Use select.poll() to implement telnet.expect()."""
428        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
429        self.dataq.put(want)
430        telnet = telnetlib.Telnet(HOST, self.port)
431        if not telnet._has_poll:
432            raise unittest.SkipTest('select.poll() is required')
433        telnet._has_poll = True
434        self.dataq.join()
435        (_,_,data) = telnet.expect(['match'])
436        self.assertEqual(data, ''.join(want[:-2]))
437
438    def test_expect_with_select(self):
439        """Use select.select() to implement telnet.expect()."""
440        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
441        self.dataq.put(want)
442        telnet = telnetlib.Telnet(HOST, self.port)
443        telnet._has_poll = False
444        self.dataq.join()
445        (_,_,data) = telnet.expect(['match'])
446        self.assertEqual(data, ''.join(want[:-2]))
447
448
449def test_main(verbose=None):
450    test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
451                              ExpectTests)
452
453if __name__ == '__main__':
454    test_main()
455