"""Unit tests for the html5lib input-stream classes.

Covers ``BufferedStream`` (read/seek/tell over a wrapped byte stream) and the
``HTMLInputStream`` family (encoding selection, newline handling and
line/column position reporting).
"""
from __future__ import absolute_import, division, unicode_literals

from . import support  # flake8: noqa
import unittest
import codecs
from io import BytesIO

from six.moves import http_client

from html5lib.inputstream import (BufferedStream, HTMLInputStream,
                                  HTMLUnicodeInputStream, HTMLBinaryInputStream)


class BufferedStreamTest(unittest.TestCase):
    """Checks read(), tell() and seek() on BufferedStream over a BytesIO."""

    # These tests use self.assertEqual rather than bare ``assert`` so the
    # checks still run under ``python -O`` and match HTMLInputStreamTest.

    def test_basic(self):
        """Reading more bytes than are available returns the whole payload."""
        s = b"abc"
        fp = BufferedStream(BytesIO(s))
        self.assertEqual(fp.read(10), s)

    def test_read_length(self):
        """Successive sized reads return consecutive slices, then b""."""
        fp = BufferedStream(BytesIO(b"abcdef"))
        self.assertEqual(fp.read(1), b"a")
        self.assertEqual(fp.read(2), b"bc")
        self.assertEqual(fp.read(3), b"def")
        # Input is exhausted at this point.
        self.assertEqual(fp.read(4), b"")

    def test_tell(self):
        """tell() advances by the bytes actually read and stops at the end."""
        fp = BufferedStream(BytesIO(b"abcdef"))
        fp.read(1)
        self.assertEqual(fp.tell(), 1)
        fp.read(2)
        self.assertEqual(fp.tell(), 3)
        fp.read(3)
        self.assertEqual(fp.tell(), 6)
        # Reading past the end must not move the reported offset.
        fp.read(4)
        self.assertEqual(fp.tell(), 6)

    def test_seek(self):
        """seek() to already-read offsets makes read() return those bytes."""
        fp = BufferedStream(BytesIO(b"abcdef"))
        self.assertEqual(fp.read(1), b"a")
        fp.seek(0)
        self.assertEqual(fp.read(1), b"a")
        self.assertEqual(fp.read(2), b"bc")
        fp.seek(2)
        self.assertEqual(fp.read(2), b"cd")
        fp.seek(4)
        self.assertEqual(fp.read(2), b"ef")

    def test_seek_tell(self):
        """tell() reports the expected offset after mixed seek() and read()."""
        fp = BufferedStream(BytesIO(b"abcdef"))
        fp.read(1)
        self.assertEqual(fp.tell(), 1)
        fp.seek(0)
        fp.read(1)
        self.assertEqual(fp.tell(), 1)
        fp.read(2)
        self.assertEqual(fp.tell(), 3)
        fp.seek(2)
        fp.read(2)
        self.assertEqual(fp.tell(), 4)
        fp.seek(4)
        fp.read(2)
        self.assertEqual(fp.tell(), 6)


class HTMLUnicodeInputStreamShortChunk(HTMLUnicodeInputStream):
    """HTMLUnicodeInputStream with ``_defaultChunkSize`` overridden to 2."""

    _defaultChunkSize = 2


class HTMLBinaryInputStreamShortChunk(HTMLBinaryInputStream):
    """HTMLBinaryInputStream with ``_defaultChunkSize`` overridden to 2."""

    _defaultChunkSize = 2


class HTMLInputStreamTest(unittest.TestCase):
    """Checks encoding, newline and position behaviour of the HTML streams."""

    def test_char_ascii(self):
        """An explicit 'ascii' encoding is reported and the char is returned."""
        stream = HTMLInputStream(b"'", encoding='ascii')
        self.assertEqual(stream.charEncoding[0], 'ascii')
        self.assertEqual(stream.char(), "'")

    def test_char_utf8(self):
        """An explicit 'utf-8' encoding decodes a multi-byte character."""
        stream = HTMLInputStream('\u2018'.encode('utf-8'), encoding='utf-8')
        self.assertEqual(stream.charEncoding[0], 'utf-8')
        self.assertEqual(stream.char(), '\u2018')

    def test_char_win1252(self):
        """Without an explicit encoding, windows-1252 bytes are expected to
        be reported and decoded as windows-1252."""
        stream = HTMLInputStream("\xa9\xf1\u2019".encode('windows-1252'))
        self.assertEqual(stream.charEncoding[0], 'windows-1252')
        self.assertEqual(stream.char(), "\xa9")
        self.assertEqual(stream.char(), "\xf1")
        self.assertEqual(stream.char(), "\u2019")

    def test_bom(self):
        """A leading UTF-8 BOM yields 'utf-8' and is not returned as a char."""
        stream = HTMLInputStream(codecs.BOM_UTF8 + b"'")
        self.assertEqual(stream.charEncoding[0], 'utf-8')
        self.assertEqual(stream.char(), "'")

    def test_utf_16(self):
        """UTF-16 input with a BOM is reported as utf-16-le or utf-16-be and
        all 1025 characters are readable."""
        stream = HTMLInputStream((' ' * 1025).encode('utf-16'))
        # assertTrue (not assertIn) is kept for older unittest versions.
        self.assertTrue(stream.charEncoding[0] in ['utf-16-le', 'utf-16-be'], stream.charEncoding)
        self.assertEqual(len(stream.charsUntil(' ', True)), 1025)

    def test_newlines(self):
        """"\\r\\n" and lone "\\r" are expected to come back as "\\n", with
        position() counting lines accordingly (chunk size 2)."""
        stream = HTMLBinaryInputStreamShortChunk(codecs.BOM_UTF8 + b"a\nbb\r\nccc\rddddxe")
        self.assertEqual(stream.position(), (1, 0))
        self.assertEqual(stream.charsUntil('c'), "a\nbb\n")
        self.assertEqual(stream.position(), (3, 0))
        self.assertEqual(stream.charsUntil('x'), "ccc\ndddd")
        self.assertEqual(stream.position(), (4, 4))
        self.assertEqual(stream.charsUntil('e'), "x")
        self.assertEqual(stream.position(), (4, 5))

    def test_newlines2(self):
        """A run of "\\r" one default chunk long followed by "\\n" yields
        exactly that many "\\n" characters."""
        # NOTE(review): presumably this targets a "\r\n" pair split across a
        # chunk boundary -- confirm against the stream implementation.
        size = HTMLUnicodeInputStream._defaultChunkSize
        stream = HTMLInputStream("\r" * size + "\n")
        self.assertEqual(stream.charsUntil('x'), "\n" * size)

    def test_position(self):
        """position() stays consistent across charsUntil(), char() and
        unget() on a binary stream with chunk size 2."""
        stream = HTMLBinaryInputStreamShortChunk(codecs.BOM_UTF8 + b"a\nbb\nccc\nddde\nf\ngh")
        self.assertEqual(stream.position(), (1, 0))
        self.assertEqual(stream.charsUntil('c'), "a\nbb\n")
        self.assertEqual(stream.position(), (3, 0))
        # Pushing the newline back should return to the end of line 2.
        stream.unget("\n")
        self.assertEqual(stream.position(), (2, 2))
        self.assertEqual(stream.charsUntil('c'), "\n")
        self.assertEqual(stream.position(), (3, 0))
        stream.unget("\n")
        self.assertEqual(stream.position(), (2, 2))
        self.assertEqual(stream.char(), "\n")
        self.assertEqual(stream.position(), (3, 0))
        self.assertEqual(stream.charsUntil('e'), "ccc\nddd")
        self.assertEqual(stream.position(), (4, 3))
        self.assertEqual(stream.charsUntil('h'), "e\nf\ng")
        self.assertEqual(stream.position(), (6, 1))

    def test_position2(self):
        """position() advances one column per char() and moves to the next
        line after a newline on a unicode stream with chunk size 2."""
        stream = HTMLUnicodeInputStreamShortChunk("abc\nd")
        self.assertEqual(stream.position(), (1, 0))
        self.assertEqual(stream.char(), "a")
        self.assertEqual(stream.position(), (1, 1))
        self.assertEqual(stream.char(), "b")
        self.assertEqual(stream.position(), (1, 2))
        self.assertEqual(stream.char(), "c")
        self.assertEqual(stream.position(), (1, 3))
        self.assertEqual(stream.char(), "\n")
        self.assertEqual(stream.position(), (2, 0))
        self.assertEqual(stream.char(), "d")
        self.assertEqual(stream.position(), (2, 1))

    def test_python_issue_20007(self):
        """
        Make sure we have a work-around for Python bug #20007
        http://bugs.python.org/issue20007
        """
        class FakeSocket(object):
            """Stand-in socket whose makefile() serves a canned response."""

            def makefile(self, _mode, _bufsize=None):
                return BytesIO(b"HTTP/1.1 200 Ok\r\n\r\nText")

        source = http_client.HTTPResponse(FakeSocket())
        source.begin()
        stream = HTMLInputStream(source)
        self.assertEqual(stream.charsUntil(" "), "Text")


def buildTestSuite():
    """Return a test suite loaded from this module by name."""
    return unittest.defaultTestLoader.loadTestsFromName(__name__)


def main():
    """Run this module's tests via unittest.main()."""
    # The suite built here is discarded; unittest.main() loads the tests
    # itself. The call is kept to preserve the original call sequence.
    buildTestSuite()
    unittest.main()


if __name__ == '__main__':
    main()