1#
2# test_multibytecodec.py
3#   Unit test for multibytecodec itself
4#
5
6from test import support
7from test.support import TESTFN
8import unittest, io, codecs, sys
9import _multibytecodec
10
# Names of every CJK codec exercised by the tests in this module.  The
# trailing comment on each group is the extension module it is listed under.
ALL_CJKENCODINGS = (
    ['gb2312', 'gbk', 'gb18030', 'hz']                       # _codecs_cn
    + ['big5hkscs']                                          # _codecs_hk
    + ['cp932', 'shift_jis', 'euc_jp', 'euc_jisx0213',       # _codecs_jp
       'shift_jisx0213', 'euc_jis_2004', 'shift_jis_2004']
    + ['cp949', 'euc_kr', 'johab']                           # _codecs_kr
    + ['big5', 'cp950']                                      # _codecs_tw
    + ['iso2022_jp', 'iso2022_jp_1', 'iso2022_jp_2',         # _codecs_iso2022
       'iso2022_jp_2004', 'iso2022_jp_3', 'iso2022_jp_ext',
       'iso2022_kr']
)
27
class Test_MultibyteCodec(unittest.TestCase):
    # Checks applied to every codec in ALL_CJKENCODINGS, plus regression
    # tests for error callbacks and the _multibytecodec stream classes.
    #
    # Per-codec loops wrap each iteration in subTest() so that a failure is
    # reported together with the codec name and the remaining codecs still
    # get checked.

    def test_nullcoding(self):
        # Empty input converts to empty output in both directions.
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual(b''.decode(enc), '')
                self.assertEqual(str(b'', enc), '')
                self.assertEqual(''.encode(enc), b'')

    def test_str_decode(self):
        # The ASCII text 'abcd' encodes to the same four bytes everywhere.
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual('abcd'.encode(enc), b'abcd')

    def test_errorcallback_longindex(self):
        # An error handler that asks to resume at sys.maxsize + 1 must make
        # the decoder raise IndexError.
        def myreplace(exc):
            return ('', sys.maxsize + 1)

        dec = codecs.getdecoder('euc-kr')
        codecs.register_error('test.cjktest', myreplace)
        self.assertRaises(IndexError, dec,
                          b'apple\x92ham\x93spam', 'test.cjktest')

    def test_errorcallback_custom_ignore(self):
        # Issue #23215: MemoryError with custom error handlers and multibyte codecs
        data = 100 * "\udc00"
        codecs.register_error("test.ignore", codecs.ignore_errors)
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual(data.encode(enc, "test.ignore"), b'')

    def test_codingspec(self):
        # Execute a source string that holds only a coding declaration for
        # each codec; TESTFN is unlinked afterwards in every case.
        try:
            for enc in ALL_CJKENCODINGS:
                with self.subTest(encoding=enc):
                    code = '# coding: {}\n'.format(enc)
                    exec(code)
        finally:
            support.unlink(TESTFN)

    def test_init_segfault(self):
        # bug #3305: this used to segfault
        self.assertRaises(AttributeError,
                          _multibytecodec.MultibyteStreamReader, None)
        self.assertRaises(AttributeError,
                          _multibytecodec.MultibyteStreamWriter, None)

    def test_decode_unicode(self):
        # Trying to decode a unicode string should raise a TypeError
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertRaises(TypeError, codecs.getdecoder(enc), "")
73
class Test_IncrementalEncoder(unittest.TestCase):

    def test_stateless(self):
        # The cp949 encoder keeps no state between calls.
        enc = codecs.getincrementalencoder('cp949')()

        hangul = enc.encode('\ud30c\uc774\uc36c \ub9c8\uc744')
        self.assertEqual(hangul, b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
        self.assertIsNone(enc.reset())

        symbols = enc.encode('\u2606\u223c\u2606', True)
        self.assertEqual(symbols, b'\xa1\xd9\xa1\xad\xa1\xd9')
        self.assertIsNone(enc.reset())

        # Empty input gives empty output, final or not.
        for final in (True, False):
            self.assertEqual(enc.encode('', final), b'')
        self.assertIsNone(enc.reset())

    def test_stateful(self):
        # The jisx0213 encoder is stateful for a few code points, e.g.
        #   U+00E6        => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300        => ABDC
        enc = codecs.getincrementalencoder('jisx0213')()

        # Each row is (input, final, expected output); order matters because
        # a pending U+00E6 is carried over from one call to the next.
        before_reset = (
            ('\u00e6\u0300', False, b'\xab\xc4'),
            ('\u00e6', False, b''),
            ('\u0300', False, b'\xab\xc4'),
            ('\u00e6', True, b'\xa9\xdc'),
        )
        for text, final, expected in before_reset:
            self.assertEqual(enc.encode(text, final), expected)

        self.assertIsNone(enc.reset())

        after_reset = (
            ('\u0300', False, b'\xab\xdc'),
            ('\u00e6', False, b''),
            ('', True, b'\xa9\xdc'),
            ('', True, b''),
        )
        for text, final, expected in after_reset:
            self.assertEqual(enc.encode(text, final), expected)

    def test_stateful_keep_buffer(self):
        # A failed encode() call must not drop the pending U+00E6.
        enc = codecs.getincrementalencoder('jisx0213')()

        def expect_rejected():
            # U+0123 is the input the test expects the encoder to refuse.
            with self.assertRaises(UnicodeEncodeError):
                enc.encode('\u0123')

        self.assertEqual(enc.encode('\u00e6'), b'')
        expect_rejected()
        self.assertEqual(enc.encode('\u0300\u00e6'), b'\xab\xc4')
        expect_rejected()

        self.assertIsNone(enc.reset())

        self.assertEqual(enc.encode('\u0300'), b'\xab\xdc')
        self.assertEqual(enc.encode('\u00e6'), b'')
        expect_rejected()
        self.assertEqual(enc.encode('', True), b'\xa9\xdc')

    def test_issue5640(self):
        # With 'backslashreplace', U+00FF comes out as an escape and the
        # following newline is encoded unchanged.
        enc = codecs.getincrementalencoder('shift-jis')('backslashreplace')
        for text, expected in (('\xff', b'\\xff'), ('\n', b'\n')):
            self.assertEqual(enc.encode(text), expected)
124
class Test_IncrementalDecoder(unittest.TestCase):

    def test_dbcs(self):
        # cp949 decoder is simple with only 1 or 2 bytes sequences.
        decoder = codecs.getincrementaldecoder('cp949')()
        # The chunk ends with the lone byte \xbd, which yields no output yet.
        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0\xcc\xbd'),
                         '\ud30c\uc774')
        # \xe3 at the start of the next chunk completes that character.
        self.assertEqual(decoder.decode(b'\xe3 \xb8\xb6\xc0\xbb'),
                         '\uc36c \ub9c8\uc744')
        self.assertEqual(decoder.decode(b''), '')

    def test_dbcs_keep_buffer(self):
        # A failed final decode must keep the pending byte \xc0, so that a
        # later \xcc still completes the character.
        decoder = codecs.getincrementaldecoder('cp949')()
        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
        self.assertEqual(decoder.decode(b'\xcc'), '\uc774')

        # Same again, but the failing call carries input of its own.
        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
        self.assertRaises(UnicodeDecodeError, decoder.decode,
                          b'\xcc\xbd', True)
        self.assertEqual(decoder.decode(b'\xcc'), '\uc774')

    def test_iso2022(self):
        # Escape sequences and two-byte characters may be split across calls.
        decoder = codecs.getincrementaldecoder('iso2022-jp')()
        ESC = b'\x1b'
        self.assertEqual(decoder.decode(ESC + b'('), '')
        self.assertEqual(decoder.decode(b'B', True), '')
        self.assertEqual(decoder.decode(ESC + b'$'), '')
        self.assertEqual(decoder.decode(b'B@$'), '\u4e16')
        self.assertEqual(decoder.decode(b'@$@'), '\u4e16')
        self.assertEqual(decoder.decode(b'$', True), '\u4e16')
        self.assertEqual(decoder.reset(), None)
        # After reset() the same bytes decode as plain text again.
        self.assertEqual(decoder.decode(b'@$'), '@$')
        self.assertEqual(decoder.decode(ESC + b'$'), '')
        # An unfinished escape sequence is an error on a final call, but it
        # stays buffered and is completed by the next chunk.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
        self.assertEqual(decoder.decode(b'B@$'), '\u4e16')

    def test_decode_unicode(self):
        # Trying to decode a unicode string should raise a TypeError
        for enc in ALL_CJKENCODINGS:
            # subTest names the failing codec and lets the others still run.
            with self.subTest(encoding=enc):
                decoder = codecs.getincrementaldecoder(enc)()
                self.assertRaises(TypeError, decoder.decode, "")
167
class Test_StreamReader(unittest.TestCase):
    def test_bug1728403(self):
        # A file holding only the single byte \xa1 must raise
        # UnicodeDecodeError when read through a cp949 reader.
        try:
            with open(TESTFN, 'wb') as raw:
                raw.write(b'\xa1')
            with codecs.open(TESTFN, encoding='cp949') as reader:
                self.assertRaises(UnicodeDecodeError, reader.read, 2)
        finally:
            support.unlink(TESTFN)
183
class Test_StreamWriter(unittest.TestCase):
    def test_gb18030(self):
        # The sink accumulates, so every check compares against all bytes
        # written so far.
        sink = io.BytesIO()
        writer = codecs.getwriter('gb18030')(sink)
        steps = (
            ('123', b'123'),
            ('\U00012345', b'\x907\x959'),
            ('\uac00\u00ac', b'\x827\xcf5\x810\x851'),
        )
        so_far = b''
        for text, encoded in steps:
            writer.write(text)
            so_far += encoded
            self.assertEqual(sink.getvalue(), so_far)

    def test_utf_8(self):
        # Same sequence of writes as test_gb18030, through a utf-8 writer.
        sink = io.BytesIO()
        writer = codecs.getwriter('utf-8')(sink)
        steps = (
            ('123', b'123'),
            ('\U00012345', b'\xf0\x92\x8d\x85'),
            ('\uac00\u00ac', b'\xea\xb0\x80\xc2\xac'),
        )
        so_far = b''
        for text, encoded in steps:
            writer.write(text)
            so_far += encoded
            self.assertEqual(sink.getvalue(), so_far)

    def test_streamwriter_strwrite(self):
        # Writing a str of ASCII letters stores the same letters as bytes.
        sink = io.BytesIO()
        writer = codecs.getwriter('gb18030')(sink)
        writer.write('abcd')
        self.assertEqual(sink.getvalue(), b'abcd')
213
class Test_ISO2022(unittest.TestCase):
    def test_g2(self):
        # The byte after ESC N decodes to U+00E9 once ESC . A has been seen.
        iso2022jp2 = b'\x1b(B:hu4:unit\x1b.A\x1bNi de famille'
        uni = ':hu4:unit\xe9 de famille'
        self.assertEqual(iso2022jp2.decode('iso2022-jp-2'), uni)

    def test_iso2022_jp_g0(self):
        # The encoded soft hyphen must not contain the byte 0x0E.
        self.assertNotIn(b'\x0e', '\N{SOFT HYPHEN}'.encode('iso-2022-jp-2'))
        for encoding in ('iso-2022-jp-2004', 'iso-2022-jp-3'):
            e = '\u3406'.encode(encoding)
            # Output must be 7-bit, so 0x80 itself is rejected as well
            # (the comparison used to be a strict "> 0x80").
            self.assertFalse(any(x >= 0x80 for x in e))

    def test_bug1572832(self):
        # Walk every code point from U+10000 up to U+10FFFF.
        for x in range(0x10000, 0x110000):
            # Any ISO 2022 codec will cause the segfault
            chr(x).encode('iso_2022_jp', 'ignore')
230
class TestStateful(unittest.TestCase):
    # Codec under test, sample text, and the bytes expected for it without
    # and with the trailing reset sequence.
    encoding = 'iso-2022-jp'
    text = '\u4E16\u4E16'
    reset = b'\x1b(B'
    expected = b'\x1b$B@$@$'
    expected_reset = expected + reset

    def test_encode(self):
        # One-shot str.encode() output ends with the reset sequence.
        encoded = self.text.encode(self.encoding)
        self.assertEqual(encoded, self.expected_reset)

    def test_incrementalencoder(self):
        # Non-final calls produce no reset; the first final call emits it
        # and a second final call emits nothing more.
        enc = codecs.getincrementalencoder(self.encoding)()
        pieces = [enc.encode(ch) for ch in self.text]
        self.assertEqual(b''.join(pieces), self.expected)
        self.assertEqual(enc.encode('', final=True), self.reset)
        self.assertEqual(enc.encode('', final=True), b'')

    def test_incrementalencoder_final(self):
        # Marking only the last character as final appends the reset to it.
        enc = codecs.getincrementalencoder(self.encoding)()
        final_at = len(self.text) - 1
        pieces = []
        for position, ch in enumerate(self.text):
            pieces.append(enc.encode(ch, position == final_at))
        self.assertEqual(b''.join(pieces), self.expected_reset)
        self.assertEqual(enc.encode('', final=True), b'')
258
class TestHZStateful(TestStateful):
    # Runs the inherited TestStateful checks with 'hz' codec data.
    encoding = 'hz'
    text = '\u804a\u804a'
    reset = b'~}'
    expected = b'~{ADAD'
    expected_reset = expected + reset
265
def test_main():
    # Hand this module's name to the test.support runner.
    module_name = __name__
    support.run_unittest(module_name)
268
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
271