1import base64
2import datetime
3import decimal
4import sys
5import time
6import unittest
7from unittest import mock
8import xmlrpc.client as xmlrpclib
9import xmlrpc.server
10import http.client
11import http, http.server
12import socket
13import re
14import io
15import contextlib
16from test import support
17
18try:
19    import gzip
20except ImportError:
21    gzip = None
22try:
23    import threading
24except ImportError:
25    threading = None
26
# Shared fixture for the dump/load round-trip tests: a one-element list
# holding a single struct whose members cover str, float, int, a nested
# list, Binary/bytes/bytearray payloads, bool, non-ASCII text (as value and
# as key) and DateTime objects built from a string, a time tuple and a
# datetime.datetime.
alist = [{'astring': 'foo@bar.baz.spam',
          'afloat': 7283.43,
          'anint': 2**20,
          'ashortlong': 2,
          'anotherlist': ['.zyx.41'],
          'abase64': xmlrpclib.Binary(b"my dog has fleas"),
          'b64bytes': b"my dog has fleas",
          'b64bytearray': bytearray(b"my dog has fleas"),
          'boolean': False,
          'unicode': '\u4000\u6000\u8000',
          'ukey\u4000': 'regular value',
          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
          'datetime2': xmlrpclib.DateTime(
                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
          'datetime3': xmlrpclib.DateTime(
                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
          }]
44
45class XMLRPCTestCase(unittest.TestCase):
46
47    def test_dump_load(self):
48        dump = xmlrpclib.dumps((alist,))
49        load = xmlrpclib.loads(dump)
50        self.assertEqual(alist, load[0][0])
51
52    def test_dump_bare_datetime(self):
53        # This checks that an unwrapped datetime.date object can be handled
54        # by the marshalling code.  This can't be done via test_dump_load()
55        # since with use_builtin_types set to 1 the unmarshaller would create
56        # datetime objects for the 'datetime[123]' keys as well
57        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
58        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
59        s = xmlrpclib.dumps((dt,))
60
61        result, m = xmlrpclib.loads(s, use_builtin_types=True)
62        (newdt,) = result
63        self.assertEqual(newdt, dt)
64        self.assertIs(type(newdt), datetime.datetime)
65        self.assertIsNone(m)
66
67        result, m = xmlrpclib.loads(s, use_builtin_types=False)
68        (newdt,) = result
69        self.assertEqual(newdt, dt)
70        self.assertIs(type(newdt), xmlrpclib.DateTime)
71        self.assertIsNone(m)
72
73        result, m = xmlrpclib.loads(s, use_datetime=True)
74        (newdt,) = result
75        self.assertEqual(newdt, dt)
76        self.assertIs(type(newdt), datetime.datetime)
77        self.assertIsNone(m)
78
79        result, m = xmlrpclib.loads(s, use_datetime=False)
80        (newdt,) = result
81        self.assertEqual(newdt, dt)
82        self.assertIs(type(newdt), xmlrpclib.DateTime)
83        self.assertIsNone(m)
84
85
86    def test_datetime_before_1900(self):
87        # same as before but with a date before 1900
88        dt = datetime.datetime(1,  2, 10, 11, 41, 23)
89        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
90        s = xmlrpclib.dumps((dt,))
91
92        result, m = xmlrpclib.loads(s, use_builtin_types=True)
93        (newdt,) = result
94        self.assertEqual(newdt, dt)
95        self.assertIs(type(newdt), datetime.datetime)
96        self.assertIsNone(m)
97
98        result, m = xmlrpclib.loads(s, use_builtin_types=False)
99        (newdt,) = result
100        self.assertEqual(newdt, dt)
101        self.assertIs(type(newdt), xmlrpclib.DateTime)
102        self.assertIsNone(m)
103
104    def test_bug_1164912 (self):
105        d = xmlrpclib.DateTime()
106        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
107                                            methodresponse=True))
108        self.assertIsInstance(new_d.value, str)
109
110        # Check that the output of dumps() is still an 8-bit string
111        s = xmlrpclib.dumps((new_d,), methodresponse=True)
112        self.assertIsInstance(s, str)
113
114    def test_newstyle_class(self):
115        class T(object):
116            pass
117        t = T()
118        t.x = 100
119        t.y = "Hello"
120        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
121        self.assertEqual(t2, t.__dict__)
122
123    def test_dump_big_long(self):
124        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
125
126    def test_dump_bad_dict(self):
127        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
128
129    def test_dump_recursive_seq(self):
130        l = [1,2,3]
131        t = [3,4,5,l]
132        l.append(t)
133        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
134
135    def test_dump_recursive_dict(self):
136        d = {'1':1, '2':1}
137        t = {'3':3, 'd':d}
138        d['t'] = t
139        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
140
141    def test_dump_big_int(self):
142        if sys.maxsize > 2**31-1:
143            self.assertRaises(OverflowError, xmlrpclib.dumps,
144                              (int(2**34),))
145
146        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
147        self.assertRaises(OverflowError, xmlrpclib.dumps,
148                          (xmlrpclib.MAXINT+1,))
149        self.assertRaises(OverflowError, xmlrpclib.dumps,
150                          (xmlrpclib.MININT-1,))
151
152        def dummy_write(s):
153            pass
154
155        m = xmlrpclib.Marshaller()
156        m.dump_int(xmlrpclib.MAXINT, dummy_write)
157        m.dump_int(xmlrpclib.MININT, dummy_write)
158        self.assertRaises(OverflowError, m.dump_int,
159                          xmlrpclib.MAXINT+1, dummy_write)
160        self.assertRaises(OverflowError, m.dump_int,
161                          xmlrpclib.MININT-1, dummy_write)
162
163    def test_dump_double(self):
164        xmlrpclib.dumps((float(2 ** 34),))
165        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
166                         float(xmlrpclib.MININT)))
167        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
168                         float(xmlrpclib.MININT - 42)))
169
170        def dummy_write(s):
171            pass
172
173        m = xmlrpclib.Marshaller()
174        m.dump_double(xmlrpclib.MAXINT, dummy_write)
175        m.dump_double(xmlrpclib.MININT, dummy_write)
176        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
177        m.dump_double(xmlrpclib.MININT - 42, dummy_write)
178
179    def test_dump_none(self):
180        value = alist + [None]
181        arg1 = (alist + [None],)
182        strg = xmlrpclib.dumps(arg1, allow_none=True)
183        self.assertEqual(value,
184                          xmlrpclib.loads(strg)[0][0])
185        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
186
187    def test_dump_encoding(self):
188        value = {'key\u20ac\xa4':
189                 'value\u20ac\xa4'}
190        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
191        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
192        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
193        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
194        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
195
196        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
197                               methodresponse=True)
198        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
199        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
200        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
201
202        methodname = 'method\u20ac\xa4'
203        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
204                               methodname=methodname)
205        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
206        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
207
208    def test_dump_bytes(self):
209        sample = b"my dog has fleas"
210        self.assertEqual(sample, xmlrpclib.Binary(sample))
211        for type_ in bytes, bytearray, xmlrpclib.Binary:
212            value = type_(sample)
213            s = xmlrpclib.dumps((value,))
214
215            result, m = xmlrpclib.loads(s, use_builtin_types=True)
216            (newvalue,) = result
217            self.assertEqual(newvalue, sample)
218            self.assertIs(type(newvalue), bytes)
219            self.assertIsNone(m)
220
221            result, m = xmlrpclib.loads(s, use_builtin_types=False)
222            (newvalue,) = result
223            self.assertEqual(newvalue, sample)
224            self.assertIs(type(newvalue), xmlrpclib.Binary)
225            self.assertIsNone(m)
226
227    def test_loads_unsupported(self):
228        ResponseError = xmlrpclib.ResponseError
229        data = '<params><param><value><spam/></value></param></params>'
230        self.assertRaises(ResponseError, xmlrpclib.loads, data)
231        data = ('<params><param><value><array>'
232                '<value><spam/></value>'
233                '</array></value></param></params>')
234        self.assertRaises(ResponseError, xmlrpclib.loads, data)
235        data = ('<params><param><value><struct>'
236                '<member><name>a</name><value><spam/></value></member>'
237                '<member><name>b</name><value><spam/></value></member>'
238                '</struct></value></param></params>')
239        self.assertRaises(ResponseError, xmlrpclib.loads, data)
240
241    def check_loads(self, s, value, **kwargs):
242        dump = '<params><param><value>%s</value></param></params>' % s
243        result, m = xmlrpclib.loads(dump, **kwargs)
244        (newvalue,) = result
245        self.assertEqual(newvalue, value)
246        self.assertIs(type(newvalue), type(value))
247        self.assertIsNone(m)
248
249    def test_load_standard_types(self):
250        check = self.check_loads
251        check('string', 'string')
252        check('<string>string</string>', 'string')
253        check('<string>�������������� string</string>', '�������������� string')
254        check('<int>2056183947</int>', 2056183947)
255        check('<int>-2056183947</int>', -2056183947)
256        check('<i4>2056183947</i4>', 2056183947)
257        check('<double>46093.78125</double>', 46093.78125)
258        check('<boolean>0</boolean>', False)
259        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
260              xmlrpclib.Binary(b'\x00byte string\xff'))
261        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
262              b'\x00byte string\xff', use_builtin_types=True)
263        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
264              xmlrpclib.DateTime('20050210T11:41:23'))
265        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
266              datetime.datetime(2005, 2, 10, 11, 41, 23),
267              use_builtin_types=True)
268        check('<array><data>'
269              '<value><int>1</int></value><value><int>2</int></value>'
270              '</data></array>', [1, 2])
271        check('<struct>'
272              '<member><name>b</name><value><int>2</int></value></member>'
273              '<member><name>a</name><value><int>1</int></value></member>'
274              '</struct>', {'a': 1, 'b': 2})
275
276    def test_load_extension_types(self):
277        check = self.check_loads
278        check('<nil/>', None)
279        check('<ex:nil/>', None)
280        check('<i1>205</i1>', 205)
281        check('<i2>20561</i2>', 20561)
282        check('<i8>9876543210</i8>', 9876543210)
283        check('<biginteger>98765432100123456789</biginteger>',
284              98765432100123456789)
285        check('<float>93.78125</float>', 93.78125)
286        check('<bigdecimal>9876543210.0123456789</bigdecimal>',
287              decimal.Decimal('9876543210.0123456789'))
288
289    def test_get_host_info(self):
290        # see bug #3613, this raised a TypeError
291        transp = xmlrpc.client.Transport()
292        self.assertEqual(transp.get_host_info("user@host.tld"),
293                          ('host.tld',
294                           [('Authorization', 'Basic dXNlcg==')], {}))
295
296    def test_ssl_presence(self):
297        try:
298            import ssl
299        except ImportError:
300            has_ssl = False
301        else:
302            has_ssl = True
303        try:
304            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
305        except NotImplementedError:
306            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
307        except OSError:
308            self.assertTrue(has_ssl)
309
    @unittest.skipUnless(threading, "Threading required for this test.")
    def test_keepalive_disconnect(self):
        # Exercise a server that answers the first request on a keep-alive
        # connection and then drops the connection on the next one: both
        # proxy calls below must still return 5.
        class RequestHandler(http.server.BaseHTTPRequestHandler):
            protocol_version = "HTTP/1.1"
            # Becomes true (per handler instance) once a response was sent.
            handled = False

            def do_POST(self):
                # Consume the request body so the connection stays in sync.
                length = int(self.headers.get("Content-Length"))
                self.rfile.read(length)
                if self.handled:
                    # Later request on the same connection: close it without
                    # sending any response.
                    self.close_connection = True
                    return
                response = xmlrpclib.dumps((5,), methodresponse=True)
                response = response.encode()
                self.send_response(http.HTTPStatus.OK)
                self.send_header("Content-Length", len(response))
                self.end_headers()
                self.wfile.write(response)
                self.handled = True
                # Keep the connection open so the next request reuses it.
                self.close_connection = False

        def run_server():
            server.socket.settimeout(float(1))  # Don't hang if client fails
            server.handle_request()  # First request and attempt at second
            server.handle_request()  # Retried second request

        server = http.server.HTTPServer((support.HOST, 0), RequestHandler)
        self.addCleanup(server.server_close)
        thread = threading.Thread(target=run_server)
        thread.start()
        # Cleanups run in reverse order: the thread is joined before the
        # server is closed.
        self.addCleanup(thread.join)
        url = "http://{}:{}/".format(*server.server_address)
        with xmlrpclib.ServerProxy(url) as p:
            self.assertEqual(p.method(), 5)
            self.assertEqual(p.method(), 5)
345
class HelperTestCase(unittest.TestCase):
    def test_escape(self):
        # escape() must replace '&', '<' and '>' with their XML entities.
        expectations = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in expectations:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
351
class FaultTestCase(unittest.TestCase):
    def test_repr(self):
        # repr() and str() of a Fault use the same format.
        fault = xmlrpclib.Fault(42, 'Test Fault')
        self.assertEqual(repr(fault), "<Fault 42: 'Test Fault'>")
        self.assertEqual(repr(fault), str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # Passed as a parameter, a Fault is loaded back as a plain dict.
        params, method = xmlrpclib.loads(xmlrpclib.dumps((fault,)))
        (newf,) = params
        self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertIsNone(method)

        # Dumped on its own through a Marshaller, loading it raises Fault.
        fault_document = xmlrpclib.Marshaller().dumps(fault)
        with self.assertRaises(xmlrpclib.Fault):
            xmlrpclib.loads(fault_document)

    def test_dotted_attribute(self):
        # Resolving a private (underscore-prefixed) name must raise
        # AttributeError, while a public name resolves normally.
        with self.assertRaises(AttributeError):
            xmlrpc.server.resolve_dotted_attribute(str, '__add')
        resolved = xmlrpc.server.resolve_dotted_attribute(str, 'title')
        self.assertTrue(resolved)
374
375class DateTimeTestCase(unittest.TestCase):
376    def test_default(self):
377        with mock.patch('time.localtime') as localtime_mock:
378            time_struct = time.struct_time(
379                [2013, 7, 15, 0, 24, 49, 0, 196, 0])
380            localtime_mock.return_value = time_struct
381            localtime = time.localtime()
382            t = xmlrpclib.DateTime()
383            self.assertEqual(str(t),
384                             time.strftime("%Y%m%dT%H:%M:%S", localtime))
385
386    def test_time(self):
387        d = 1181399930.036952
388        t = xmlrpclib.DateTime(d)
389        self.assertEqual(str(t),
390                         time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
391
392    def test_time_tuple(self):
393        d = (2007,6,9,10,38,50,5,160,0)
394        t = xmlrpclib.DateTime(d)
395        self.assertEqual(str(t), '20070609T10:38:50')
396
397    def test_time_struct(self):
398        d = time.localtime(1181399930.036952)
399        t = xmlrpclib.DateTime(d)
400        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
401
402    def test_datetime_datetime(self):
403        d = datetime.datetime(2007,1,2,3,4,5)
404        t = xmlrpclib.DateTime(d)
405        self.assertEqual(str(t), '20070102T03:04:05')
406
407    def test_repr(self):
408        d = datetime.datetime(2007,1,2,3,4,5)
409        t = xmlrpclib.DateTime(d)
410        val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
411        self.assertEqual(repr(t), val)
412
413    def test_decode(self):
414        d = ' 20070908T07:11:13  '
415        t1 = xmlrpclib.DateTime()
416        t1.decode(d)
417        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
418        self.assertEqual(t1, tref)
419
420        t2 = xmlrpclib._datetime(d)
421        self.assertEqual(t2, tref)
422
423    def test_comparison(self):
424        now = datetime.datetime.now()
425        dtime = xmlrpclib.DateTime(now.timetuple())
426
427        # datetime vs. DateTime
428        self.assertTrue(dtime == now)
429        self.assertTrue(now == dtime)
430        then = now + datetime.timedelta(seconds=4)
431        self.assertTrue(then >= dtime)
432        self.assertTrue(dtime < then)
433
434        # str vs. DateTime
435        dstr = now.strftime("%Y%m%dT%H:%M:%S")
436        self.assertTrue(dtime == dstr)
437        self.assertTrue(dstr == dtime)
438        dtime_then = xmlrpclib.DateTime(then.timetuple())
439        self.assertTrue(dtime_then >= dstr)
440        self.assertTrue(dstr < dtime_then)
441
442        # some other types
443        dbytes = dstr.encode('ascii')
444        dtuple = now.timetuple()
445        with self.assertRaises(TypeError):
446            dtime == 1970
447        with self.assertRaises(TypeError):
448            dtime != dbytes
449        with self.assertRaises(TypeError):
450            dtime == bytearray(dbytes)
451        with self.assertRaises(TypeError):
452            dtime != dtuple
453        with self.assertRaises(TypeError):
454            dtime < float(1970)
455        with self.assertRaises(TypeError):
456            dtime > dbytes
457        with self.assertRaises(TypeError):
458            dtime <= bytearray(dbytes)
459        with self.assertRaises(TypeError):
460            dtime >= dtuple
461
class BinaryTestCase(unittest.TestCase):

    # XXX What should str(Binary(b"\xff")) return?  For now it is "\xff"
    # (i.e. the binary data is interpreted as Latin-1-encoded text), but
    # this feels very unsatisfactory.  Perhaps only repr() should be
    # defined, returning r"Binary(b'\xff')" instead?

    def test_default(self):
        # An empty Binary converts to the empty string.
        self.assertEqual(str(xmlrpclib.Binary()), '')

    def test_string(self):
        # str() exposes the payload decoded as Latin-1.
        payload = b'\x01\x02\x03abc123\xff\xfe'
        wrapped = xmlrpclib.Binary(payload)
        self.assertEqual(str(wrapped), str(payload, "latin-1"))

    def test_decode(self):
        # Base64 input is decoded both by Binary.decode() and by the
        # _binary() helper.
        payload = b'\x01\x02\x03abc123\xff\xfe'
        encoded = base64.encodebytes(payload)
        expected = str(payload, "latin-1")

        via_method = xmlrpclib.Binary()
        via_method.decode(encoded)
        self.assertEqual(str(via_method), expected)

        via_helper = xmlrpclib._binary(encoded)
        self.assertEqual(str(via_helper), expected)
487
488
# Address, port and base URL of the test server currently running.  They are
# assigned by http_server()/http_multi_server() once the socket is bound;
# PORT is reset to None when the server's socket is closed.
ADDR = PORT = URL = None
490
# The evt is set twice: first when the server is ready to serve, and again
# when the server has been shut down.  The user must clear the event after
# it has been set the first time in order to catch the second set.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Run an XML-RPC test server that handles up to *numrequests* requests.

    The server binds to port 0 on localhost and publishes the resulting
    address through the module globals ADDR, PORT and URL.  *evt* is set
    once the server is ready and again after its socket has been closed.
    *requestHandler* defaults to SimpleXMLRPCRequestHandler and *encoding*
    is passed on to the server constructor.
    """
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

        class Fixture:
            @staticmethod
            def getData():
                return '42'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
        def get_request(self):
            # Ensure the accepted socket is always blocking.  On Linux, socket
            # attributes are not inherited like they are on *BSD and Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    # Binding and activation are done by hand below so that the address can
    # be published before the server starts accepting requests.
    serv = MyXMLRPCServer(("localhost", 0), requestHandler,
                          encoding=encoding,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.server_bind()
        global ADDR, PORT, URL
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on Vista, which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x,y: x+y, 'add')
        serv.register_function(lambda x: x, 'têšt')
        serv.register_function(my_function)
        testInstance = TestInstanceClass()
        serv.register_instance(testInstance, allow_dotted_names=True)
        # First set: tell the waiting test that the server is ready.
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        # A timeout simply ends the serving loop.
        pass
    finally:
        serv.socket.close()
        PORT = None
        # Second set: tell the waiting test that the server is gone.
        evt.set()
556
def http_multi_server(evt, numrequests, requestHandler=None):
    """Run a multi-path XML-RPC test server for up to *numrequests* requests.

    Dispatchers are installed on "/foo" (with pow) and "/foo/bar" (with
    add), plus a dispatcher on "/is/broken" that always raises
    RuntimeError.  The bound address is published through the module
    globals ADDR, PORT and URL.  *evt* is set once the server is ready and
    again after its socket has been closed.  *requestHandler* defaults to
    SimpleXMLRPCRequestHandler.
    """
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
        def get_request(self):
            # Ensure the accepted socket is always blocking.  On Linux, socket
            # attributes are not inherited like they are on *BSD and Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    # Subclass the handler with an empty rpc_paths list.
    class MyRequestHandler(requestHandler):
        rpc_paths = []

    class BrokenDispatcher:
        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
            raise RuntimeError("broken dispatcher")

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    # Give the listening socket a 3 second timeout; socket.timeout is
    # handled below.
    serv.socket.settimeout(3)
    serv.server_bind()
    try:
        global ADDR, PORT, URL
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on Vista, which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = ["/foo", "/foo/bar"]
        for path in paths:
            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        serv.add_dispatcher("/is/broken", BrokenDispatcher())
        # First set: tell the waiting test that the server is ready.
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        # A timeout simply ends the serving loop.
        pass
    finally:
        serv.socket.close()
        PORT = None
        # Second set: tell the waiting test that the server is gone.
        evt.set()
621
622# This function prevents errors like:
623#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    """Return True if *e* looks like a 'temporarily unavailable' failure.

    Such errors are the product of a server-side exception caused by the
    'temporarily unavailable' response sometimes given by operations on
    non-blocking sockets.  *e* may be a ProtocolError (inspected through its
    errcode and headers) or any other exception, in which case its message
    is inspected instead.  The result is always a bool.
    """
    try:
        # Sometimes we get a -1 error code and/or empty headers.
        if e.errcode == -1 or e.headers is None:
            return True
        exc_mess = e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError (e.g. an OSError): use its message instead.
        exc_mess = str(e)

    # Explicit bool so callers never receive an implicit None.
    return bool(exc_mess) and 'temporarily unavailable' in exc_mess.lower()
640
def make_request_and_skipIf(condition, reason):
    """Return a decorator that skips a test but first issues one request.

    If the test is skipped, a request still has to be made because the
    server created in setUp blocks expecting one to come in.  When
    *condition* is false the decorated function is returned unchanged;
    otherwise it is replaced by a wrapper that calls my_function() on the
    server and then raises unittest.SkipTest(reason).
    """
    if not condition:
        return lambda func: func

    # Local import: only needed when a test is actually wrapped.
    import functools

    def decorator(func):
        # Preserve the test's name and docstring on the wrapper.
        @functools.wraps(func)
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, OSError) as e:
                # Only 'unavailable' errors are ignored.
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
656
@unittest.skipUnless(threading, 'Threading required for this test.')
class BaseServerTestCase(unittest.TestCase):
    # Base class for tests that talk to a live server.  Subclasses may
    # override:
    #   requestHandler - handler class passed to the server function
    #   request_count  - number of requests the server handles before exiting
    #   threadFunc     - function run in the server thread
    requestHandler = None
    request_count = 1
    threadFunc = staticmethod(http_server)

    def setUp(self):
        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        self.evt = threading.Event()
        # start server thread to handle requests
        serv_args = (self.evt, self.request_count, self.requestHandler)
        thread = threading.Thread(target=self.threadFunc, args=serv_args)
        thread.start()
        # Join the thread once the test is over so it does not outlive the
        # test (cleanups run after tearDown).
        self.addCleanup(thread.join)

        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()

        # disable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
682
683class SimpleServerTestCase(BaseServerTestCase):
684    def test_simple1(self):
685        try:
686            p = xmlrpclib.ServerProxy(URL)
687            self.assertEqual(p.pow(6,8), 6**8)
688        except (xmlrpclib.ProtocolError, OSError) as e:
689            # ignore failures due to non-blocking socket 'unavailable' errors
690            if not is_unavailable_exception(e):
691                # protocol error; provide additional information in test output
692                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
693
694    def test_nonascii(self):
695        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
696        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
697        try:
698            p = xmlrpclib.ServerProxy(URL)
699            self.assertEqual(p.add(start_string, end_string),
700                             start_string + end_string)
701        except (xmlrpclib.ProtocolError, OSError) as e:
702            # ignore failures due to non-blocking socket 'unavailable' errors
703            if not is_unavailable_exception(e):
704                # protocol error; provide additional information in test output
705                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
706
707    def test_client_encoding(self):
708        start_string = '\u20ac'
709        end_string = '\xa4'
710
711        try:
712            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
713            self.assertEqual(p.add(start_string, end_string),
714                             start_string + end_string)
715        except (xmlrpclib.ProtocolError, socket.error) as e:
716            # ignore failures due to non-blocking socket unavailable errors.
717            if not is_unavailable_exception(e):
718                # protocol error; provide additional information in test output
719                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
720
721    def test_nonascii_methodname(self):
722        try:
723            p = xmlrpclib.ServerProxy(URL, encoding='ascii')
724            self.assertEqual(p.têšt(42), 42)
725        except (xmlrpclib.ProtocolError, socket.error) as e:
726            # ignore failures due to non-blocking socket unavailable errors.
727            if not is_unavailable_exception(e):
728                # protocol error; provide additional information in test output
729                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
730
731    # [ch] The test 404 is causing lots of false alarms.
732    def XXXtest_404(self):
733        # send POST with http.client, it should return 404 header and
734        # 'Not Found' message.
735        conn = httplib.client.HTTPConnection(ADDR, PORT)
736        conn.request('POST', '/this-is-not-valid')
737        response = conn.getresponse()
738        conn.close()
739
740        self.assertEqual(response.status, 404)
741        self.assertEqual(response.reason, 'Not Found')
742
743    def test_introspection1(self):
744        expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt',
745                                'system.listMethods', 'system.methodHelp',
746                                'system.methodSignature', 'system.multicall',
747                                'Fixture'])
748        try:
749            p = xmlrpclib.ServerProxy(URL)
750            meth = p.system.listMethods()
751            self.assertEqual(set(meth), expected_methods)
752        except (xmlrpclib.ProtocolError, OSError) as e:
753            # ignore failures due to non-blocking socket 'unavailable' errors
754            if not is_unavailable_exception(e):
755                # protocol error; provide additional information in test output
756                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
757
758
759    def test_introspection2(self):
760        try:
761            # test _methodHelp()
762            p = xmlrpclib.ServerProxy(URL)
763            divhelp = p.system.methodHelp('div')
764            self.assertEqual(divhelp, 'This is the div function')
765        except (xmlrpclib.ProtocolError, OSError) as e:
766            # ignore failures due to non-blocking socket 'unavailable' errors
767            if not is_unavailable_exception(e):
768                # protocol error; provide additional information in test output
769                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
770
771    @make_request_and_skipIf(sys.flags.optimize >= 2,
772                     "Docstrings are omitted with -O2 and above")
773    def test_introspection3(self):
774        try:
775            # test native doc
776            p = xmlrpclib.ServerProxy(URL)
777            myfunction = p.system.methodHelp('my_function')
778            self.assertEqual(myfunction, 'This is my function')
779        except (xmlrpclib.ProtocolError, OSError) as e:
780            # ignore failures due to non-blocking socket 'unavailable' errors
781            if not is_unavailable_exception(e):
782                # protocol error; provide additional information in test output
783                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
784
785    def test_introspection4(self):
786        # the SimpleXMLRPCServer doesn't support signatures, but
787        # at least check that we can try making the call
788        try:
789            p = xmlrpclib.ServerProxy(URL)
790            divsig = p.system.methodSignature('div')
791            self.assertEqual(divsig, 'signatures not supported')
792        except (xmlrpclib.ProtocolError, OSError) as e:
793            # ignore failures due to non-blocking socket 'unavailable' errors
794            if not is_unavailable_exception(e):
795                # protocol error; provide additional information in test output
796                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
797
798    def test_multicall(self):
799        try:
800            p = xmlrpclib.ServerProxy(URL)
801            multicall = xmlrpclib.MultiCall(p)
802            multicall.add(2,3)
803            multicall.pow(6,8)
804            multicall.div(127,42)
805            add_result, pow_result, div_result = multicall()
806            self.assertEqual(add_result, 2+3)
807            self.assertEqual(pow_result, 6**8)
808            self.assertEqual(div_result, 127//42)
809        except (xmlrpclib.ProtocolError, OSError) as e:
810            # ignore failures due to non-blocking socket 'unavailable' errors
811            if not is_unavailable_exception(e):
812                # protocol error; provide additional information in test output
813                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
814
815    def test_non_existing_multicall(self):
816        try:
817            p = xmlrpclib.ServerProxy(URL)
818            multicall = xmlrpclib.MultiCall(p)
819            multicall.this_is_not_exists()
820            result = multicall()
821
822            # result.results contains;
823            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
824            #   'method "this_is_not_exists" is not supported'>}]
825
826            self.assertEqual(result.results[0]['faultCode'], 1)
827            self.assertEqual(result.results[0]['faultString'],
828                '<class \'Exception\'>:method "this_is_not_exists" '
829                'is not supported')
830        except (xmlrpclib.ProtocolError, OSError) as e:
831            # ignore failures due to non-blocking socket 'unavailable' errors
832            if not is_unavailable_exception(e):
833                # protocol error; provide additional information in test output
834                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
835
836    def test_dotted_attribute(self):
837        # Raises an AttributeError because private methods are not allowed.
838        self.assertRaises(AttributeError,
839                          xmlrpc.server.resolve_dotted_attribute, str, '__add')
840
841        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
842        # Get the test to run faster by sending a request with test_simple1.
843        # This avoids waiting for the socket timeout.
844        self.test_simple1()
845
846    def test_allow_dotted_names_true(self):
847        # XXX also need allow_dotted_names_false test.
848        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
849        data = server.Fixture.getData()
850        self.assertEqual(data, '42')
851
852    def test_unicode_host(self):
853        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
854        self.assertEqual(server.add("a", "\xe9"), "a\xe9")
855
856    def test_partial_post(self):
857        # Check that a partial POST doesn't make the server loop: issue #14001.
858        conn = http.client.HTTPConnection(ADDR, PORT)
859        conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
860        conn.close()
861
862    def test_context_manager(self):
863        with xmlrpclib.ServerProxy(URL) as server:
864            server.add(2, 3)
865            self.assertNotEqual(server('transport')._connection,
866                                (None, None))
867        self.assertEqual(server('transport')._connection,
868                         (None, None))
869
870    def test_context_manager_method_error(self):
871        try:
872            with xmlrpclib.ServerProxy(URL) as server:
873                server.add(2, "a")
874        except xmlrpclib.Fault:
875            pass
876        self.assertEqual(server('transport')._connection,
877                         (None, None))
878
879
class SimpleServerEncodingTestCase(BaseServerTestCase):
    """Run the add() round trip against a server started with iso-8859-15."""

    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # NOTE: ``encoding`` is accepted but not used; 'iso-8859-15' is
        # always passed on to http_server().
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        """Strings with non-ASCII characters must survive the round trip."""
        start_string = '\u20ac'
        end_string = '\xa4'

        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # OSError is used here like in the other server tests;
            # socket.error is only a deprecated alias of it.
            # ignore failures due to non-blocking socket unavailable errors.
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
898
899
class MultiPathServerTestCase(BaseServerTestCase):
    """Check which methods answer on which URL path of the multi-path server."""

    threadFunc = staticmethod(http_multi_server)
    request_count = 2

    def test_path1(self):
        # under /foo, pow() works while add() is expected to fault
        proxy = xmlrpclib.ServerProxy(URL + "/foo")
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_path2(self):
        # under /foo/bar, add() works while pow() is expected to fault
        proxy = xmlrpclib.ServerProxy(URL + "/foo/bar")
        self.assertEqual(proxy.add(6, 8), 6 + 8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.pow(6, 8)

    def test_path3(self):
        # under /is/broken, even add() is expected to fault
        proxy = xmlrpclib.ServerProxy(URL + "/is/broken")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)
916
#Base class for the keep-alive tests: it installs an HTTP/1.1 request handler
#that logs the request line of every request it serves, and clears that log
#before each test
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    """Shared fixture: a server whose handler logs the requests it serves."""

    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        """HTTP/1.1 handler that records request lines in ``myRequests``.

        ``myRequests`` is a class-level list with one sub-list per call to
        handle(); each sub-list collects the raw request lines seen by
        handle_one_request() during that call.
        """
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []

        def handle(self):
            # open a fresh sub-list and remember its position
            self.myRequests.append([])
            self.reqidx = len(self.myRequests) - 1
            return self.parentClass.handle(self)

        def handle_one_request(self):
            # let the parent process the request first, then log its line
            outcome = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        # start every test with an empty request log
        self.RequestHandler.myRequests = []
        return super().setUp()
940
941#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
942#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    """Consecutive calls on one proxy must be served by one handler."""

    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # issue three identical requests, then close the proxy
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        proxy("close")()

        logged = self.RequestHandler.myRequests

        # a single request handler must have dealt with all of them
        self.assertEqual(len(logged), 1)

        # only two are required here: the third may not have been appended
        # yet because of thread scheduling
        self.assertGreaterEqual(len(logged[-1]), 2)
958
959
960#test special attribute access on the serverproxy, through the __call__
961#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    """Closing the proxy or its transport must start a new connection."""

    # the server has to handle two keep-alive connections
    request_count = 2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # Two rounds of three requests each.  The close after the first
        # round should force the second round onto a new connection.
        for _round in range(2):
            for _call in range(3):
                self.assertEqual(proxy.pow(6, 8), 6 ** 8)
            proxy("close")()

        # Two request handlers are expected, each having logged at least
        # two complete requests.
        logged = self.RequestHandler.myRequests
        self.assertEqual(len(logged), 2)
        self.assertGreaterEqual(len(logged[-1]), 2)
        self.assertGreaterEqual(len(logged[-2]), 2)

    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        # closing through the transport object instead of p("close")
        transport = proxy("transport")
        transport.close()
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        proxy("close")()
        handler_count = len(self.RequestHandler.myRequests)
        self.assertEqual(handler_count, 2)
993
994#A test case that verifies that gzip encoding works in both directions
995#(for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    """Check gzip content encoding of requests and of responses."""

    #a request handler that speaks HTTP/1.1 and stores the Content-Length
    #of the last POST request in a class variable
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store content length of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False
        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def setUp(self):
        BaseServerTestCase.setUp(self)

    def test_gzip_request(self):
        """An encoded request must be shorter than the plain one."""
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        try:
            self.assertEqual(p.pow(6,8), 6**8)
            a = self.RequestHandler.content_length
            t.encode_threshold = 0 #turn on request encoding
            self.assertEqual(p.pow(6,8), 6**8)
            b = self.RequestHandler.content_length
        finally:
            #close the proxy even when an assertion above fails, so the
            #HTTP/1.1 connection is not left open
            p("close")()
        self.assertGreater(a, b)

    def test_bad_gzip_request(self):
        """A gzip header on an unencoded body must be answered with 400."""
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        try:
            cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                        re.compile(r"\b400\b"))
            with cm:
                p.pow(6, 8)
        finally:
            #close the proxy even when the expected error is not raised
            p("close")()

    def test_gzip_response(self):
        """An encoded response must be shorter than the plain one."""
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        try:
            self.requestHandler.encode_threshold = None #no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            a = t.response_length
            self.requestHandler.encode_threshold = 0 #always encode
            self.assertEqual(p.pow(6,8), 6**8)
            b = t.response_length
        finally:
            #encode_threshold is a class attribute: restore it even when an
            #assertion fails, otherwise the changed value stays in place for
            #whatever uses this handler class next
            p("close")()
            self.requestHandler.encode_threshold = old
        self.assertGreater(a, b)
1062
1063
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):
    """Check the size limit applied by xmlrpclib.gzip_decode()."""

    def test_gzip_decode_limit(self):
        limit = 20 * 1024 * 1024

        # a payload of exactly `limit` bytes decodes completely
        at_limit = xmlrpclib.gzip_encode(b'\0' * limit)
        self.assertEqual(len(xmlrpclib.gzip_decode(at_limit)), limit)

        # one byte more is rejected with the default limit ...
        over_limit = xmlrpclib.gzip_encode(b'\0' * (limit + 1))
        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(over_limit)

        # ... but decodes without error when max_decode is -1
        xmlrpclib.gzip_decode(over_limit, max_decode=-1)
1082
1083
1084#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    """Check the 'close' and 'transport' special attributes of ServerProxy."""

    def setUp(self):
        super().setUp()
        # Without threading, http_server() and http_multi_server() will not
        # be executed and URL is still equal to None.  'http://' is just
        # enough to choose the scheme (HTTP).
        self.url = URL if threading else 'http://'

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(self.url)
        closer = proxy('close')
        self.assertEqual(closer(), None)

    def test_transport(self):
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        # the proxy must hand back the transport it was given
        self.assertEqual(proxy('transport'), transport)
1104
1105
1106# This is a contrived way to make a failure occur on the server side
1107# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
    """HTTPMessage whose 'content-length' lookup returns a non-numeric value."""

    def get(self, key, failobj=None):
        """Return 'I am broken' for content-length, else the normal value."""
        lowered = key.lower()
        if lowered != 'content-length':
            return super().get(lowered, failobj)
        return 'I am broken'
1114
1115
@unittest.skipUnless(threading, 'Threading required for this test.')
class FailingServerTestCase(unittest.TestCase):
    """Check the X-exception / X-traceback headers sent on server errors.

    Each test starts http_server() in its own thread for a single request.
    The tests toggle SimpleXMLRPCServer._send_traceback_header and may
    install FailingMessageClass; tearDown() resets both.
    """

    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        serv_args = (self.evt, 1)
        thread = threading.Thread(target=http_server, args=serv_args)
        thread.start()
        # Join the thread once the test is over so it is not left dangling.
        # Cleanups run after tearDown(), which already waits for the event
        # the server sets when it is done (presumably right before it
        # returns -- confirm against http_server).
        self.addCleanup(thread.join)

        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        default_class = http.client.HTTPMessage
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class

    def test_basic(self):
        # check that flag is false by default
        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # The two server-side error headers shouldn't be sent back in this case
                # (assertIsNone shows the unexpected value on failure)
                self.assertIsNone(e.headers.get("X-exception"))
                self.assertIsNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertIsNotNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')
1191
1192
@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """Temporarily replace sys.stdout with an in-memory text stream.

    Unlike support.captured_stdout(), the stream yielded here is a
    TextIOWrapper around a BytesIO, so it has a `buffer` attribute.  The
    previous sys.stdout is put back when the block is left, also on error.
    """
    saved = sys.stdout
    replacement = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    sys.stdout = replacement
    try:
        yield replacement
    finally:
        sys.stdout = saved
1204
1205
class CGIHandlerTestCase(unittest.TestCase):
    """Drive CGIXMLRPCRequestHandler.handle_request() and inspect its output."""

    def setUp(self):
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with support.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # With method GET and no request_text given, handle_get runs;
            # collect what it prints to stdout.
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # The status code is the second whitespace-separated word of
            # the output and the reason phrase the two words after it.
            data_out.seek(0)
            words = data_out.read().split()
            status = words[1]
            message = ' '.join(words[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')

    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with support.EnvironmentVarGuard() as env:
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                with support.captured_stdin() as data_in:
                    # feed the request through stdin, as CGI would
                    data_in.write(data)
                    data_in.seek(0)
                    env['CONTENT_LENGTH'] = str(len(data))
                    self.cgi.handle_request()
        data_out.seek(0)

        # The reply is expected to be a fault; that is all we want here.
        handle = data_out.read()

        # Skip the first 44 characters so that only the XML is parsed,
        # not the HTTP header.
        with self.assertRaises(xmlrpclib.Fault):
            xmlrpclib.loads(handle[44:])

        # Also check the Content-Length written by handle_request, reusing
        # the output captured above to avoid repeating the setup.
        # Test for bug: http://bugs.python.org/issue5040
        content = handle[handle.find("<?xml"):]
        declared = int(re.search(r'Content-Length: (\d+)', handle).group(1))

        self.assertEqual(declared, len(content))
1272
1273
class UseBuiltinTypesTestCase(unittest.TestCase):
    """Check the use_builtin_types flag on dispatcher, CGI handler and server."""

    def test_use_builtin_types(self):
        # With use_builtin_types=True the dispatcher must hand binary data
        # to the called function as bytes and dates as datetime.datetime.
        self.log = []
        expected_bytes = b"my dog has fleas"
        expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
        request = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')

        def foobar(*args):
            # record whatever arguments the dispatcher passes in
            self.log.extend(args)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        dispatcher.register_function(foobar)
        dispatcher._marshaled_dispatch(request)

        self.assertEqual(len(self.log), 2)
        received_bytes, received_date = self.log
        self.assertEqual(self.log, [expected_bytes, expected_date])
        self.assertIs(type(received_date), datetime.datetime)
        self.assertIs(type(received_bytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        cgi_handler = xmlrpc.server.CGIXMLRPCRequestHandler(
            use_builtin_types=True)
        self.assertTrue(cgi_handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        address = ("localhost", 0)
        server = xmlrpc.server.SimpleXMLRPCServer(address,
                                                  use_builtin_types=True)
        # the socket is not needed, only the attribute is inspected
        server.server_close()
        self.assertTrue(server.use_builtin_types)
1305
1306
@support.reap_threads
def test_main():
    """Pass the test case classes listed below to support.run_unittest()."""
    test_cases = (
        XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
        BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase,
        SimpleServerTestCase, SimpleServerEncodingTestCase,
        KeepaliveServerTestCase1, KeepaliveServerTestCase2,
        GzipServerTestCase, GzipUtilTestCase,
        MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase,
        CGIHandlerTestCase,
    )
    support.run_unittest(*test_cases)
1316
1317
# Run the tests through test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1320