1from __future__ import absolute_import, division, unicode_literals
2
3from . import support  # flake8: noqa
4import unittest
5import codecs
6from io import BytesIO
7
8from six.moves import http_client
9
10from html5lib.inputstream import (BufferedStream, HTMLInputStream,
11                                  HTMLUnicodeInputStream, HTMLBinaryInputStream)
12
class BufferedStreamTest(unittest.TestCase):
    """Check read(), tell() and seek() on a BufferedStream wrapping a BytesIO.

    The checks use ``self.assertEqual`` rather than bare ``assert`` so that
    they still run under ``python -O`` and report both values on failure.
    """

    def test_basic(self):
        # Asking for more bytes than exist is expected to return all of them.
        s = b"abc"
        fp = BufferedStream(BytesIO(s))
        self.assertEqual(fp.read(10), s)

    def test_read_length(self):
        # Consecutive reads are expected to hand back consecutive slices and
        # an empty bytes object once the six input bytes are used up.
        fp = BufferedStream(BytesIO(b"abcdef"))
        self.assertEqual(fp.read(1), b"a")
        self.assertEqual(fp.read(2), b"bc")
        self.assertEqual(fp.read(3), b"def")
        self.assertEqual(fp.read(4), b"")

    def test_tell(self):
        # Only the reported offset matters here; the data read is discarded.
        fp = BufferedStream(BytesIO(b"abcdef"))
        fp.read(1)
        self.assertEqual(fp.tell(), 1)
        fp.read(2)
        self.assertEqual(fp.tell(), 3)
        fp.read(3)
        self.assertEqual(fp.tell(), 6)
        # Reading past the end is expected to leave the offset at 6.
        fp.read(4)
        self.assertEqual(fp.tell(), 6)

    def test_seek(self):
        # Seek back to the start, to an offset already read, and to an offset
        # not yet read, checking the bytes returned after each move.
        fp = BufferedStream(BytesIO(b"abcdef"))
        self.assertEqual(fp.read(1), b"a")
        fp.seek(0)
        self.assertEqual(fp.read(1), b"a")
        self.assertEqual(fp.read(2), b"bc")
        fp.seek(2)
        self.assertEqual(fp.read(2), b"cd")
        fp.seek(4)
        self.assertEqual(fp.read(2), b"ef")

    def test_seek_tell(self):
        # Same seek/read sequence as test_seek, but checking tell() instead
        # of the data; the bytes read are discarded.
        fp = BufferedStream(BytesIO(b"abcdef"))
        fp.read(1)
        self.assertEqual(fp.tell(), 1)
        fp.seek(0)
        fp.read(1)
        self.assertEqual(fp.tell(), 1)
        fp.read(2)
        self.assertEqual(fp.tell(), 3)
        fp.seek(2)
        fp.read(2)
        self.assertEqual(fp.tell(), 4)
        fp.seek(4)
        fp.read(2)
        self.assertEqual(fp.tell(), 6)
73
74
class HTMLUnicodeInputStreamShortChunk(HTMLUnicodeInputStream):
    """HTMLUnicodeInputStream subclass with ``_defaultChunkSize`` set to 2 for tests."""
    # NOTE(review): presumably makes the stream read its input two characters
    # at a time so short test inputs span several chunks — confirm against
    # html5lib.inputstream.
    _defaultChunkSize = 2
77
78
class HTMLBinaryInputStreamShortChunk(HTMLBinaryInputStream):
    """HTMLBinaryInputStream subclass with ``_defaultChunkSize`` set to 2 for tests."""
    # NOTE(review): presumably makes the stream read its input two characters
    # at a time so short test inputs span several chunks — confirm against
    # html5lib.inputstream.
    _defaultChunkSize = 2
81
82
class HTMLInputStreamTest(unittest.TestCase):
    """Encoding, newline and position checks for the HTML input stream classes."""

    def test_char_ascii(self):
        # Encoding passed explicitly as ASCII; one character is read back.
        ascii_stream = HTMLInputStream(b"'", encoding='ascii')
        reported = ascii_stream.charEncoding[0]
        self.assertEqual(reported, 'ascii')
        self.assertEqual(ascii_stream.char(), "'")

    def test_char_utf8(self):
        # Encoding passed explicitly as UTF-8 with a non-ASCII character.
        quote = '\u2018'
        utf8_stream = HTMLInputStream(quote.encode('utf-8'), encoding='utf-8')
        reported = utf8_stream.charEncoding[0]
        self.assertEqual(reported, 'utf-8')
        self.assertEqual(utf8_stream.char(), quote)

    def test_char_win1252(self):
        # No encoding is given; windows-1252 is the expected result.
        text = "\xa9\xf1\u2019"
        stream = HTMLInputStream(text.encode('windows-1252'))
        self.assertEqual(stream.charEncoding[0], 'windows-1252')
        # The characters must come back one at a time, in input order.
        for expected_char in text:
            self.assertEqual(stream.char(), expected_char)

    def test_bom(self):
        # Input starts with a UTF-8 byte order mark and no encoding is given.
        payload = codecs.BOM_UTF8 + b"'"
        stream = HTMLInputStream(payload)
        self.assertEqual(stream.charEncoding[0], 'utf-8')
        self.assertEqual(stream.char(), "'")

    def test_utf_16(self):
        # 1025 spaces encoded with the generic 'utf-16' codec; either byte
        # order is accepted as the reported encoding.
        space_count = 1025
        stream = HTMLInputStream((' ' * space_count).encode('utf-16'))
        encoding_name = stream.charEncoding[0]
        self.assertTrue(encoding_name in ['utf-16-le', 'utf-16-be'], stream.charEncoding)
        # NOTE(review): the True argument presumably inverts the match so the
        # run of spaces itself is consumed — confirm against charsUntil().
        consumed = stream.charsUntil(' ', True)
        self.assertEqual(len(consumed), space_count)

    def test_newlines(self):
        # The input mixes "\n", "\r\n" and a bare "\r"; the expected output
        # contains only "\n".
        raw = codecs.BOM_UTF8 + b"a\nbb\r\nccc\rddddxe"
        stream = HTMLBinaryInputStreamShortChunk(raw)
        self.assertEqual(stream.position(), (1, 0))
        # (stop character, expected text, expected position afterwards)
        steps = [
            ('c', "a\nbb\n", (3, 0)),
            ('x', "ccc\ndddd", (4, 4)),
            ('e', "x", (4, 5)),
        ]
        for stop_char, expected_text, expected_position in steps:
            self.assertEqual(stream.charsUntil(stop_char), expected_text)
            self.assertEqual(stream.position(), expected_position)

    def test_newlines2(self):
        # A run of "\r" exactly one default chunk long, followed by "\n".
        chunk_size = HTMLUnicodeInputStream._defaultChunkSize
        carriage_returns = "\r" * chunk_size
        stream = HTMLInputStream(carriage_returns + "\n")
        expected = "\n" * chunk_size
        self.assertEqual(stream.charsUntil('x'), expected)

    def test_position(self):
        # Track position() through reads and unget() calls on a stream that
        # uses the two-character chunk size.
        raw = codecs.BOM_UTF8 + b"a\nbb\nccc\nddde\nf\ngh"
        stream = HTMLBinaryInputStreamShortChunk(raw)

        def expect_position(line, column):
            self.assertEqual(stream.position(), (line, column))

        expect_position(1, 0)
        self.assertEqual(stream.charsUntil('c'), "a\nbb\n")
        expect_position(3, 0)

        # Push the newline back and re-read it with charsUntil().
        stream.unget("\n")
        expect_position(2, 2)
        self.assertEqual(stream.charsUntil('c'), "\n")
        expect_position(3, 0)

        # Push the newline back again and re-read it with char().
        stream.unget("\n")
        expect_position(2, 2)
        self.assertEqual(stream.char(), "\n")
        expect_position(3, 0)

        self.assertEqual(stream.charsUntil('e'), "ccc\nddd")
        expect_position(4, 3)
        self.assertEqual(stream.charsUntil('h'), "e\nf\ng")
        expect_position(6, 1)

    def test_position2(self):
        # Read a unicode stream one character at a time, checking the
        # position after every character.
        stream = HTMLUnicodeInputStreamShortChunk("abc\nd")
        self.assertEqual(stream.position(), (1, 0))
        # (expected character, expected position after reading it)
        expectations = [
            ("a", (1, 1)),
            ("b", (1, 2)),
            ("c", (1, 3)),
            ("\n", (2, 0)),
            ("d", (2, 1)),
        ]
        for expected_char, expected_position in expectations:
            self.assertEqual(stream.char(), expected_char)
            self.assertEqual(stream.position(), expected_position)

    def test_python_issue_20007(self):
        """
        Make sure we have a work-around for Python bug #20007
        http://bugs.python.org/issue20007
        """
        class FakeSocket(object):
            # Stand-in socket: HTTPResponse only needs makefile() here.
            def makefile(self, _mode, _bufsize=None):
                return BytesIO(b"HTTP/1.1 200 Ok\r\n\r\nText")

        response = http_client.HTTPResponse(FakeSocket())
        # Parse the status line and headers, leaving only the body unread.
        response.begin()
        stream = HTMLInputStream(response)
        body = stream.charsUntil(" ")
        self.assertEqual(body, "Text")
172
173
def buildTestSuite():
    """Return the tests the default unittest loader finds under this module's name."""
    loader = unittest.defaultTestLoader
    module_name = __name__
    return loader.loadTestsFromName(module_name)
176
177
def main():
    """Run the tests through the unittest command-line entry point.

    ``unittest.main()`` loads the test cases of the running ``__main__``
    module itself, so no suite is built here first; an earlier version built
    one and discarded it.
    """
    unittest.main()
181
# Run the tests when this file is executed directly rather than imported.
if __name__ == '__main__':
    main()
184