1import httplib
2import array
3import httplib
4import StringIO
5import socket
6import errno
7
8import unittest
9TestCase = unittest.TestCase
10
11from test import test_support
12
13HOST = test_support.HOST
14
15class FakeSocket:
16    def __init__(self, text, fileclass=StringIO.StringIO):
17        self.text = text
18        self.fileclass = fileclass
19        self.data = ''
20
21    def sendall(self, data):
22        self.data += ''.join(data)
23
24    def makefile(self, mode, bufsize=None):
25        if mode != 'r' and mode != 'rb':
26            raise httplib.UnimplementedFileMode()
27        return self.fileclass(self.text)
28
29class EPipeSocket(FakeSocket):
30
31    def __init__(self, text, pipe_trigger):
32        # When sendall() is called with pipe_trigger, raise EPIPE.
33        FakeSocket.__init__(self, text)
34        self.pipe_trigger = pipe_trigger
35
36    def sendall(self, data):
37        if self.pipe_trigger in data:
38            raise socket.error(errno.EPIPE, "gotcha")
39        self.data += data
40
41    def close(self):
42        pass
43
44class NoEOFStringIO(StringIO.StringIO):
45    """Like StringIO, but raises AssertionError on EOF.
46
47    This is used below to test that httplib doesn't try to read
48    more from the underlying file than it should.
49    """
50    def read(self, n=-1):
51        data = StringIO.StringIO.read(self, n)
52        if data == '':
53            raise AssertionError('caller tried to read past EOF')
54        return data
55
56    def readline(self, length=None):
57        data = StringIO.StringIO.readline(self, length)
58        if data == '':
59            raise AssertionError('caller tried to read past EOF')
60        return data
61
62
63class HeaderTests(TestCase):
64    def test_auto_headers(self):
65        # Some headers are added automatically, but should not be added by
66        # .request() if they are explicitly set.
67
68        class HeaderCountingBuffer(list):
69            def __init__(self):
70                self.count = {}
71            def append(self, item):
72                kv = item.split(':')
73                if len(kv) > 1:
74                    # item is a 'Key: Value' header string
75                    lcKey = kv[0].lower()
76                    self.count.setdefault(lcKey, 0)
77                    self.count[lcKey] += 1
78                list.append(self, item)
79
80        for explicit_header in True, False:
81            for header in 'Content-length', 'Host', 'Accept-encoding':
82                conn = httplib.HTTPConnection('example.com')
83                conn.sock = FakeSocket('blahblahblah')
84                conn._buffer = HeaderCountingBuffer()
85
86                body = 'spamspamspam'
87                headers = {}
88                if explicit_header:
89                    headers[header] = str(len(body))
90                conn.request('POST', '/', body, headers)
91                self.assertEqual(conn._buffer.count[header.lower()], 1)
92
93    def test_content_length_0(self):
94
95        class ContentLengthChecker(list):
96            def __init__(self):
97                list.__init__(self)
98                self.content_length = None
99            def append(self, item):
100                kv = item.split(':', 1)
101                if len(kv) > 1 and kv[0].lower() == 'content-length':
102                    self.content_length = kv[1].strip()
103                list.append(self, item)
104
105        # POST with empty body
106        conn = httplib.HTTPConnection('example.com')
107        conn.sock = FakeSocket(None)
108        conn._buffer = ContentLengthChecker()
109        conn.request('POST', '/', '')
110        self.assertEqual(conn._buffer.content_length, '0',
111                        'Header Content-Length not set')
112
113        # PUT request with empty body
114        conn = httplib.HTTPConnection('example.com')
115        conn.sock = FakeSocket(None)
116        conn._buffer = ContentLengthChecker()
117        conn.request('PUT', '/', '')
118        self.assertEqual(conn._buffer.content_length, '0',
119                        'Header Content-Length not set')
120
121    def test_putheader(self):
122        conn = httplib.HTTPConnection('example.com')
123        conn.sock = FakeSocket(None)
124        conn.putrequest('GET','/')
125        conn.putheader('Content-length',42)
126        self.assertTrue('Content-length: 42' in conn._buffer)
127
128    def test_ipv6host_header(self):
129        # Default host header on IPv6 transaction should wrapped by [] if
130        # its actual IPv6 address
131        expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
132                   'Accept-Encoding: identity\r\n\r\n'
133        conn = httplib.HTTPConnection('[2001::]:81')
134        sock = FakeSocket('')
135        conn.sock = sock
136        conn.request('GET', '/foo')
137        self.assertTrue(sock.data.startswith(expected))
138
139        expected = 'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
140                   'Accept-Encoding: identity\r\n\r\n'
141        conn = httplib.HTTPConnection('[2001:102A::]')
142        sock = FakeSocket('')
143        conn.sock = sock
144        conn.request('GET', '/foo')
145        self.assertTrue(sock.data.startswith(expected))
146
147
148class BasicTest(TestCase):
149    def test_status_lines(self):
150        # Test HTTP status lines
151
152        body = "HTTP/1.1 200 Ok\r\n\r\nText"
153        sock = FakeSocket(body)
154        resp = httplib.HTTPResponse(sock)
155        resp.begin()
156        self.assertEqual(resp.read(), 'Text')
157        self.assertTrue(resp.isclosed())
158
159        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
160        sock = FakeSocket(body)
161        resp = httplib.HTTPResponse(sock)
162        self.assertRaises(httplib.BadStatusLine, resp.begin)
163
164    def test_bad_status_repr(self):
165        exc = httplib.BadStatusLine('')
166        self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''')
167
168    def test_partial_reads(self):
169        # if we have a length, the system knows when to close itself
170        # same behaviour than when we read the whole thing with read()
171        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
172        sock = FakeSocket(body)
173        resp = httplib.HTTPResponse(sock)
174        resp.begin()
175        self.assertEqual(resp.read(2), 'Te')
176        self.assertFalse(resp.isclosed())
177        self.assertEqual(resp.read(2), 'xt')
178        self.assertTrue(resp.isclosed())
179
180    def test_partial_reads_no_content_length(self):
181        # when no length is present, the socket should be gracefully closed when
182        # all data was read
183        body = "HTTP/1.1 200 Ok\r\n\r\nText"
184        sock = FakeSocket(body)
185        resp = httplib.HTTPResponse(sock)
186        resp.begin()
187        self.assertEqual(resp.read(2), 'Te')
188        self.assertFalse(resp.isclosed())
189        self.assertEqual(resp.read(2), 'xt')
190        self.assertEqual(resp.read(1), '')
191        self.assertTrue(resp.isclosed())
192
193    def test_partial_reads_incomplete_body(self):
194        # if the server shuts down the connection before the whole
195        # content-length is delivered, the socket is gracefully closed
196        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
197        sock = FakeSocket(body)
198        resp = httplib.HTTPResponse(sock)
199        resp.begin()
200        self.assertEqual(resp.read(2), 'Te')
201        self.assertFalse(resp.isclosed())
202        self.assertEqual(resp.read(2), 'xt')
203        self.assertEqual(resp.read(1), '')
204        self.assertTrue(resp.isclosed())
205
206    def test_host_port(self):
207        # Check invalid host_port
208
209        # Note that httplib does not accept user:password@ in the host-port.
210        for hp in ("www.python.org:abc", "user:password@www.python.org"):
211            self.assertRaises(httplib.InvalidURL, httplib.HTTP, hp)
212
213        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b",
214                          8000),
215                         ("www.python.org:80", "www.python.org", 80),
216                         ("www.python.org", "www.python.org", 80),
217                         ("www.python.org:", "www.python.org", 80),
218                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
219            http = httplib.HTTP(hp)
220            c = http._conn
221            if h != c.host:
222                self.fail("Host incorrectly parsed: %s != %s" % (h, c.host))
223            if p != c.port:
224                self.fail("Port incorrectly parsed: %s != %s" % (p, c.host))
225
226    def test_response_headers(self):
227        # test response with multiple message headers with the same field name.
228        text = ('HTTP/1.1 200 OK\r\n'
229                'Set-Cookie: Customer="WILE_E_COYOTE";'
230                ' Version="1"; Path="/acme"\r\n'
231                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
232                ' Path="/acme"\r\n'
233                '\r\n'
234                'No body\r\n')
235        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
236               ', '
237               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
238        s = FakeSocket(text)
239        r = httplib.HTTPResponse(s)
240        r.begin()
241        cookies = r.getheader("Set-Cookie")
242        if cookies != hdr:
243            self.fail("multiple headers not combined properly")
244
245    def test_read_head(self):
246        # Test that the library doesn't attempt to read any data
247        # from a HEAD request.  (Tickles SF bug #622042.)
248        sock = FakeSocket(
249            'HTTP/1.1 200 OK\r\n'
250            'Content-Length: 14432\r\n'
251            '\r\n',
252            NoEOFStringIO)
253        resp = httplib.HTTPResponse(sock, method="HEAD")
254        resp.begin()
255        if resp.read() != "":
256            self.fail("Did not expect response from HEAD request")
257
258    def test_send_file(self):
259        expected = 'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
260                   'Accept-Encoding: identity\r\nContent-Length:'
261
262        body = open(__file__, 'rb')
263        conn = httplib.HTTPConnection('example.com')
264        sock = FakeSocket(body)
265        conn.sock = sock
266        conn.request('GET', '/foo', body)
267        self.assertTrue(sock.data.startswith(expected))
268
269    def test_send(self):
270        expected = 'this is a test this is only a test'
271        conn = httplib.HTTPConnection('example.com')
272        sock = FakeSocket(None)
273        conn.sock = sock
274        conn.send(expected)
275        self.assertEqual(expected, sock.data)
276        sock.data = ''
277        conn.send(array.array('c', expected))
278        self.assertEqual(expected, sock.data)
279        sock.data = ''
280        conn.send(StringIO.StringIO(expected))
281        self.assertEqual(expected, sock.data)
282
283    def test_chunked(self):
284        chunked_start = (
285            'HTTP/1.1 200 OK\r\n'
286            'Transfer-Encoding: chunked\r\n\r\n'
287            'a\r\n'
288            'hello worl\r\n'
289            '1\r\n'
290            'd\r\n'
291        )
292        sock = FakeSocket(chunked_start + '0\r\n')
293        resp = httplib.HTTPResponse(sock, method="GET")
294        resp.begin()
295        self.assertEqual(resp.read(), 'hello world')
296        resp.close()
297
298        for x in ('', 'foo\r\n'):
299            sock = FakeSocket(chunked_start + x)
300            resp = httplib.HTTPResponse(sock, method="GET")
301            resp.begin()
302            try:
303                resp.read()
304            except httplib.IncompleteRead, i:
305                self.assertEqual(i.partial, 'hello world')
306                self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
307                self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
308            else:
309                self.fail('IncompleteRead expected')
310            finally:
311                resp.close()
312
313    def test_chunked_head(self):
314        chunked_start = (
315            'HTTP/1.1 200 OK\r\n'
316            'Transfer-Encoding: chunked\r\n\r\n'
317            'a\r\n'
318            'hello world\r\n'
319            '1\r\n'
320            'd\r\n'
321        )
322        sock = FakeSocket(chunked_start + '0\r\n')
323        resp = httplib.HTTPResponse(sock, method="HEAD")
324        resp.begin()
325        self.assertEqual(resp.read(), '')
326        self.assertEqual(resp.status, 200)
327        self.assertEqual(resp.reason, 'OK')
328        self.assertTrue(resp.isclosed())
329
330    def test_negative_content_length(self):
331        sock = FakeSocket('HTTP/1.1 200 OK\r\n'
332                          'Content-Length: -1\r\n\r\nHello\r\n')
333        resp = httplib.HTTPResponse(sock, method="GET")
334        resp.begin()
335        self.assertEqual(resp.read(), 'Hello\r\n')
336        self.assertTrue(resp.isclosed())
337
338    def test_incomplete_read(self):
339        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
340        resp = httplib.HTTPResponse(sock, method="GET")
341        resp.begin()
342        try:
343            resp.read()
344        except httplib.IncompleteRead as i:
345            self.assertEqual(i.partial, 'Hello\r\n')
346            self.assertEqual(repr(i),
347                             "IncompleteRead(7 bytes read, 3 more expected)")
348            self.assertEqual(str(i),
349                             "IncompleteRead(7 bytes read, 3 more expected)")
350            self.assertTrue(resp.isclosed())
351        else:
352            self.fail('IncompleteRead expected')
353
354    def test_epipe(self):
355        sock = EPipeSocket(
356            "HTTP/1.0 401 Authorization Required\r\n"
357            "Content-type: text/html\r\n"
358            "WWW-Authenticate: Basic realm=\"example\"\r\n",
359            b"Content-Length")
360        conn = httplib.HTTPConnection("example.com")
361        conn.sock = sock
362        self.assertRaises(socket.error,
363                          lambda: conn.request("PUT", "/url", "body"))
364        resp = conn.getresponse()
365        self.assertEqual(401, resp.status)
366        self.assertEqual("Basic realm=\"example\"",
367                         resp.getheader("www-authenticate"))
368
369    def test_filenoattr(self):
370        # Just test the fileno attribute in the HTTPResponse Object.
371        body = "HTTP/1.1 200 Ok\r\n\r\nText"
372        sock = FakeSocket(body)
373        resp = httplib.HTTPResponse(sock)
374        self.assertTrue(hasattr(resp,'fileno'),
375                'HTTPResponse should expose a fileno attribute')
376
377    # Test lines overflowing the max line size (_MAXLINE in http.client)
378
379    def test_overflowing_status_line(self):
380        self.skipTest("disabled for HTTP 0.9 support")
381        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
382        resp = httplib.HTTPResponse(FakeSocket(body))
383        self.assertRaises((httplib.LineTooLong, httplib.BadStatusLine), resp.begin)
384
385    def test_overflowing_header_line(self):
386        body = (
387            'HTTP/1.1 200 OK\r\n'
388            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
389        )
390        resp = httplib.HTTPResponse(FakeSocket(body))
391        self.assertRaises(httplib.LineTooLong, resp.begin)
392
393    def test_overflowing_chunked_line(self):
394        body = (
395            'HTTP/1.1 200 OK\r\n'
396            'Transfer-Encoding: chunked\r\n\r\n'
397            + '0' * 65536 + 'a\r\n'
398            'hello world\r\n'
399            '0\r\n'
400        )
401        resp = httplib.HTTPResponse(FakeSocket(body))
402        resp.begin()
403        self.assertRaises(httplib.LineTooLong, resp.read)
404
405    def test_early_eof(self):
406        # Test httpresponse with no \r\n termination,
407        body = "HTTP/1.1 200 Ok"
408        sock = FakeSocket(body)
409        resp = httplib.HTTPResponse(sock)
410        resp.begin()
411        self.assertEqual(resp.read(), '')
412        self.assertTrue(resp.isclosed())
413
414class OfflineTest(TestCase):
415    def test_responses(self):
416        self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
417
418
419class SourceAddressTest(TestCase):
420    def setUp(self):
421        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
422        self.port = test_support.bind_port(self.serv)
423        self.source_port = test_support.find_unused_port()
424        self.serv.listen(5)
425        self.conn = None
426
427    def tearDown(self):
428        if self.conn:
429            self.conn.close()
430            self.conn = None
431        self.serv.close()
432        self.serv = None
433
434    def testHTTPConnectionSourceAddress(self):
435        self.conn = httplib.HTTPConnection(HOST, self.port,
436                source_address=('', self.source_port))
437        self.conn.connect()
438        self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
439
440    @unittest.skipIf(not hasattr(httplib, 'HTTPSConnection'),
441                     'httplib.HTTPSConnection not defined')
442    def testHTTPSConnectionSourceAddress(self):
443        self.conn = httplib.HTTPSConnection(HOST, self.port,
444                source_address=('', self.source_port))
445        # We don't test anything here other the constructor not barfing as
446        # this code doesn't deal with setting up an active running SSL server
447        # for an ssl_wrapped connect() to actually return from.
448
449
450class TimeoutTest(TestCase):
451    PORT = None
452
453    def setUp(self):
454        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
455        TimeoutTest.PORT = test_support.bind_port(self.serv)
456        self.serv.listen(5)
457
458    def tearDown(self):
459        self.serv.close()
460        self.serv = None
461
462    def testTimeoutAttribute(self):
463        '''This will prove that the timeout gets through
464        HTTPConnection and into the socket.
465        '''
466        # default -- use global socket timeout
467        self.assertTrue(socket.getdefaulttimeout() is None)
468        socket.setdefaulttimeout(30)
469        try:
470            httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT)
471            httpConn.connect()
472        finally:
473            socket.setdefaulttimeout(None)
474        self.assertEqual(httpConn.sock.gettimeout(), 30)
475        httpConn.close()
476
477        # no timeout -- do not use global socket default
478        self.assertTrue(socket.getdefaulttimeout() is None)
479        socket.setdefaulttimeout(30)
480        try:
481            httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
482                                              timeout=None)
483            httpConn.connect()
484        finally:
485            socket.setdefaulttimeout(None)
486        self.assertEqual(httpConn.sock.gettimeout(), None)
487        httpConn.close()
488
489        # a value
490        httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
491        httpConn.connect()
492        self.assertEqual(httpConn.sock.gettimeout(), 30)
493        httpConn.close()
494
495
496class HTTPSTimeoutTest(TestCase):
497# XXX Here should be tests for HTTPS, there isn't any right now!
498
499    def test_attributes(self):
500        # simple test to check it's storing it
501        if hasattr(httplib, 'HTTPSConnection'):
502            h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
503            self.assertEqual(h.timeout, 30)
504
505    @unittest.skipIf(not hasattr(httplib, 'HTTPS'), 'httplib.HTTPS not available')
506    def test_host_port(self):
507        # Check invalid host_port
508
509        # Note that httplib does not accept user:password@ in the host-port.
510        for hp in ("www.python.org:abc", "user:password@www.python.org"):
511            self.assertRaises(httplib.InvalidURL, httplib.HTTP, hp)
512
513        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b",
514                          8000),
515                         ("pypi.python.org:443", "pypi.python.org", 443),
516                         ("pypi.python.org", "pypi.python.org", 443),
517                         ("pypi.python.org:", "pypi.python.org", 443),
518                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443)):
519            http = httplib.HTTPS(hp)
520            c = http._conn
521            if h != c.host:
522                self.fail("Host incorrectly parsed: %s != %s" % (h, c.host))
523            if p != c.port:
524                self.fail("Port incorrectly parsed: %s != %s" % (p, c.host))
525
526
527def test_main(verbose=None):
528    test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
529                              HTTPSTimeoutTest, SourceAddressTest)
530
531if __name__ == '__main__':
532    test_main()
533