1import codecs
2import contextlib
3import io
4import locale
5import sys
6import unittest
7import encodings
8
9from test import support
10
# ctypes is optional: some builds and platforms ship without it.
# SIZEOF_WCHAR_T is the size in bytes of the C wchar_t type, or -1
# when ctypes (and therefore the probe) is unavailable.
try:
    import ctypes
except ImportError:
    ctypes = None
    SIZEOF_WCHAR_T = -1
else:
    SIZEOF_WCHAR_T = ctypes.sizeof(ctypes.c_wchar)
18
19def coding_checker(self, coder):
20    def check(input, expect):
21        self.assertEqual(coder(input), (expect, len(input)))
22    return check
23
24
class Queue(object):
    """
    A simple FIFO: data written at one end is read back, in order,
    from the other end.  Works with any sliceable sequence type that
    supports ``+=`` (bytes, str, ...).
    """
    def __init__(self, buffer):
        # The initial buffer also fixes the element type of the queue.
        self._buffer = buffer

    def write(self, chars):
        self._buffer += chars

    def read(self, size=-1):
        # A negative size means "drain everything currently buffered".
        if size < 0:
            size = len(self._buffer)
        head, self._buffer = self._buffer[:size], self._buffer[size:]
        return head
44
45
46class MixInCheckStateHandling:
47    def check_state_handling_decode(self, encoding, u, s):
48        for i in range(len(s)+1):
49            d = codecs.getincrementaldecoder(encoding)()
50            part1 = d.decode(s[:i])
51            state = d.getstate()
52            self.assertIsInstance(state[1], int)
53            # Check that the condition stated in the documentation for
54            # IncrementalDecoder.getstate() holds
55            if not state[1]:
56                # reset decoder to the default state without anything buffered
57                d.setstate((state[0][:0], 0))
58                # Feeding the previous input may not produce any output
59                self.assertTrue(not d.decode(state[0]))
60                # The decoder must return to the same state
61                self.assertEqual(state, d.getstate())
62            # Create a new decoder and set it to the state
63            # we extracted from the old one
64            d = codecs.getincrementaldecoder(encoding)()
65            d.setstate(state)
66            part2 = d.decode(s[i:], True)
67            self.assertEqual(u, part1+part2)
68
69    def check_state_handling_encode(self, encoding, u, s):
70        for i in range(len(u)+1):
71            d = codecs.getincrementalencoder(encoding)()
72            part1 = d.encode(u[:i])
73            state = d.getstate()
74            d = codecs.getincrementalencoder(encoding)()
75            d.setstate(state)
76            part2 = d.encode(u[i:], True)
77            self.assertEqual(s, part1+part2)
78
79
class ReadTest(MixInCheckStateHandling):
    """Shared tests for stream and incremental decoding of a codec.

    Concrete subclasses must set the ``encoding`` class attribute and,
    for the lone-surrogate tests, ``ill_formed_sequence`` (the encoded
    form of a lone low surrogate).
    """

    def check_partial(self, input, partialresults):
        """Decode *input* one byte at a time; after feeding the i-th
        byte the accumulated result must equal ``partialresults[i]``.

        Exercises, in turn: a StreamReader fed through a Queue, an
        incremental decoder, the same decoder after reset(), and
        codecs.iterdecode().
        """
        # get a StreamReader for the encoding and feed the bytestring version
        # of input to the reader byte by byte. Read everything available from
        # the StreamReader and check that the results equal the appropriate
        # entries from partialresults.
        q = Queue(b"")
        r = codecs.getreader(self.encoding)(q)
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
            q.write(bytes([c]))
            result += r.read()
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(r.read(), "")
        self.assertEqual(r.bytebuffer, b"")

        # do the check again, this time using an incremental decoder
        d = codecs.getincrementaldecoder(self.encoding)()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # Check whether the reset method works properly
        d.reset()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # check iterdecode()
        encoded = input.encode(self.encoding)
        self.assertEqual(
            input,
            "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding))
        )

    def test_readline(self):
        """readline() must honor size/keepends for every line ending
        (\\n, \\r\\n, \\r, U+2028), including long lines and \\r at a
        read-chunk boundary."""
        def getreader(input):
            stream = io.BytesIO(input.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        def readalllines(input, keepends=True, size=None):
            # Join all lines with "|" so expected values are easy to spell.
            reader = getreader(input)
            lines = []
            while True:
                line = reader.readline(size=size, keepends=keepends)
                if not line:
                    break
                lines.append(line)
            return "|".join(lines)

        s = "foo\nbar\r\nbaz\rspam\u2028eggs"
        sexpected = "foo\n|bar\r\n|baz\r|spam\u2028|eggs"
        sexpectednoends = "foo|bar|baz|spam|eggs"
        self.assertEqual(readalllines(s, True), sexpected)
        self.assertEqual(readalllines(s, False), sexpectednoends)
        self.assertEqual(readalllines(s, True, 10), sexpected)
        self.assertEqual(readalllines(s, False, 10), sexpectednoends)

        lineends = ("\n", "\r\n", "\r", "\u2028")
        # Test long lines (multiple calls to read() in readline())
        vw = []
        vwo = []
        for (i, lineend) in enumerate(lineends):
            vw.append((i*200+200)*"\u3042" + lineend)
            vwo.append((i*200+200)*"\u3042")
        self.assertEqual(readalllines("".join(vw), True), "|".join(vw))
        self.assertEqual(readalllines("".join(vw), False), "|".join(vwo))

        # Test lines where the first read might end with \r, so the
        # reader has to look ahead whether this is a lone \r or a \r\n
        for size in range(80):
            for lineend in lineends:
                s = 10*(size*"a" + lineend + "xxx\n")
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=True),
                        size*"a" + lineend,
                    )
                    self.assertEqual(
                        reader.readline(keepends=True),
                        "xxx\n",
                    )
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=False),
                        size*"a",
                    )
                    self.assertEqual(
                        reader.readline(keepends=False),
                        "xxx",
                    )

    def test_mixed_readline_and_read(self):
        """Interleaving readline(), read() and readlines() on the same
        reader must not lose or duplicate data (issues #8260, #16636,
        #12446)."""
        lines = ["Humpty Dumpty sat on a wall,\n",
                 "Humpty Dumpty had a great fall.\r\n",
                 "All the king's horses and all the king's men\r",
                 "Couldn't put Humpty together again."]
        data = ''.join(lines)
        def getreader():
            stream = io.BytesIO(data.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        # Issue #8260: Test readline() followed by read()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.read(), ''.join(lines[1:]))
        self.assertEqual(f.read(), '')

        # Issue #16636: Test readline() followed by readlines()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.readlines(), lines[1:])
        self.assertEqual(f.read(), '')

        # Test read() followed by read()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.read(), data[5:])
        self.assertEqual(f.read(), '')

        # Issue #12446: Test read() followed by readlines()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.readlines(), [lines[0][5:]] + lines[1:])
        self.assertEqual(f.read(), '')

    def test_bug1175396(self):
        """Regression test: iterating a StreamReader over a long
        \\r\\n-terminated document yields exactly the original lines."""
        s = [
            '<%!--===================================================\r\n',
            '    BLOG index page: show recent articles,\r\n',
            '    today\'s articles, or articles of a specific date.\r\n',
            '========================================================--%>\r\n',
            '<%@inputencoding="ISO-8859-1"%>\r\n',
            '<%@pagetemplate=TEMPLATE.y%>\r\n',
            '<%@import=import frog.util, frog%>\r\n',
            '<%@import=import frog.objects%>\r\n',
            '<%@import=from frog.storageerrors import StorageError%>\r\n',
            '<%\r\n',
            '\r\n',
            'import logging\r\n',
            'log=logging.getLogger("Snakelets.logger")\r\n',
            '\r\n',
            '\r\n',
            'user=self.SessionCtx.user\r\n',
            'storageEngine=self.SessionCtx.storageEngine\r\n',
            '\r\n',
            '\r\n',
            'def readArticlesFromDate(date, count=None):\r\n',
            '    entryids=storageEngine.listBlogEntries(date)\r\n',
            '    entryids.reverse() # descending\r\n',
            '    if count:\r\n',
            '        entryids=entryids[:count]\r\n',
            '    try:\r\n',
            '        return [ frog.objects.BlogEntry.load(storageEngine, date, Id) for Id in entryids ]\r\n',
            '    except StorageError,x:\r\n',
            '        log.error("Error loading articles: "+str(x))\r\n',
            '        self.abort("cannot load articles")\r\n',
            '\r\n',
            'showdate=None\r\n',
            '\r\n',
            'arg=self.Request.getArg()\r\n',
            'if arg=="today":\r\n',
            '    #-------------------- TODAY\'S ARTICLES\r\n',
            '    self.write("<h2>Today\'s articles</h2>")\r\n',
            '    showdate = frog.util.isodatestr() \r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'elif arg=="active":\r\n',
            '    #-------------------- ACTIVE ARTICLES redirect\r\n',
            '    self.Yredirect("active.y")\r\n',
            'elif arg=="login":\r\n',
            '    #-------------------- LOGIN PAGE redirect\r\n',
            '    self.Yredirect("login.y")\r\n',
            'elif arg=="date":\r\n',
            '    #-------------------- ARTICLES OF A SPECIFIC DATE\r\n',
            '    showdate = self.Request.getParameter("date")\r\n',
            '    self.write("<h2>Articles written on %s</h2>"% frog.util.mediumdatestr(showdate))\r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'else:\r\n',
            '    #-------------------- RECENT ARTICLES\r\n',
            '    self.write("<h2>Recent articles</h2>")\r\n',
            '    dates=storageEngine.listBlogEntryDates()\r\n',
            '    if dates:\r\n',
            '        entries=[]\r\n',
            '        SHOWAMOUNT=10\r\n',
            '        for showdate in dates:\r\n',
            '            entries.extend( readArticlesFromDate(showdate, SHOWAMOUNT-len(entries)) )\r\n',
            '            if len(entries)>=SHOWAMOUNT:\r\n',
            '                break\r\n',
            '                \r\n',
        ]
        stream = io.BytesIO("".join(s).encode(self.encoding))
        reader = codecs.getreader(self.encoding)(stream)
        for (i, line) in enumerate(reader):
            self.assertEqual(line, s[i])

    def test_readlinequeue(self):
        """readline() through a writer/reader pair sharing a Queue:
        line endings split across writes must be handled correctly."""
        q = Queue(b"")
        writer = codecs.getwriter(self.encoding)(q)
        reader = codecs.getreader(self.encoding)(q)

        # No lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=False), "foo")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=False), "")
        self.assertEqual(reader.readline(keepends=False), "bar")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=False), "baz")
        self.assertEqual(reader.readline(keepends=False), "")

        # Lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=True), "foo\r")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=True), "\n")
        self.assertEqual(reader.readline(keepends=True), "bar\r")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=True), "baz")
        self.assertEqual(reader.readline(keepends=True), "")
        writer.write("foo\r\n")
        self.assertEqual(reader.readline(keepends=True), "foo\r\n")

    def test_bug1098990_a(self):
        """Regression test: readline() across internal read-chunk
        boundaries returns complete lines."""
        s1 = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy\r\n"
        s2 = "offending line: ladfj askldfj klasdj fskla dfzaskdj fasklfj laskd fjasklfzzzzaa%whereisthis!!!\r\n"
        s3 = "next line.\r\n"

        s = (s1+s2+s3).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), "")

    def test_bug1098990_b(self):
        """Companion to test_bug1098990_a with a different line-length
        pattern around the chunk boundary."""
        s1 = "aaaaaaaaaaaaaaaaaaaaaaaa\r\n"
        s2 = "bbbbbbbbbbbbbbbbbbbbbbbb\r\n"
        s3 = "stillokay:bbbbxx\r\n"
        s4 = "broken!!!!badbad\r\n"
        s5 = "againokay.\r\n"

        s = (s1+s2+s3+s4+s5).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), s4)
        self.assertEqual(reader.readline(), s5)
        self.assertEqual(reader.readline(), "")

    # What decoding self.ill_formed_sequence with errors="replace" yields;
    # subclasses override when one ill-formed sequence maps to several
    # replacement characters (e.g. UTF-8).
    ill_formed_sequence_replace = "\ufffd"

    def test_lone_surrogates(self):
        """Lone surrogates must be rejected by strict encoding/decoding
        and handled correctly by every standard error handler."""
        self.assertRaises(UnicodeEncodeError, "\ud800".encode, self.encoding)
        self.assertEqual("[\uDC80]".encode(self.encoding, "backslashreplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "namereplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "xmlcharrefreplace"),
                         "[&#56448;]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "replace"),
                         "[?]".encode(self.encoding))

        # sequential surrogate characters
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "replace"),
                         "[??]".encode(self.encoding))

        # Encoding "" captures just the BOM (if the codec emits one) so it
        # can be stripped from / prepended to the expected sequences below.
        bom = "".encode(self.encoding)
        for before, after in [("\U00010fff", "A"), ("[", "]"),
                              ("A", "\U00010fff")]:
            before_sequence = before.encode(self.encoding)[len(bom):]
            after_sequence = after.encode(self.encoding)[len(bom):]
            test_string = before + "\uDC80" + after
            test_sequence = (bom + before_sequence +
                             self.ill_formed_sequence + after_sequence)
            self.assertRaises(UnicodeDecodeError, test_sequence.decode,
                              self.encoding)
            self.assertEqual(test_string.encode(self.encoding,
                                                "surrogatepass"),
                             test_sequence)
            self.assertEqual(test_sequence.decode(self.encoding,
                                                  "surrogatepass"),
                             test_string)
            self.assertEqual(test_sequence.decode(self.encoding, "ignore"),
                             before + after)
            self.assertEqual(test_sequence.decode(self.encoding, "replace"),
                             before + self.ill_formed_sequence_replace + after)
            backslashreplace = ''.join('\\x%02x' % b
                                       for b in self.ill_formed_sequence)
            self.assertEqual(test_sequence.decode(self.encoding, "backslashreplace"),
                             before + backslashreplace + after)
388
389
class UTF32Test(ReadTest, unittest.TestCase):
    """ReadTest instantiation for the BOM-aware "utf-32" codec."""
    encoding = "utf-32"
    # Encoding of a lone low surrogate (U+DC80) in native byte order;
    # consumed by ReadTest.test_lone_surrogates.
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc\x00\x00"
    else:
        ill_formed_sequence = b"\x00\x00\xdc\x80"

    # "spamspam" encoded with a little-endian / big-endian BOM.
    spamle = (b'\xff\xfe\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00')
    spambe = (b'\x00\x00\xfe\xff'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m')

    def test_only_one_bom(self):
        """The writer must emit the BOM once, on the first write only."""
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        """Input starting with an invalid BOM must raise UnicodeError."""
        s = io.BytesIO(4*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(8*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        """Byte-at-a-time decoding: 4 BOM bytes first, then one code
        point per 4 bytes."""
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read
                "", # third byte of BOM read
                "", # fourth byte of BOM read => byteorder known
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        """A truncated final unit honors 'replace' and 'ignore'."""
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_32_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_32_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        """Strict decoding of a truncated unit raises UnicodeDecodeError."""
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        """getstate()/setstate() round-trip for both byte orders."""
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded_le = b'\xff\xfe\x00\x00' + b'\x00\x00\x01\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_le)[0])
        encoded_be = b'\x00\x00\xfe\xff' + b'\x00\x01\x00\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_be)[0])
484
485
class UTF32LETest(ReadTest, unittest.TestCase):
    """ReadTest instantiation for the BOM-less little-endian codec."""
    encoding = "utf-32-le"
    # Lone low surrogate U+DC80, little-endian.
    ill_formed_sequence = b"\x80\xdc\x00\x00"

    def test_partial(self):
        """Byte-at-a-time decoding: no BOM, one code point per 4 bytes."""
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_simple(self):
        """Little-endian byte layout of a supplementary code point."""
        self.assertEqual("\U00010203".encode(self.encoding), b"\x03\x02\x01\x00")

    def test_errors(self):
        """Strict decoding of a truncated unit raises UnicodeDecodeError."""
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_le_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded = b'\x00\x00\x01\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_le_decode(encoded)[0])
530
531
class UTF32BETest(ReadTest, unittest.TestCase):
    """ReadTest instantiation for the BOM-less big-endian codec."""
    encoding = "utf-32-be"
    # Lone low surrogate U+DC80, big-endian.
    ill_formed_sequence = b"\x00\x00\xdc\x80"

    def test_partial(self):
        """Byte-at-a-time decoding: no BOM, one code point per 4 bytes."""
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_simple(self):
        """Big-endian byte layout of a supplementary code point."""
        self.assertEqual("\U00010203".encode(self.encoding), b"\x00\x01\x02\x03")

    def test_errors(self):
        """Strict decoding of a truncated unit raises UnicodeDecodeError."""
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_be_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded = b'\x00\x01\x00\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_be_decode(encoded)[0])
576
577
class UTF16Test(ReadTest, unittest.TestCase):
    """ReadTest instantiation for the BOM-aware "utf-16" codec."""
    encoding = "utf-16"
    # Encoding of a lone low surrogate (U+DC80) in native byte order;
    # consumed by ReadTest.test_lone_surrogates.
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc"
    else:
        ill_formed_sequence = b"\xdc\x80"

    # "spamspam" encoded with a little-endian / big-endian BOM.
    spamle = b'\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00'
    spambe = b'\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m'

    def test_only_one_bom(self):
        """The writer must emit the BOM once, on the first write only."""
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        """Input starting with an invalid BOM must raise UnicodeError."""
        s = io.BytesIO(b"\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(b"\xff\xff\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        """Byte-at-a-time decoding: 2 BOM bytes first, then one BMP
        code point per 2 bytes and a 4-byte surrogate pair at the end."""
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read => byteorder known
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        """A truncated final unit honors 'replace' and 'ignore'."""
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_16_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_16_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        """Strict decoding of a truncated unit raises UnicodeDecodeError."""
        self.assertRaises(UnicodeDecodeError, codecs.utf_16_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        """getstate()/setstate() round-trip for both byte orders."""
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_bug691291(self):
        # Files are always opened in binary mode, even if no binary mode was
        # specified.  This means that no automatic conversion of '\n' is done
        # on reading and writing.
        s1 = 'Hello\r\nworld\r\n'

        s = s1.encode(self.encoding)
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, 'wb') as fp:
            fp.write(s)
        # codecs.open() in 'U' (universal newline) mode is deprecated.
        with support.check_warnings(('', DeprecationWarning)):
            reader = codecs.open(support.TESTFN, 'U', encoding=self.encoding)
        with reader:
            self.assertEqual(reader.read(), s1)
663
class UTF16LETest(ReadTest, unittest.TestCase):
    """ReadTest instantiation for the BOM-less little-endian codec."""
    encoding = "utf-16-le"
    # Lone low surrogate U+DC80, little-endian.
    ill_formed_sequence = b"\x80\xdc"

    def test_partial(self):
        """Byte-at-a-time decoding: no BOM, one BMP code point per 2
        bytes and a 4-byte surrogate pair at the end."""
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_errors(self):
        """Ill-formed input must raise under 'strict' and substitute
        U+FFFD under 'replace' (odd length, unpaired surrogates, ...)."""
        tests = [
            (b'\xff', '\ufffd'),
            (b'A\x00Z', 'A\ufffd'),
            (b'A\x00B\x00C\x00D\x00Z', 'ABCD\ufffd'),
            (b'\x00\xd8', '\ufffd'),
            (b'\x00\xd8A', '\ufffd'),
            (b'\x00\xd8A\x00', '\ufffdA'),
            (b'\x00\xdcA\x00', '\ufffdA'),
        ]
        for raw, expected in tests:
            self.assertRaises(UnicodeDecodeError, codecs.utf_16_le_decode,
                              raw, 'strict', True)
            self.assertEqual(raw.decode('utf-16le', 'replace'), expected)

    def test_nonbmp(self):
        """Supplementary code points round-trip as surrogate pairs."""
        self.assertEqual("\U00010203".encode(self.encoding),
                         b'\x00\xd8\x03\xde')
        self.assertEqual(b'\x00\xd8\x03\xde'.decode(self.encoding),
                         "\U00010203")
707
class UTF16BETest(ReadTest, unittest.TestCase):
    """ReadTest instantiation for the BOM-less big-endian codec."""
    encoding = "utf-16-be"
    # Lone low surrogate U+DC80, big-endian.
    ill_formed_sequence = b"\xdc\x80"

    def test_partial(self):
        """Byte-at-a-time decoding: no BOM, one BMP code point per 2
        bytes and a 4-byte surrogate pair at the end."""
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_errors(self):
        """Ill-formed input must raise under 'strict' and substitute
        U+FFFD under 'replace' (odd length, unpaired surrogates, ...)."""
        tests = [
            (b'\xff', '\ufffd'),
            (b'\x00A\xff', 'A\ufffd'),
            (b'\x00A\x00B\x00C\x00DZ', 'ABCD\ufffd'),
            (b'\xd8\x00', '\ufffd'),
            (b'\xd8\x00\xdc', '\ufffd'),
            (b'\xd8\x00\x00A', '\ufffdA'),
            (b'\xdc\x00\x00A', '\ufffdA'),
        ]
        for raw, expected in tests:
            self.assertRaises(UnicodeDecodeError, codecs.utf_16_be_decode,
                              raw, 'strict', True)
            self.assertEqual(raw.decode('utf-16be', 'replace'), expected)

    def test_nonbmp(self):
        """Supplementary code points round-trip as surrogate pairs."""
        self.assertEqual("\U00010203".encode(self.encoding),
                         b'\xd8\x00\xde\x03')
        self.assertEqual(b'\xd8\x00\xde\x03'.decode(self.encoding),
                         "\U00010203")
751
class UTF8Test(ReadTest, unittest.TestCase):
    encoding = "utf-8"
    # A lone low surrogate encoded as if it were a valid code point.
    ill_formed_sequence = b"\xed\xb2\x80"
    ill_formed_sequence_replace = "\ufffd" * 3
    BOM = b''

    def test_partial(self):
        """Feed the encoded form one byte at a time and check the output
        after each byte."""
        self.check_partial(
            "\x00\xff\u07ff\u0800\uffff\U00010000",
            [
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff\U00010000",
            ]
        )

    def test_decoder_state(self):
        """The incremental decoder must round-trip its state at every
        possible split point of the input."""
        text = "\x00\x7f\x80\xff\u0100\u07ff\u0800\uffff\U0010ffff"
        self.check_state_handling_decode(self.encoding,
                                         text, text.encode(self.encoding))

    def test_decode_error(self):
        """Invalid bytes under the various error handlers."""
        cases = [
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        ]
        for raw, handler, want in cases:
            with self.subTest(data=raw, error_handler=handler,
                              expected=want):
                self.assertEqual(raw.decode(self.encoding, handler), want)

    def test_lone_surrogates(self):
        super().test_lone_surrogates()
        # A lone low surrogate escapes back to a single byte; note this
        # check was written with UTF-8 in mind and may not carry over
        # meaningfully to UTF-16/UTF-32.
        self.assertEqual("[\uDC80]".encode(self.encoding, "surrogateescape"),
                         self.BOM + b'[\x80]')

        # A surrogate pair cannot be escaped: the error must cover
        # exactly the offending pair.
        with self.assertRaises(UnicodeEncodeError) as ctx:
            "[\uDC80\uD800\uDFFF]".encode(self.encoding, "surrogateescape")
        err = ctx.exception
        self.assertEqual(err.object[err.start:err.end], '\uD800\uDFFF')

    def test_surrogatepass_handler(self):
        """surrogatepass encodes/decodes lone surrogates verbatim."""
        self.assertEqual("abc\ud800def".encode(self.encoding, "surrogatepass"),
                         self.BOM + b"abc\xed\xa0\x80def")
        self.assertEqual("\U00010fff\uD800".encode(self.encoding, "surrogatepass"),
                         self.BOM + b"\xf0\x90\xbf\xbf\xed\xa0\x80")
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "surrogatepass"),
                         self.BOM + b'[\xed\xa0\x80\xed\xb2\x80]')

        self.assertEqual(b"abc\xed\xa0\x80def".decode(self.encoding, "surrogatepass"),
                         "abc\ud800def")
        self.assertEqual(b"\xf0\x90\xbf\xbf\xed\xa0\x80".decode(self.encoding, "surrogatepass"),
                         "\U00010fff\uD800")

        self.assertTrue(codecs.lookup_error("surrogatepass"))
        # Truncated surrogate sequences must still raise.
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0".decode(self.encoding, "surrogatepass")
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0z".decode(self.encoding, "surrogatepass")
828
@unittest.skipUnless(sys.platform == 'win32',
                     'cp65001 is a Windows-only codec')
class CP65001Test(ReadTest, unittest.TestCase):
    """Tests for cp65001, the Windows code page that aliases UTF-8.

    Valid data must behave exactly like utf-8; the tables below also pin
    down the error-handler behaviour for lone surrogates.
    """
    encoding = "cp65001"

    def test_encode(self):
        """Table-driven encoding checks: (text, errors, expected bytes).

        An expected value of None means encoding must raise
        UnicodeEncodeError under that error handler.
        """
        tests = [
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'\xc3\xa9\xe2\x82\xac'),
            ('\U0010ffff', 'strict', b'\xf4\x8f\xbf\xbf'),
            ('\udc80', 'strict', None),
            ('\udc80', 'ignore', b''),
            ('\udc80', 'replace', b'?'),
            ('\udc80', 'backslashreplace', b'\\udc80'),
            ('\udc80', 'namereplace', b'\\udc80'),
            ('\udc80', 'surrogatepass', b'\xed\xb2\x80'),
        ]
        for text, errors, expected in tests:
            if expected is not None:
                try:
                    encoded = text.encode('cp65001', errors)
                except UnicodeEncodeError as err:
                    self.fail('Unable to encode %a to cp65001 with '
                              'errors=%r: %s' % (text, errors, err))
                self.assertEqual(encoded, expected,
                    '%a.encode("cp65001", %r)=%a != %a'
                    % (text, errors, encoded, expected))
            else:
                self.assertRaises(UnicodeEncodeError,
                    text.encode, "cp65001", errors)

    def test_decode(self):
        """Table-driven decoding checks: (raw bytes, errors, expected text).

        An expected value of None means decoding must raise
        UnicodeDecodeError under that error handler.
        """
        tests = [
            (b'abc', 'strict', 'abc'),
            (b'\xc3\xa9\xe2\x82\xac', 'strict', '\xe9\u20ac'),
            (b'\xf4\x8f\xbf\xbf', 'strict', '\U0010ffff'),
            (b'\xef\xbf\xbd', 'strict', '\ufffd'),
            (b'[\xc3\xa9]', 'strict', '[\xe9]'),
            # invalid bytes
            (b'[\xff]', 'strict', None),
            (b'[\xff]', 'ignore', '[]'),
            (b'[\xff]', 'replace', '[\ufffd]'),
            (b'[\xff]', 'surrogateescape', '[\udcff]'),
            (b'[\xed\xb2\x80]', 'strict', None),
            (b'[\xed\xb2\x80]', 'ignore', '[]'),
            (b'[\xed\xb2\x80]', 'replace', '[\ufffd\ufffd\ufffd]'),
        ]
        for raw, errors, expected in tests:
            if expected is not None:
                try:
                    decoded = raw.decode('cp65001', errors)
                except UnicodeDecodeError as err:
                    self.fail('Unable to decode %a from cp65001 with '
                              'errors=%r: %s' % (raw, errors, err))
                self.assertEqual(decoded, expected,
                    '%a.decode("cp65001", %r)=%a != %a'
                    % (raw, errors, decoded, expected))
            else:
                self.assertRaises(UnicodeDecodeError,
                    raw.decode, 'cp65001', errors)

    def test_lone_surrogates(self):
        """Lone surrogates must raise in strict mode and follow each
        error handler's documented substitution otherwise."""
        self.assertRaises(UnicodeEncodeError, "\ud800".encode, "cp65001")
        self.assertRaises(UnicodeDecodeError, b"\xed\xa0\x80".decode, "cp65001")
        self.assertEqual("[\uDC80]".encode("cp65001", "backslashreplace"),
                         b'[\\udc80]')
        self.assertEqual("[\uDC80]".encode("cp65001", "namereplace"),
                         b'[\\udc80]')
        self.assertEqual("[\uDC80]".encode("cp65001", "xmlcharrefreplace"),
                         b'[&#56448;]')
        self.assertEqual("[\uDC80]".encode("cp65001", "surrogateescape"),
                         b'[\x80]')
        self.assertEqual("[\uDC80]".encode("cp65001", "ignore"),
                         b'[]')
        self.assertEqual("[\uDC80]".encode("cp65001", "replace"),
                         b'[?]')

    def test_surrogatepass_handler(self):
        """surrogatepass must round-trip lone surrogates verbatim."""
        self.assertEqual("abc\ud800def".encode("cp65001", "surrogatepass"),
                         b"abc\xed\xa0\x80def")
        self.assertEqual(b"abc\xed\xa0\x80def".decode("cp65001", "surrogatepass"),
                         "abc\ud800def")
        self.assertEqual("\U00010fff\uD800".encode("cp65001", "surrogatepass"),
                         b"\xf0\x90\xbf\xbf\xed\xa0\x80")
        self.assertEqual(b"\xf0\x90\xbf\xbf\xed\xa0\x80".decode("cp65001", "surrogatepass"),
                         "\U00010fff\uD800")
        self.assertTrue(codecs.lookup_error("surrogatepass"))
916
917
class UTF7Test(ReadTest, unittest.TestCase):
    """Tests for the utf-7 codec (RFC 2152)."""
    encoding = "utf-7"

    def test_ascii(self):
        """Characters in UTF-7's direct sets pass through unmodified;
        everything else is base64-shifted with '+...-'."""
        # Set D (directly encoded characters)
        set_d = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                 'abcdefghijklmnopqrstuvwxyz'
                 '0123456789'
                 '\'(),-./:?')
        self.assertEqual(set_d.encode(self.encoding), set_d.encode('ascii'))
        self.assertEqual(set_d.encode('ascii').decode(self.encoding), set_d)
        # Set O (optional direct characters)
        set_o = ' !"#$%&*;<=>@[]^_`{|}'
        self.assertEqual(set_o.encode(self.encoding), set_o.encode('ascii'))
        self.assertEqual(set_o.encode('ascii').decode(self.encoding), set_o)
        # + (the shift character itself is escaped as '+-')
        self.assertEqual('a+b'.encode(self.encoding), b'a+-b')
        self.assertEqual(b'a+-b'.decode(self.encoding), 'a+b')
        # White spaces
        ws = ' \t\n\r'
        self.assertEqual(ws.encode(self.encoding), ws.encode('ascii'))
        self.assertEqual(ws.encode('ascii').decode(self.encoding), ws)
        # Other ASCII characters must be base64-encoded
        other_ascii = ''.join(sorted(set(bytes(range(0x80)).decode()) -
                                     set(set_d + set_o + '+' + ws)))
        self.assertEqual(other_ascii.encode(self.encoding),
                         b'+AAAAAQACAAMABAAFAAYABwAIAAsADAAOAA8AEAARABIAEwAU'
                         b'ABUAFgAXABgAGQAaABsAHAAdAB4AHwBcAH4Afw-')

    def test_partial(self):
        """Feed the encoded form one byte at a time; base64 runs only
        produce output once complete code units are available."""
        self.check_partial(
            'a+-b\x00c\x80d\u0100e\U00010000f',
            [
                'a',
                'a',
                'a+',
                'a+-',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b\x00',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c\x80',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d\u0100',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e\U00010000',
                'a+-b\x00c\x80d\u0100e\U00010000f',
            ]
        )

    def test_errors(self):
        """Malformed shift sequences raise in strict mode and decode to
        U+FFFD (per offending unit) with the 'replace' handler."""
        tests = [
            (b'\xffb', '\ufffdb'),
            (b'a\xffb', 'a\ufffdb'),
            (b'a\xff\xffb', 'a\ufffd\ufffdb'),
            (b'a+IK', 'a\ufffd'),
            (b'a+IK-b', 'a\ufffdb'),
            (b'a+IK,b', 'a\ufffdb'),
            (b'a+IKx', 'a\u20ac\ufffd'),
            (b'a+IKx-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr', 'a\u20ac\ufffd'),
            (b'a+IKwgr-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr,', 'a\u20ac\ufffd'),
            (b'a+IKwgr,-b', 'a\u20ac\ufffd-b'),
            (b'a+IKwgrB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrB-b', 'a\u20ac\u20ac\ufffdb'),
            (b'a+/,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+//,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+///,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+////,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+IKw-b\xff', 'a\u20acb\ufffd'),
            (b'a+IKw\xffb', 'a\u20ac\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                self.assertRaises(UnicodeDecodeError, codecs.utf_7_decode,
                                raw, 'strict', True)
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)

    def test_nonbmp(self):
        """Non-BMP code points are encoded as UTF-16 surrogate pairs
        inside the base64 run; the closing '-' is optional on decode."""
        self.assertEqual('\U000104A0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual('\ud801\udca0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual(b'+2AHcoA-'.decode(self.encoding), '\U000104A0')
        self.assertEqual(b'+2AHcoA'.decode(self.encoding), '\U000104A0')
        self.assertEqual('\u20ac\U000104A0'.encode(self.encoding), b'+IKzYAdyg-')
        self.assertEqual(b'+IKzYAdyg-'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual(b'+IKzYAdyg'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual('\u20ac\u20ac\U000104A0'.encode(self.encoding),
                         b'+IKwgrNgB3KA-')
        self.assertEqual(b'+IKwgrNgB3KA-'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')
        self.assertEqual(b'+IKwgrNgB3KA'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')

    def test_lone_surrogates(self):
        """An unpaired surrogate inside a base64 run decodes as-is when
        the run ends cleanly, and as U+FFFD when it is malformed."""
        tests = [
            (b'a+2AE-b', 'a\ud801b'),
            (b'a+2AE\xffb', 'a\ufffdb'),
            (b'a+2AE', 'a\ufffd'),
            (b'a+2AEA-b', 'a\ufffdb'),
            (b'a+2AH-b', 'a\ufffdb'),
            (b'a+IKzYAQ-b', 'a\u20ac\ud801b'),
            (b'a+IKzYAQ\xffb', 'a\u20ac\ufffdb'),
            (b'a+IKzYAQA-b', 'a\u20ac\ufffdb'),
            (b'a+IKzYAd-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgrNgB-b', 'a\u20ac\u20ac\ud801b'),
            (b'a+IKwgrNgB\xffb', 'a\u20ac\u20ac\ufffdb'),
            (b'a+IKwgrNgB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrNgBA-b', 'a\u20ac\u20ac\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)
1049
1050
class UTF16ExTest(unittest.TestCase):
    """Direct tests for the codecs.utf_16_ex_decode() helper."""

    def test_errors(self):
        # A single 0xff byte is not a complete UTF-16 code unit.
        self.assertRaises(UnicodeDecodeError, codecs.utf_16_ex_decode,
                          b"\xff", "strict", 0, True)

    def test_bad_args(self):
        # Calling without the mandatory data argument must fail.
        self.assertRaises(TypeError, codecs.utf_16_ex_decode)
1058
class ReadBufferTest(unittest.TestCase):
    """Tests for codecs.readbuffer_encode()."""

    def test_array(self):
        import array
        buf = array.array("b", b"spam")
        # Any object supporting the buffer protocol is accepted.
        self.assertEqual(codecs.readbuffer_encode(buf), (b"spam", 4))

    def test_empty(self):
        self.assertEqual(codecs.readbuffer_encode(""), (b"", 0))

    def test_bad_args(self):
        # Missing argument and a non-buffer argument both raise TypeError.
        self.assertRaises(TypeError, codecs.readbuffer_encode)
        self.assertRaises(TypeError, codecs.readbuffer_encode, 42)
1074
class UTF8SigTest(UTF8Test, unittest.TestCase):
    """utf-8-sig: like utf-8 but a leading BOM is emitted on encode and
    stripped (once) on decode."""
    encoding = "utf-8-sig"
    BOM = codecs.BOM_UTF8

    def test_partial(self):
        """Feed the encoded form one byte at a time; the first BOM is
        silently consumed, a second one is emitted as U+FEFF."""
        self.check_partial(
            "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            [
                "",
                "",
                "", # First BOM has been read and skipped
                "",
                "",
                "\ufeff", # Second BOM has been read and emitted
                "\ufeff\x00", # "\x00" read and emitted
                "\ufeff\x00", # First byte of encoded "\xff" read
                "\ufeff\x00\xff", # Second byte of encoded "\xff" read
                "\ufeff\x00\xff", # First byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff", # Second byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            ]
        )

    def test_bug1601501(self):
        # SF bug #1601501: check that the codec works with a buffer
        self.assertEqual(str(b"\xef\xbb\xbf", "utf-8-sig"), "")

    def test_bom(self):
        """The incremental decoder strips the BOM produced by encode."""
        d = codecs.getincrementaldecoder("utf-8-sig")()
        s = "spam"
        self.assertEqual(d.decode(s.encode("utf-8-sig")), s)

    def _check_stream(self, bytestring, unistring):
        """Decode *bytestring* through a utf-8-sig stream reader using a
        range of read sizes and check the result equals *unistring*."""
        reader = codecs.getreader("utf-8-sig")
        for sizehint in [None] + list(range(1, 11)) + \
                        [64, 128, 256, 512, 1024]:
            istream = reader(io.BytesIO(bytestring))
            ostream = io.StringIO()
            while 1:
                if sizehint is not None:
                    data = istream.read(sizehint)
                else:
                    data = istream.read()

                if not data:
                    break
                ostream.write(data)

            self.assertEqual(ostream.getvalue(), unistring)

    def test_stream_bom(self):
        """A stream starting with a BOM decodes with the BOM removed."""
        self._check_stream(codecs.BOM_UTF8 + b"ABC\xC2\xA1\xE2\x88\x80XYZ",
                           "ABC\u00A1\u2200XYZ")

    def test_stream_bare(self):
        """A stream without a BOM decodes unchanged."""
        self._check_stream(b"ABC\xC2\xA1\xE2\x88\x80XYZ",
                           "ABC\u00A1\u2200XYZ")
1159
1160class EscapeDecodeTest(unittest.TestCase):
1161    def test_empty(self):
1162        self.assertEqual(codecs.escape_decode(b""), (b"", 0))
1163        self.assertEqual(codecs.escape_decode(bytearray()), (b"", 0))
1164
1165    def test_raw(self):
1166        decode = codecs.escape_decode
1167        for b in range(256):
1168            b = bytes([b])
1169            if b != b'\\':
1170                self.assertEqual(decode(b + b'0'), (b + b'0', 2))
1171
1172    def test_escape(self):
1173        decode = codecs.escape_decode
1174        check = coding_checker(self, decode)
1175        check(b"[\\\n]", b"[]")
1176        check(br'[\"]', b'["]')
1177        check(br"[\']", b"[']")
1178        check(br"[\\]", b"[\\]")
1179        check(br"[\a]", b"[\x07]")
1180        check(br"[\b]", b"[\x08]")
1181        check(br"[\t]", b"[\x09]")
1182        check(br"[\n]", b"[\x0a]")
1183        check(br"[\v]", b"[\x0b]")
1184        check(br"[\f]", b"[\x0c]")
1185        check(br"[\r]", b"[\x0d]")
1186        check(br"[\7]", b"[\x07]")
1187        check(br"[\78]", b"[\x078]")
1188        check(br"[\41]", b"[!]")
1189        check(br"[\418]", b"[!8]")
1190        check(br"[\101]", b"[A]")
1191        check(br"[\1010]", b"[A0]")
1192        check(br"[\501]", b"[A]")
1193        check(br"[\x41]", b"[A]")
1194        check(br"[\x410]", b"[A0]")
1195        for i in range(97, 123):
1196            b = bytes([i])
1197            if b not in b'abfnrtvx':
1198                with self.assertWarns(DeprecationWarning):
1199                    check(b"\\" + b, b"\\" + b)
1200            with self.assertWarns(DeprecationWarning):
1201                check(b"\\" + b.upper(), b"\\" + b.upper())
1202        with self.assertWarns(DeprecationWarning):
1203            check(br"\8", b"\\8")
1204        with self.assertWarns(DeprecationWarning):
1205            check(br"\9", b"\\9")
1206
1207    def test_errors(self):
1208        decode = codecs.escape_decode
1209        self.assertRaises(ValueError, decode, br"\x")
1210        self.assertRaises(ValueError, decode, br"[\x]")
1211        self.assertEqual(decode(br"[\x]\x", "ignore"), (b"[]", 6))
1212        self.assertEqual(decode(br"[\x]\x", "replace"), (b"[?]?", 6))
1213        self.assertRaises(ValueError, decode, br"\x0")
1214        self.assertRaises(ValueError, decode, br"[\x0]")
1215        self.assertEqual(decode(br"[\x0]\x0", "ignore"), (b"[]", 8))
1216        self.assertEqual(decode(br"[\x0]\x0", "replace"), (b"[?]?", 8))
1217
1218
class RecodingTest(unittest.TestCase):
    """Regression test for transcoding via codecs.EncodedFile."""

    def test_recoding(self):
        # EncodedFile transcodes what is written ("unicode_internal")
        # into the underlying file's encoding (utf-8).
        f = io.BytesIO()
        f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
        f2.write("a")
        f2.close()
        # Python used to crash on this at exit because of a refcount
        # bug in _codecsmodule.c

        # Closing the wrapper must also close the wrapped stream.
        self.assertTrue(f.closed)
1229
# From RFC 3492
# Each entry is a (unicode string, expected punycode bytes) pair taken
# from the sample strings in section 7.1 of the RFC.
punycode_testcases = [
    # A Arabic (Egyptian):
    ("\u0644\u064A\u0647\u0645\u0627\u0628\u062A\u0643\u0644"
     "\u0645\u0648\u0634\u0639\u0631\u0628\u064A\u061F",
     b"egbpdaj6bu4bxfgehfvwxn"),
    # B Chinese (simplified):
    ("\u4ED6\u4EEC\u4E3A\u4EC0\u4E48\u4E0D\u8BF4\u4E2D\u6587",
     b"ihqwcrb4cv8a8dqg056pqjye"),
    # C Chinese (traditional):
    ("\u4ED6\u5011\u7232\u4EC0\u9EBD\u4E0D\u8AAA\u4E2D\u6587",
     b"ihqwctvzc91f659drss3x8bo0yb"),
    # D Czech: Pro<ccaron>prost<ecaron>nemluv<iacute><ccaron>esky
    ("\u0050\u0072\u006F\u010D\u0070\u0072\u006F\u0073\u0074"
     "\u011B\u006E\u0065\u006D\u006C\u0075\u0076\u00ED\u010D"
     "\u0065\u0073\u006B\u0079",
     b"Proprostnemluvesky-uyb24dma41a"),
    # E Hebrew:
    ("\u05DC\u05DE\u05D4\u05D4\u05DD\u05E4\u05E9\u05D5\u05D8"
     "\u05DC\u05D0\u05DE\u05D3\u05D1\u05E8\u05D9\u05DD\u05E2"
     "\u05D1\u05E8\u05D9\u05EA",
     b"4dbcagdahymbxekheh6e0a7fei0b"),
    # F Hindi (Devanagari):
    ("\u092F\u0939\u0932\u094B\u0917\u0939\u093F\u0928\u094D"
     "\u0926\u0940\u0915\u094D\u092F\u094B\u0902\u0928\u0939"
     "\u0940\u0902\u092C\u094B\u0932\u0938\u0915\u0924\u0947"
     "\u0939\u0948\u0902",
     b"i1baa7eci9glrd9b2ae1bj0hfcgg6iyaf8o0a1dig0cd"),

    # (G) Japanese (kanji and hiragana):
    ("\u306A\u305C\u307F\u3093\u306A\u65E5\u672C\u8A9E\u3092"
     "\u8A71\u3057\u3066\u304F\u308C\u306A\u3044\u306E\u304B",
     b"n8jok5ay5dzabd5bym9f0cm5685rrjetr6pdxa"),

    # (H) Korean (Hangul syllables):
    ("\uC138\uACC4\uC758\uBAA8\uB4E0\uC0AC\uB78C\uB4E4\uC774"
     "\uD55C\uAD6D\uC5B4\uB97C\uC774\uD574\uD55C\uB2E4\uBA74"
     "\uC5BC\uB9C8\uB098\uC88B\uC744\uAE4C",
     b"989aomsvi5e83db1d2a355cv1e0vak1dwrv93d5xbh15a0dt30a5j"
     b"psd879ccm6fea98c"),

    # (I) Russian (Cyrillic):
    ("\u043F\u043E\u0447\u0435\u043C\u0443\u0436\u0435\u043E"
     "\u043D\u0438\u043D\u0435\u0433\u043E\u0432\u043E\u0440"
     "\u044F\u0442\u043F\u043E\u0440\u0443\u0441\u0441\u043A"
     "\u0438",
     b"b1abfaaepdrnnbgefbaDotcwatmq2g4l"),

    # (J) Spanish: Porqu<eacute>nopuedensimplementehablarenEspa<ntilde>ol
    ("\u0050\u006F\u0072\u0071\u0075\u00E9\u006E\u006F\u0070"
     "\u0075\u0065\u0064\u0065\u006E\u0073\u0069\u006D\u0070"
     "\u006C\u0065\u006D\u0065\u006E\u0074\u0065\u0068\u0061"
     "\u0062\u006C\u0061\u0072\u0065\u006E\u0045\u0073\u0070"
     "\u0061\u00F1\u006F\u006C",
     b"PorqunopuedensimplementehablarenEspaol-fmd56a"),

    # (K) Vietnamese:
    #  T<adotbelow>isaoh<odotbelow>kh<ocirc>ngth<ecirchookabove>ch\
    #   <ihookabove>n<oacute>iti<ecircacute>ngVi<ecircdotbelow>t
    ("\u0054\u1EA1\u0069\u0073\u0061\u006F\u0068\u1ECD\u006B"
     "\u0068\u00F4\u006E\u0067\u0074\u0068\u1EC3\u0063\u0068"
     "\u1EC9\u006E\u00F3\u0069\u0074\u0069\u1EBF\u006E\u0067"
     "\u0056\u0069\u1EC7\u0074",
     b"TisaohkhngthchnitingVit-kjcr8268qyxafd2f1b9g"),

    # (L) 3<nen>B<gumi><kinpachi><sensei>
    ("\u0033\u5E74\u0042\u7D44\u91D1\u516B\u5148\u751F",
     b"3B-ww4c5e180e575a65lsy2b"),

    # (M) <amuro><namie>-with-SUPER-MONKEYS
    ("\u5B89\u5BA4\u5948\u7F8E\u6075\u002D\u0077\u0069\u0074"
     "\u0068\u002D\u0053\u0055\u0050\u0045\u0052\u002D\u004D"
     "\u004F\u004E\u004B\u0045\u0059\u0053",
     b"-with-SUPER-MONKEYS-pc58ag80a8qai00g7n9n"),

    # (N) Hello-Another-Way-<sorezore><no><basho>
    ("\u0048\u0065\u006C\u006C\u006F\u002D\u0041\u006E\u006F"
     "\u0074\u0068\u0065\u0072\u002D\u0057\u0061\u0079\u002D"
     "\u305D\u308C\u305E\u308C\u306E\u5834\u6240",
     b"Hello-Another-Way--fc4qua05auwb3674vfr0b"),

    # (O) <hitotsu><yane><no><shita>2
    ("\u3072\u3068\u3064\u5C4B\u6839\u306E\u4E0B\u0032",
     b"2-u9tlzr9756bt3uc0v"),

    # (P) Maji<de>Koi<suru>5<byou><mae>
    ("\u004D\u0061\u006A\u0069\u3067\u004B\u006F\u0069\u3059"
     "\u308B\u0035\u79D2\u524D",
     b"MajiKoi5-783gue6qz075azm5e"),

    # (Q) <pafii>de<runba>
    ("\u30D1\u30D5\u30A3\u30FC\u0064\u0065\u30EB\u30F3\u30D0",
     b"de-jg4avhby1noc0d"),

    # (R) <sono><supiido><de>
    ("\u305D\u306E\u30B9\u30D4\u30FC\u30C9\u3067",
     b"d9juau41awczczp"),

    # (S) -> $1.00 <-
    ("\u002D\u003E\u0020\u0024\u0031\u002E\u0030\u0030\u0020"
     "\u003C\u002D",
     b"-> $1.00 <--")
    ]

# Sanity check at import time: every entry must be a 2-tuple; anything
# else (e.g. a missing comma fusing two cases) is printed for inspection.
for i in punycode_testcases:
    if len(i)!=2:
        print(repr(i))
1337
1338
class PunycodeTest(unittest.TestCase):
    """Round-trip the RFC 3492 sample vectors through the punycode codec."""

    def test_encode(self):
        for uni, puny in punycode_testcases:
            # Need to convert both strings to lower case, since
            # some of the extended encodings use upper case, but our
            # code produces only lower case. Converting just puny to
            # lower is also insufficient, since some of the input characters
            # are upper case.
            got = str(uni.encode("punycode"), "ascii").lower()
            want = str(puny, "ascii").lower()
            self.assertEqual(got, want)

    def test_decode(self):
        for uni, puny in punycode_testcases:
            self.assertEqual(uni, puny.decode("punycode"))
            # A round-trip through ASCII must decode identically.
            ascii_copy = puny.decode("ascii").encode("ascii")
            self.assertEqual(uni, ascii_copy.decode("punycode"))
1357
1358
class UnicodeInternalTest(unittest.TestCase):
    """Tests for the deprecated "unicode_internal" codec.

    Most of these are restricted to builds where wchar_t is 4 bytes,
    since the codec exposes the interpreter's internal wide-character
    layout directly.
    """

    @unittest.skipUnless(SIZEOF_WCHAR_T == 4, 'specific to 32-bit wchar_t')
    def test_bug1251300(self):
        # Decoding with unicode_internal used to not correctly handle "code
        # points" above 0x10ffff on UCS-4 builds.
        ok = [
            (b"\x00\x10\xff\xff", "\U0010ffff"),
            (b"\x00\x00\x01\x01", "\U00000101"),
            (b"", ""),
        ]
        not_ok = [
            b"\x7f\xff\xff\xff",
            b"\x80\x00\x00\x00",
            b"\x81\x00\x00\x00",
            b"\x00",
            b"\x00\x00\x00\x00\x00",
        ]
        for internal, uni in ok:
            # The test vectors are written big-endian; flip them to match
            # the native byte order the codec uses.
            if sys.byteorder == "little":
                internal = bytes(reversed(internal))
            with support.check_warnings():
                self.assertEqual(uni, internal.decode("unicode_internal"))
        for internal in not_ok:
            if sys.byteorder == "little":
                internal = bytes(reversed(internal))
            with support.check_warnings(('unicode_internal codec has been '
                                         'deprecated', DeprecationWarning)):
                self.assertRaises(UnicodeDecodeError, internal.decode,
                                  "unicode_internal")
        # A lone surrogate value (0x110000 is out of range) in native order.
        if sys.byteorder == "little":
            invalid = b"\x00\x00\x11\x00"
            invalid_backslashreplace = r"\x00\x00\x11\x00"
        else:
            invalid = b"\x00\x11\x00\x00"
            invalid_backslashreplace = r"\x00\x11\x00\x00"
        with support.check_warnings():
            self.assertRaises(UnicodeDecodeError,
                              invalid.decode, "unicode_internal")
        with support.check_warnings():
            self.assertEqual(invalid.decode("unicode_internal", "replace"),
                             '\ufffd')
        with support.check_warnings():
            self.assertEqual(invalid.decode("unicode_internal", "backslashreplace"),
                             invalid_backslashreplace)

    @unittest.skipUnless(SIZEOF_WCHAR_T == 4, 'specific to 32-bit wchar_t')
    def test_decode_error_attributes(self):
        """The UnicodeDecodeError must pinpoint the offending 4-byte unit."""
        try:
            with support.check_warnings(('unicode_internal codec has been '
                                         'deprecated', DeprecationWarning)):
                b"\x00\x00\x00\x00\x00\x11\x11\x00".decode("unicode_internal")
        except UnicodeDecodeError as ex:
            self.assertEqual("unicode_internal", ex.encoding)
            self.assertEqual(b"\x00\x00\x00\x00\x00\x11\x11\x00", ex.object)
            self.assertEqual(4, ex.start)
            self.assertEqual(8, ex.end)
        else:
            self.fail()

    @unittest.skipUnless(SIZEOF_WCHAR_T == 4, 'specific to 32-bit wchar_t')
    def test_decode_callback(self):
        """A registered error callback must be honoured by the decoder."""
        codecs.register_error("UnicodeInternalTest", codecs.ignore_errors)
        decoder = codecs.getdecoder("unicode_internal")
        with support.check_warnings(('unicode_internal codec has been '
                                     'deprecated', DeprecationWarning)):
            ab = "ab".encode("unicode_internal").decode()
            ignored = decoder(bytes("%s\x22\x22\x22\x22%s" % (ab[:4], ab[4:]),
                                    "ascii"),
                              "UnicodeInternalTest")
        self.assertEqual(("ab", 12), ignored)

    def test_encode_length(self):
        """The encoder's reported length counts characters, not bytes."""
        with support.check_warnings(('unicode_internal codec has been '
                                     'deprecated', DeprecationWarning)):
            # Issue 3739
            encoder = codecs.getencoder("unicode_internal")
            self.assertEqual(encoder("a")[1], 1)
            self.assertEqual(encoder("\xe9\u0142")[1], 2)

            self.assertEqual(codecs.escape_encode(br'\x00')[1], 4)
1439
1440# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
1441nameprep_tests = [
1442    # 3.1 Map to nothing.
1443    (b'foo\xc2\xad\xcd\x8f\xe1\xa0\x86\xe1\xa0\x8bbar'
1444     b'\xe2\x80\x8b\xe2\x81\xa0baz\xef\xb8\x80\xef\xb8\x88\xef'
1445     b'\xb8\x8f\xef\xbb\xbf',
1446     b'foobarbaz'),
1447    # 3.2 Case folding ASCII U+0043 U+0041 U+0046 U+0045.
1448    (b'CAFE',
1449     b'cafe'),
1450    # 3.3 Case folding 8bit U+00DF (german sharp s).
1451    # The original test case is bogus; it says \xc3\xdf
1452    (b'\xc3\x9f',
1453     b'ss'),
1454    # 3.4 Case folding U+0130 (turkish capital I with dot).
1455    (b'\xc4\xb0',
1456     b'i\xcc\x87'),
1457    # 3.5 Case folding multibyte U+0143 U+037A.
1458    (b'\xc5\x83\xcd\xba',
1459     b'\xc5\x84 \xce\xb9'),
1460    # 3.6 Case folding U+2121 U+33C6 U+1D7BB.
1461    # XXX: skip this as it fails in UCS-2 mode
1462    #('\xe2\x84\xa1\xe3\x8f\x86\xf0\x9d\x9e\xbb',
1463    # 'telc\xe2\x88\x95kg\xcf\x83'),
1464    (None, None),
1465    # 3.7 Normalization of U+006a U+030c U+00A0 U+00AA.
1466    (b'j\xcc\x8c\xc2\xa0\xc2\xaa',
1467     b'\xc7\xb0 a'),
1468    # 3.8 Case folding U+1FB7 and normalization.
1469    (b'\xe1\xbe\xb7',
1470     b'\xe1\xbe\xb6\xce\xb9'),
1471    # 3.9 Self-reverting case folding U+01F0 and normalization.
1472    # The original test case is bogus, it says `\xc7\xf0'
1473    (b'\xc7\xb0',
1474     b'\xc7\xb0'),
1475    # 3.10 Self-reverting case folding U+0390 and normalization.
1476    (b'\xce\x90',
1477     b'\xce\x90'),
1478    # 3.11 Self-reverting case folding U+03B0 and normalization.
1479    (b'\xce\xb0',
1480     b'\xce\xb0'),
1481    # 3.12 Self-reverting case folding U+1E96 and normalization.
1482    (b'\xe1\xba\x96',
1483     b'\xe1\xba\x96'),
1484    # 3.13 Self-reverting case folding U+1F56 and normalization.
1485    (b'\xe1\xbd\x96',
1486     b'\xe1\xbd\x96'),
1487    # 3.14 ASCII space character U+0020.
1488    (b' ',
1489     b' '),
1490    # 3.15 Non-ASCII 8bit space character U+00A0.
1491    (b'\xc2\xa0',
1492     b' '),
1493    # 3.16 Non-ASCII multibyte space character U+1680.
1494    (b'\xe1\x9a\x80',
1495     None),
1496    # 3.17 Non-ASCII multibyte space character U+2000.
1497    (b'\xe2\x80\x80',
1498     b' '),
1499    # 3.18 Zero Width Space U+200b.
1500    (b'\xe2\x80\x8b',
1501     b''),
1502    # 3.19 Non-ASCII multibyte space character U+3000.
1503    (b'\xe3\x80\x80',
1504     b' '),
1505    # 3.20 ASCII control characters U+0010 U+007F.
1506    (b'\x10\x7f',
1507     b'\x10\x7f'),
1508    # 3.21 Non-ASCII 8bit control character U+0085.
1509    (b'\xc2\x85',
1510     None),
1511    # 3.22 Non-ASCII multibyte control character U+180E.
1512    (b'\xe1\xa0\x8e',
1513     None),
1514    # 3.23 Zero Width No-Break Space U+FEFF.
1515    (b'\xef\xbb\xbf',
1516     b''),
1517    # 3.24 Non-ASCII control character U+1D175.
1518    (b'\xf0\x9d\x85\xb5',
1519     None),
1520    # 3.25 Plane 0 private use character U+F123.
1521    (b'\xef\x84\xa3',
1522     None),
1523    # 3.26 Plane 15 private use character U+F1234.
1524    (b'\xf3\xb1\x88\xb4',
1525     None),
1526    # 3.27 Plane 16 private use character U+10F234.
1527    (b'\xf4\x8f\x88\xb4',
1528     None),
1529    # 3.28 Non-character code point U+8FFFE.
1530    (b'\xf2\x8f\xbf\xbe',
1531     None),
1532    # 3.29 Non-character code point U+10FFFF.
1533    (b'\xf4\x8f\xbf\xbf',
1534     None),
1535    # 3.30 Surrogate code U+DF42.
1536    (b'\xed\xbd\x82',
1537     None),
1538    # 3.31 Non-plain text character U+FFFD.
1539    (b'\xef\xbf\xbd',
1540     None),
1541    # 3.32 Ideographic description character U+2FF5.
1542    (b'\xe2\xbf\xb5',
1543     None),
1544    # 3.33 Display property character U+0341.
1545    (b'\xcd\x81',
1546     b'\xcc\x81'),
1547    # 3.34 Left-to-right mark U+200E.
1548    (b'\xe2\x80\x8e',
1549     None),
1550    # 3.35 Deprecated U+202A.
1551    (b'\xe2\x80\xaa',
1552     None),
1553    # 3.36 Language tagging character U+E0001.
1554    (b'\xf3\xa0\x80\x81',
1555     None),
1556    # 3.37 Language tagging character U+E0042.
1557    (b'\xf3\xa0\x81\x82',
1558     None),
1559    # 3.38 Bidi: RandALCat character U+05BE and LCat characters.
1560    (b'foo\xd6\xbebar',
1561     None),
1562    # 3.39 Bidi: RandALCat character U+FD50 and LCat characters.
1563    (b'foo\xef\xb5\x90bar',
1564     None),
1565    # 3.40 Bidi: RandALCat character U+FB38 and LCat characters.
1566    (b'foo\xef\xb9\xb6bar',
1567     b'foo \xd9\x8ebar'),
1568    # 3.41 Bidi: RandALCat without trailing RandALCat U+0627 U+0031.
1569    (b'\xd8\xa71',
1570     None),
1571    # 3.42 Bidi: RandALCat character U+0627 U+0031 U+0628.
1572    (b'\xd8\xa71\xd8\xa8',
1573     b'\xd8\xa71\xd8\xa8'),
1574    # 3.43 Unassigned code point U+E0002.
1575    # Skip this test as we allow unassigned
1576    #(b'\xf3\xa0\x80\x82',
1577    # None),
1578    (None, None),
1579    # 3.44 Larger test (shrinking).
1580    # Original test case reads \xc3\xdf
1581    (b'X\xc2\xad\xc3\x9f\xc4\xb0\xe2\x84\xa1j\xcc\x8c\xc2\xa0\xc2'
1582     b'\xaa\xce\xb0\xe2\x80\x80',
1583     b'xssi\xcc\x87tel\xc7\xb0 a\xce\xb0 '),
1584    # 3.45 Larger test (expanding).
1585    # Original test case reads \xc3\x9f
1586    (b'X\xc3\x9f\xe3\x8c\x96\xc4\xb0\xe2\x84\xa1\xe2\x92\x9f\xe3\x8c'
1587     b'\x80',
1588     b'xss\xe3\x82\xad\xe3\x83\xad\xe3\x83\xa1\xe3\x83\xbc\xe3'
1589     b'\x83\x88\xe3\x83\xabi\xcc\x87tel\x28d\x29\xe3\x82'
1590     b'\xa2\xe3\x83\x91\xe3\x83\xbc\xe3\x83\x88')
1591    ]
1592
1593
class NameprepTest(unittest.TestCase):
    def test_nameprep(self):
        """Run the stringprep test vectors against encodings.idna.nameprep."""
        from encodings.idna import nameprep
        for index, (source, expected) in enumerate(nameprep_tests):
            if source is None:
                # Placeholder entry for a vector that is skipped.
                continue
            # The vectors are stored as UTF-8 byte strings.
            source = str(source, "utf-8", "surrogatepass")
            if expected is None:
                # No expected output: the input contains prohibited
                # characters and must be rejected.
                self.assertRaises(UnicodeError, nameprep, source)
            else:
                expected = str(expected, "utf-8", "surrogatepass")
                try:
                    self.assertEqual(nameprep(source), expected)
                except Exception as exc:
                    raise support.TestFailed("Test 3.%d: %s" % (index+1,
                                                                str(exc)))
1612
1613
class IDNACodecTest(unittest.TestCase):
    """Exercise the "idna" codec: whole-string, stream and incremental."""

    def test_builtin_decode(self):
        # (raw bytes, expected text): plain ASCII and punycode labels,
        # with and without a trailing dot.
        for raw, expected in [
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org", "pyth\xf6n.org"),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ]:
            self.assertEqual(str(raw, "idna"), expected)

    def test_builtin_encode(self):
        for text, expected in [
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org", b"xn--pythn-mua.org"),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ]:
            self.assertEqual(text.encode("idna"), expected)

    def test_stream(self):
        reader = codecs.getreader("idna")(io.BytesIO(b"abc"))
        reader.read(3)
        # Everything has been consumed; further reads yield nothing.
        self.assertEqual(reader.read(), "")

    def test_incremental_decode(self):
        # Feed the input to iterdecode() one byte at a time.
        for raw, expected in [
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ]:
            chunks = (bytes([byte]) for byte in raw)
            self.assertEqual("".join(codecs.iterdecode(chunks, "idna")),
                             expected)

        decoder = codecs.getincrementaldecoder("idna")()
        # A label is only emitted once its trailing dot (or the final
        # flush) has been seen.
        self.assertEqual(decoder.decode(b"xn--xam"), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg"), "")
        self.assertEqual(decoder.decode(b"", True), "org")

        decoder.reset()
        self.assertEqual(decoder.decode(b"xn--xam"), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg."), "org.")
        self.assertEqual(decoder.decode(b"", True), "")

    def test_incremental_encode(self):
        for text, expected in [
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ]:
            self.assertEqual(b"".join(codecs.iterencode(text, "idna")),
                             expected)

        encoder = codecs.getincrementalencoder("idna")()
        # Labels are buffered until a dot (or the final flush) arrives.
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org"), b"xn--xample-9ta.")
        self.assertEqual(encoder.encode("", True), b"org")

        encoder.reset()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.")
        self.assertEqual(encoder.encode("", True), b"")

    def test_errors(self):
        """Only supports "strict" error handler"""
        "python.org".encode("idna", "strict")
        b"python.org".decode("idna", "strict")
        for handler in ("ignore", "replace", "backslashreplace",
                        "surrogateescape"):
            self.assertRaises(Exception, "python.org".encode, "idna", handler)
            self.assertRaises(Exception,
                              b"python.org".decode, "idna", handler)
1699
1700
1701class CodecsModuleTest(unittest.TestCase):
1702
1703    def test_decode(self):
1704        self.assertEqual(codecs.decode(b'\xe4\xf6\xfc', 'latin-1'),
1705                         '\xe4\xf6\xfc')
1706        self.assertRaises(TypeError, codecs.decode)
1707        self.assertEqual(codecs.decode(b'abc'), 'abc')
1708        self.assertRaises(UnicodeDecodeError, codecs.decode, b'\xff', 'ascii')
1709
1710        # test keywords
1711        self.assertEqual(codecs.decode(obj=b'\xe4\xf6\xfc', encoding='latin-1'),
1712                         '\xe4\xf6\xfc')
1713        self.assertEqual(codecs.decode(b'[\xff]', 'ascii', errors='ignore'),
1714                         '[]')
1715
1716    def test_encode(self):
1717        self.assertEqual(codecs.encode('\xe4\xf6\xfc', 'latin-1'),
1718                         b'\xe4\xf6\xfc')
1719        self.assertRaises(TypeError, codecs.encode)
1720        self.assertRaises(LookupError, codecs.encode, "foo", "__spam__")
1721        self.assertEqual(codecs.encode('abc'), b'abc')
1722        self.assertRaises(UnicodeEncodeError, codecs.encode, '\xffff', 'ascii')
1723
1724        # test keywords
1725        self.assertEqual(codecs.encode(obj='\xe4\xf6\xfc', encoding='latin-1'),
1726                         b'\xe4\xf6\xfc')
1727        self.assertEqual(codecs.encode('[\xff]', 'ascii', errors='ignore'),
1728                         b'[]')
1729
1730    def test_register(self):
1731        self.assertRaises(TypeError, codecs.register)
1732        self.assertRaises(TypeError, codecs.register, 42)
1733
1734    def test_lookup(self):
1735        self.assertRaises(TypeError, codecs.lookup)
1736        self.assertRaises(LookupError, codecs.lookup, "__spam__")
1737        self.assertRaises(LookupError, codecs.lookup, " ")
1738
1739    def test_getencoder(self):
1740        self.assertRaises(TypeError, codecs.getencoder)
1741        self.assertRaises(LookupError, codecs.getencoder, "__spam__")
1742
1743    def test_getdecoder(self):
1744        self.assertRaises(TypeError, codecs.getdecoder)
1745        self.assertRaises(LookupError, codecs.getdecoder, "__spam__")
1746
1747    def test_getreader(self):
1748        self.assertRaises(TypeError, codecs.getreader)
1749        self.assertRaises(LookupError, codecs.getreader, "__spam__")
1750
1751    def test_getwriter(self):
1752        self.assertRaises(TypeError, codecs.getwriter)
1753        self.assertRaises(LookupError, codecs.getwriter, "__spam__")
1754
1755    def test_lookup_issue1813(self):
1756        # Issue #1813: under Turkish locales, lookup of some codecs failed
1757        # because 'I' is lowercased as "ı" (dotless i)
1758        oldlocale = locale.setlocale(locale.LC_CTYPE)
1759        self.addCleanup(locale.setlocale, locale.LC_CTYPE, oldlocale)
1760        try:
1761            locale.setlocale(locale.LC_CTYPE, 'tr_TR')
1762        except locale.Error:
1763            # Unsupported locale on this system
1764            self.skipTest('test needs Turkish locale')
1765        c = codecs.lookup('ASCII')
1766        self.assertEqual(c.name, 'ascii')
1767
1768    def test_all(self):
1769        api = (
1770            "encode", "decode",
1771            "register", "CodecInfo", "Codec", "IncrementalEncoder",
1772            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
1773            "getencoder", "getdecoder", "getincrementalencoder",
1774            "getincrementaldecoder", "getreader", "getwriter",
1775            "register_error", "lookup_error",
1776            "strict_errors", "replace_errors", "ignore_errors",
1777            "xmlcharrefreplace_errors", "backslashreplace_errors",
1778            "namereplace_errors",
1779            "open", "EncodedFile",
1780            "iterencode", "iterdecode",
1781            "BOM", "BOM_BE", "BOM_LE",
1782            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
1783            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
1784            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
1785            "StreamReaderWriter", "StreamRecoder",
1786        )
1787        self.assertCountEqual(api, codecs.__all__)
1788        for api in codecs.__all__:
1789            getattr(codecs, api)
1790
1791    def test_open(self):
1792        self.addCleanup(support.unlink, support.TESTFN)
1793        for mode in ('w', 'r', 'r+', 'w+', 'a', 'a+'):
1794            with self.subTest(mode), \
1795                    codecs.open(support.TESTFN, mode, 'ascii') as file:
1796                self.assertIsInstance(file, codecs.StreamReaderWriter)
1797
1798    def test_undefined(self):
1799        self.assertRaises(UnicodeError, codecs.encode, 'abc', 'undefined')
1800        self.assertRaises(UnicodeError, codecs.decode, b'abc', 'undefined')
1801        self.assertRaises(UnicodeError, codecs.encode, '', 'undefined')
1802        self.assertRaises(UnicodeError, codecs.decode, b'', 'undefined')
1803        for errors in ('strict', 'ignore', 'replace', 'backslashreplace'):
1804            self.assertRaises(UnicodeError,
1805                codecs.encode, 'abc', 'undefined', errors)
1806            self.assertRaises(UnicodeError,
1807                codecs.decode, b'abc', 'undefined', errors)
1808
1809
class StreamReaderTest(unittest.TestCase):
    """Sanity check for codecs.StreamReader line splitting."""

    def setUp(self):
        # UTF-8 encoding of "\ud55c\n\uae00" served from memory.
        self.reader = codecs.getreader('utf-8')
        self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')

    def test_readlines(self):
        stream_reader = self.reader(self.stream)
        # The newline stays attached to the first line.
        self.assertEqual(stream_reader.readlines(), ['\ud55c\n', '\uae00'])
1819
1820
class EncodedFileTest(unittest.TestCase):
    """Round-trip checks for the codecs.EncodedFile transcoding wrapper."""

    def test_basic(self):
        # Reading: UTF-8 bytes in the file come back re-encoded UTF-16-LE.
        backing = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
        wrapped = codecs.EncodedFile(backing, 'utf-16-le', 'utf-8')
        self.assertEqual(wrapped.read(), b'\\\xd5\n\x00\x00\xae')

        # Writing: UTF-8 input is stored as Latin-1 in the backing file.
        backing = io.BytesIO()
        wrapped = codecs.EncodedFile(backing, 'utf-8', 'latin-1')
        wrapped.write(b'\xc3\xbc')
        self.assertEqual(backing.getvalue(), b'\xfc')
1832
# Master list of codec names exercised by BasicUnicodeTest below.  Every
# entry must be able to round-trip the ASCII sample string used there.
all_unicode_encodings = [
    "ascii",
    "big5",
    "big5hkscs",
    "charmap",
    "cp037",
    "cp1006",
    "cp1026",
    "cp1125",
    "cp1140",
    "cp1250",
    "cp1251",
    "cp1252",
    "cp1253",
    "cp1254",
    "cp1255",
    "cp1256",
    "cp1257",
    "cp1258",
    "cp424",
    "cp437",
    "cp500",
    "cp720",
    "cp737",
    "cp775",
    "cp850",
    "cp852",
    "cp855",
    "cp856",
    "cp857",
    "cp858",
    "cp860",
    "cp861",
    "cp862",
    "cp863",
    "cp864",
    "cp865",
    "cp866",
    "cp869",
    "cp874",
    "cp875",
    "cp932",
    "cp949",
    "cp950",
    "euc_jis_2004",
    "euc_jisx0213",
    "euc_jp",
    "euc_kr",
    "gb18030",
    "gb2312",
    "gbk",
    "hp_roman8",
    "hz",
    "idna",
    "iso2022_jp",
    "iso2022_jp_1",
    "iso2022_jp_2",
    "iso2022_jp_2004",
    "iso2022_jp_3",
    "iso2022_jp_ext",
    "iso2022_kr",
    "iso8859_1",
    "iso8859_10",
    "iso8859_11",
    "iso8859_13",
    "iso8859_14",
    "iso8859_15",
    "iso8859_16",
    "iso8859_2",
    "iso8859_3",
    "iso8859_4",
    "iso8859_5",
    "iso8859_6",
    "iso8859_7",
    "iso8859_8",
    "iso8859_9",
    "johab",
    "koi8_r",
    "koi8_t",
    "koi8_u",
    "kz1048",
    "latin_1",
    "mac_cyrillic",
    "mac_greek",
    "mac_iceland",
    "mac_latin2",
    "mac_roman",
    "mac_turkish",
    "palmos",
    "ptcp154",
    "punycode",
    "raw_unicode_escape",
    "shift_jis",
    "shift_jis_2004",
    "shift_jisx0213",
    "tis_620",
    "unicode_escape",
    "unicode_internal",
    "utf_16",
    "utf_16_be",
    "utf_16_le",
    "utf_7",
    "utf_8",
]

# Windows-only codecs are tested when the interpreter provides them.
if hasattr(codecs, "mbcs_encode"):
    all_unicode_encodings.append("mbcs")
if hasattr(codecs, "oem_encode"):
    all_unicode_encodings.append("oem")

# The following encoding is not tested, because it's not supposed
# to work:
#    "undefined"

# The following encodings don't work in stateful mode
broken_unicode_with_stateful = [
    "punycode",
    "unicode_internal"
]
1952
1953
class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
    """Drive every codec in all_unicode_encodings through its stateless,
    stream and incremental interfaces (where available)."""

    def test_basics(self):
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            name = codecs.lookup(encoding).name
            if encoding.endswith("_codec"):
                name += "_codec"
            elif encoding == "latin_1":
                name = "latin_1"
            # Registry name must match the requested one modulo '-'/'_'
            # spelling (apart from the quirks special-cased above).
            self.assertEqual(encoding.replace("_", "-"), name.replace("_", "-"))

            with support.check_warnings():
                # unicode-internal has been deprecated
                (b, size) = codecs.getencoder(encoding)(s)
                self.assertEqual(size, len(s), "encoding=%r" % encoding)
                (chars, size) = codecs.getdecoder(encoding)(b)
                self.assertEqual(chars, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check stream reader/writer
                q = Queue(b"")
                writer = codecs.getwriter(encoding)(q)
                encodedresult = b""
                # Write one character at a time to exercise the writer's
                # internal buffering.
                for c in s:
                    writer.write(c)
                    chunk = q.read()
                    self.assertTrue(type(chunk) is bytes, type(chunk))
                    encodedresult += chunk
                q = Queue(b"")
                reader = codecs.getreader(encoding)(q)
                decodedresult = ""
                # Likewise feed the reader a single byte at a time.
                for c in encodedresult:
                    q.write(bytes([c]))
                    decodedresult += reader.read()
                self.assertEqual(decodedresult, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder and iterencode()/iterdecode()
                try:
                    encoder = codecs.getincrementalencoder(encoding)()
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check incremental decoder/encoder
                    encodedresult = b""
                    for c in s:
                        encodedresult += encoder.encode(c)
                    # final=True flushes anything still buffered
                    encodedresult += encoder.encode("", True)
                    decoder = codecs.getincrementaldecoder(encoding)()
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += decoder.decode(bytes([c]))
                    decodedresult += decoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                    # check iterencode()/iterdecode()
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode(s, encoding), encoding))
                    self.assertEqual(result, s, "encoding=%r" % encoding)

                    # check iterencode()/iterdecode() with empty string
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode("", encoding), encoding))
                    self.assertEqual(result, "")

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        encoder = codecs.getincrementalencoder(encoding)("ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(encoder.encode(c) for c in s)
                        decoder = codecs.getincrementaldecoder(encoding)("ignore")
                        decodedresult = "".join(decoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    @support.cpython_only
    def test_basics_capi(self):
        """Same incremental round-trip as test_basics, but with the
        encoder/decoder objects obtained through the C API."""
        from _testcapi import codec_incrementalencoder, codec_incrementaldecoder
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder (fetched via the C API)
                try:
                    cencoder = codec_incrementalencoder(encoding)
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check C API
                    encodedresult = b""
                    for c in s:
                        encodedresult += cencoder.encode(c)
                    encodedresult += cencoder.encode("", True)
                    cdecoder = codec_incrementaldecoder(encoding)
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += cdecoder.decode(bytes([c]))
                    decodedresult += cdecoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        cencoder = codec_incrementalencoder(encoding, "ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(cencoder.encode(c) for c in s)
                        cdecoder = codec_incrementaldecoder(encoding, "ignore")
                        decodedresult = "".join(cdecoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    def test_seek(self):
        # all codecs should be able to encode these
        s = "%s\n%s\n" % (100*"abc123", 100*"def456")
        for encoding in all_unicode_encodings:
            if encoding == "idna": # FIXME: See SF bug #1163178
                continue
            if encoding in broken_unicode_with_stateful:
                continue
            reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
            for t in range(5):
                # Test that calling seek resets the internal codec state and buffers
                reader.seek(0, 0)
                data = reader.read()
                self.assertEqual(s, data)

    def test_bad_decode_args(self):
        """Decoders must reject calls with no argument and, where they
        only accept bytes, reject non-bytes input with a TypeError."""
        for encoding in all_unicode_encodings:
            decoder = codecs.getdecoder(encoding)
            self.assertRaises(TypeError, decoder)
            # idna/punycode are excluded: they don't reject non-bytes
            # input with a plain TypeError.
            if encoding not in ("idna", "punycode"):
                self.assertRaises(TypeError, decoder, 42)

    def test_bad_encode_args(self):
        """Encoders must reject calls with no argument."""
        for encoding in all_unicode_encodings:
            encoder = codecs.getencoder(encoding)
            with support.check_warnings():
                # unicode-internal has been deprecated
                self.assertRaises(TypeError, encoder)

    def test_encoding_map_type_initialized(self):
        from encodings import cp1140
        # This used to crash, we are only verifying there's no crash.
        table_type = type(cp1140.encoding_table)
        self.assertEqual(table_type, table_type)

    def test_decoder_state(self):
        # Check that getstate() and setstate() handle the state properly
        u = "abc123"
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                self.check_state_handling_decode(encoding, u, u.encode(encoding))
                self.check_state_handling_encode(encoding, u, u.encode(encoding))
2115
2116
class CharmapTest(unittest.TestCase):
    """Tests for codecs.charmap_decode with the three supported map kinds:
    a string map, an int->str map and an int->int map."""

    def test_decode_with_string_map(self):
        decode = codecs.charmap_decode
        self.assertEqual(decode(b"\x00\x01\x02", "strict", "abc"),
                         ("abc", 3))
        # Non-BMP characters are fine as map entries.
        self.assertEqual(decode(b"\x00\x01\x02", "strict", "\U0010FFFFbc"),
                         ("\U0010FFFFbc", 3))

        # A map that is too short, or maps to U+FFFE, fails under "strict".
        bad_maps = ("ab", "ab\ufffe")
        for bad_map in bad_maps:
            self.assertRaises(UnicodeDecodeError,
                              decode, b"\x00\x01\x02", "strict", bad_map)

        # Non-strict handlers substitute, escape or drop the bad byte.
        for handler, expected in [("replace", "ab\ufffd"),
                                  ("backslashreplace", "ab\\x02"),
                                  ("ignore", "ab")]:
            for bad_map in bad_maps:
                self.assertEqual(decode(b"\x00\x01\x02", handler, bad_map),
                                 (expected, 3))

        # All input is consumed even when every byte is ignored.
        allbytes = bytes(range(256))
        self.assertEqual(decode(allbytes, "ignore", ""),
                         ("", len(allbytes)))

    def test_decode_with_int2str_map(self):
        decode = codecs.charmap_decode
        for mapping, expected in [
            ({0: 'a', 1: 'b', 2: 'c'}, "abc"),
            # Multi-character replacement strings are expanded.
            ({0: 'Aa', 1: 'Bb', 2: 'Cc'}, "AaBbCc"),
            ({0: '\U0010FFFF', 1: 'b', 2: 'c'}, "\U0010FFFFbc"),
            # An empty replacement string simply drops the byte.
            ({0: 'a', 1: 'b', 2: ''}, "ab"),
        ]:
            self.assertEqual(decode(b"\x00\x01\x02", "strict", mapping),
                             (expected, 3))

        # A missing entry, None, and U+FFFE (issue #14850) are all
        # treated as undefined under "strict".
        bad_maps = [{0: 'a', 1: 'b'},
                    {0: 'a', 1: 'b', 2: None},
                    {0: 'a', 1: 'b', 2: '\ufffe'}]
        for bad_map in bad_maps:
            self.assertRaises(UnicodeDecodeError,
                              decode, b"\x00\x01\x02", "strict", bad_map)

        for handler, expected in [("replace", "ab\ufffd"),
                                  ("backslashreplace", "ab\\x02"),
                                  ("ignore", "ab")]:
            for bad_map in bad_maps:
                self.assertEqual(decode(b"\x00\x01\x02", handler, bad_map),
                                 (expected, 3))

        allbytes = bytes(range(256))
        self.assertEqual(decode(allbytes, "ignore", {}),
                         ("", len(allbytes)))

    def test_decode_with_int2int_map(self):
        decode = codecs.charmap_decode
        a, b, c = ord('a'), ord('b'), ord('c')

        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: a, 1: b, 2: c}),
                         ("abc", 3))

        # Issue #15379: code points up to sys.maxunicode are accepted.
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: 0x10FFFF, 1: b, 2: c}),
                         ("\U0010FFFFbc", 3))
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: sys.maxunicode, 1: b, 2: c}),
                         (chr(sys.maxunicode) + "bc", 3))
        # Beyond the Unicode range it is a TypeError, not a decode error.
        self.assertRaises(TypeError,
                          decode, b"\x00\x01\x02", "strict",
                          {0: sys.maxunicode + 1, 1: b, 2: c})

        # A missing entry or a mapping to U+FFFE fails under "strict".
        bad_maps = [{0: a, 1: b},
                    {0: a, 1: b, 2: 0xFFFE}]
        for bad_map in bad_maps:
            self.assertRaises(UnicodeDecodeError,
                              decode, b"\x00\x01\x02", "strict", bad_map)

        for handler, expected in [("replace", "ab\ufffd"),
                                  ("backslashreplace", "ab\\x02"),
                                  ("ignore", "ab")]:
            for bad_map in bad_maps:
                self.assertEqual(decode(b"\x00\x01\x02", handler, bad_map),
                                 (expected, 3))
2351
2352
class WithStmtTest(unittest.TestCase):
    """The codec stream wrappers must be usable as context managers."""

    def test_encodedfile(self):
        # EncodedFile transcodes UTF-8 bytes to latin-1 on read, and
        # closes the wrapped stream when the with-block exits.
        raw = io.BytesIO(b"\xc3\xbc")
        with codecs.EncodedFile(raw, "latin-1", "utf-8") as wrapped:
            self.assertEqual(wrapped.read(), b"\xfc")
        self.assertTrue(raw.closed)

    def test_streamreaderwriter(self):
        # StreamReaderWriter decodes on read inside a with-block.
        raw = io.BytesIO(b"\xc3\xbc")
        codec_info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(raw, codec_info.streamreader,
                                       codec_info.streamwriter,
                                       'strict') as wrapper:
            self.assertEqual(wrapper.read(), "\xfc")
2366
2367
class TypesTest(unittest.TestCase):
    """Input-type restrictions of the low-level codec functions."""

    def test_decode_unicode(self):
        # Most low-level decoders reject str input with TypeError.
        all_decoders = [
            codecs.utf_7_decode,
            codecs.utf_8_decode,
            codecs.utf_16_le_decode,
            codecs.utf_16_be_decode,
            codecs.utf_16_ex_decode,
            codecs.utf_32_decode,
            codecs.utf_32_le_decode,
            codecs.utf_32_be_decode,
            codecs.utf_32_ex_decode,
            codecs.latin_1_decode,
            codecs.ascii_decode,
            codecs.charmap_decode,
        ]
        # mbcs_decode only exists on Windows builds.
        mbcs_decode = getattr(codecs, "mbcs_decode", None)
        if mbcs_decode is not None:
            all_decoders.append(mbcs_decode)
        for decode_func in all_decoders:
            self.assertRaises(TypeError, decode_func, "xxx")

    def test_unicode_escape(self):
        # Escape-decoding a str is supported and gives the same result as
        # decoding the equivalent ASCII bytes string.
        decoders = (codecs.unicode_escape_decode,
                    codecs.raw_unicode_escape_decode)
        for decoder in decoders:
            self.assertEqual(decoder(r"\u1234"), ("\u1234", 6))
            self.assertEqual(decoder(br"\u1234"), ("\u1234", 6))
        # Escapes naming code points past U+10FFFF raise under "strict"
        # and honour the lenient error handlers.
        for decoder in decoders:
            self.assertRaises(UnicodeDecodeError, decoder, br"\U00110000")
            self.assertEqual(decoder(r"\U00110000", "replace"),
                             ("\ufffd", 10))
            self.assertEqual(decoder(r"\U00110000", "backslashreplace"),
                             (r"\x5c\x55\x30\x30\x31\x31\x30\x30\x30\x30", 10))
2407
2408
class UnicodeEscapeTest(unittest.TestCase):
    """Tests for the 'unicode_escape' codec
    (codecs.unicode_escape_encode / codecs.unicode_escape_decode)."""

    def test_empty(self):
        # Empty input: nothing produced, nothing consumed.
        self.assertEqual(codecs.unicode_escape_encode(""), (b"", 0))
        self.assertEqual(codecs.unicode_escape_decode(b""), ("", 0))

    def test_raw_encode(self):
        # Printable ASCII other than the backslash itself is passed
        # through unescaped, one byte per character.
        encode = codecs.unicode_escape_encode
        for b in range(32, 127):
            if b != b'\\'[0]:
                self.assertEqual(encode(chr(b)), (bytes([b]), 1))

    def test_raw_decode(self):
        # Any byte other than a backslash decodes to the same code point.
        decode = codecs.unicode_escape_decode
        for b in range(256):
            if b != b'\\'[0]:
                self.assertEqual(decode(bytes([b]) + b'0'), (chr(b) + '0', 2))

    def test_escape_encode(self):
        # \t, \n, \r and the backslash use their short escapes; other
        # control characters and non-ASCII use \xXX, \uXXXX or \UXXXXXXXX.
        encode = codecs.unicode_escape_encode
        check = coding_checker(self, encode)
        check('\t', br'\t')
        check('\n', br'\n')
        check('\r', br'\r')
        check('\\', br'\\')
        for b in range(32):
            if chr(b) not in '\t\n\r':
                check(chr(b), ('\\x%02x' % b).encode())
        for b in range(127, 256):
            check(chr(b), ('\\x%02x' % b).encode())
        check('\u20ac', br'\u20ac')
        check('\U0001d120', br'\U0001d120')

    def test_escape_decode(self):
        # All of Python's string-literal escapes are honoured on decoding.
        decode = codecs.unicode_escape_decode
        check = coding_checker(self, decode)
        check(b"[\\\n]", "[]")
        check(br'[\"]', '["]')
        check(br"[\']", "[']")
        check(br"[\\]", r"[\]")
        check(br"[\a]", "[\x07]")
        check(br"[\b]", "[\x08]")
        check(br"[\t]", "[\x09]")
        check(br"[\n]", "[\x0a]")
        check(br"[\v]", "[\x0b]")
        check(br"[\f]", "[\x0c]")
        check(br"[\r]", "[\x0d]")
        # Octal escapes consume at most three digits.
        check(br"[\7]", "[\x07]")
        check(br"[\78]", "[\x078]")
        check(br"[\41]", "[!]")
        check(br"[\418]", "[!8]")
        check(br"[\101]", "[A]")
        check(br"[\1010]", "[A0]")
        # Hex escapes consume exactly two digits.
        check(br"[\x41]", "[A]")
        check(br"[\x410]", "[A0]")
        check(br"\u20ac", "\u20ac")
        check(br"\U0001d120", "\U0001d120")
        # Unrecognized letter escapes are kept verbatim, but are
        # deprecated and must warn.
        for i in range(97, 123):
            b = bytes([i])
            if b not in b'abfnrtuvx':
                with self.assertWarns(DeprecationWarning):
                    check(b"\\" + b, "\\" + chr(i))
            if b.upper() not in b'UN':
                with self.assertWarns(DeprecationWarning):
                    check(b"\\" + b.upper(), "\\" + chr(i-32))
        with self.assertWarns(DeprecationWarning):
            check(br"\8", "\\8")
        with self.assertWarns(DeprecationWarning):
            check(br"\9", "\\9")

    def test_decode_errors(self):
        # Truncated \x, \u and \U escapes raise under "strict" and honour
        # the "ignore"/"replace" handlers; so does an escape naming a
        # code point past U+10FFFF.
        decode = codecs.unicode_escape_decode
        for c, d in (b'x', 2), (b'u', 4), (b'U', 4):
            for i in range(d):
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"\\" + c + b"0"*i)
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"[\\" + c + b"0"*i + b"]")
                data = b"[\\" + c + b"0"*i + b"]\\" + c + b"0"*i
                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
                self.assertEqual(decode(data, "replace"),
                                 ("[\ufffd]\ufffd", len(data)))
        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))
2493
2494
class RawUnicodeEscapeTest(unittest.TestCase):
    """Tests for the 'raw_unicode_escape' codec, which only uses the
    \\uXXXX and \\UXXXXXXXX escapes and passes everything else through."""

    def test_empty(self):
        # Empty input: nothing produced, nothing consumed.
        self.assertEqual(codecs.raw_unicode_escape_encode(""), (b"", 0))
        self.assertEqual(codecs.raw_unicode_escape_decode(b""), ("", 0))

    def test_raw_encode(self):
        # Every latin-1 code point encodes to the identical single byte.
        for code in range(256):
            self.assertEqual(codecs.raw_unicode_escape_encode(chr(code)),
                             (bytes([code]), 1))

    def test_raw_decode(self):
        # Every single byte decodes to the identical code point.
        for code in range(256):
            self.assertEqual(
                codecs.raw_unicode_escape_decode(bytes([code]) + b'0'),
                (chr(code) + '0', 2))

    def test_escape_encode(self):
        # A backslash followed by anything but 'u'/'U' is left untouched;
        # characters above latin-1 come out as \uXXXX / \UXXXXXXXX.
        def check(text, expected):
            self.assertEqual(codecs.raw_unicode_escape_encode(text),
                             (expected, len(text)))
        for code in range(256):
            if code not in b'uU':
                check('\\' + chr(code), b'\\' + bytes([code]))
        check('\u20ac', br'\u20ac')
        check('\U0001d120', br'\U0001d120')

    def test_escape_decode(self):
        # Only \u and \U introduce escapes when decoding.
        def check(data, expected):
            self.assertEqual(codecs.raw_unicode_escape_decode(data),
                             (expected, len(data)))
        for code in range(256):
            if code not in b'uU':
                check(b'\\' + bytes([code]), '\\' + chr(code))
        check(br"\u20ac", "\u20ac")
        check(br"\U0001d120", "\U0001d120")

    def test_decode_errors(self):
        # Truncated \u/\U escapes raise under "strict" and honour the
        # "ignore"/"replace" handlers; so does an escape naming a code
        # point past U+10FFFF.
        decode = codecs.raw_unicode_escape_decode
        for marker, width in (b'u', 4), (b'U', 4):
            for length in range(width):
                truncated = b"\\" + marker + b"0" * length
                self.assertRaises(UnicodeDecodeError, decode, truncated)
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"[" + truncated + b"]")
                data = b"[" + truncated + b"]" + truncated
                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
                self.assertEqual(decode(data, "replace"),
                                 ("[\ufffd]\ufffd", len(data)))
        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))
2543
2544
class EscapeEncodeTest(unittest.TestCase):
    """Tests for codecs.escape_encode (bytes -> backslash-escaped bytes)."""

    def test_escape_encode(self):
        # (input, (expected output, expected length consumed)) pairs.
        cases = [
            (b'', (b'', 0)),
            (b'foobar', (b'foobar', 6)),
            (b'spam\0eggs', (b'spam\\x00eggs', 9)),
            (b'a\'b', (b"a\\'b", 3)),
            (b'b\\c', (b'b\\\\c', 3)),
            (b'c\nd', (b'c\\nd', 3)),
            (b'd\re', (b'd\\re', 3)),
            (b'f\x7fg', (b'f\\x7fg', 3)),
        ]
        for data, expected in cases:
            with self.subTest(data=data):
                self.assertEqual(codecs.escape_encode(data), expected)
        # Only bytes is accepted: str and bytearray are rejected.
        self.assertRaises(TypeError, codecs.escape_encode, 'spam')
        self.assertRaises(TypeError, codecs.escape_encode, bytearray(b'spam'))
2563
2564
class SurrogateEscapeTest(unittest.TestCase):
    """The surrogateescape error handler smuggles undecodable bytes
    through as the lone surrogates U+DC80..U+DCFF and restores them on
    encoding."""

    def _roundtrip(self, encoding, raw, text):
        # Check both directions of the escape for one (bytes, str) pair.
        self.assertEqual(raw.decode(encoding, "surrogateescape"), text)
        self.assertEqual(text.encode(encoding, "surrogateescape"), raw)

    def test_utf8(self):
        # Bad byte
        self._roundtrip("utf-8", b"foo\x80bar", "foo\udc80bar")
        # bad-utf-8 encoded surrogate
        self._roundtrip("utf-8", b"\xed\xb0\x80", "\udced\udcb0\udc80")

    def test_ascii(self):
        # bad byte
        self._roundtrip("ascii", b"foo\x80bar", "foo\udc80bar")

    def test_charmap(self):
        # bad byte: \xa5 is unmapped in iso-8859-3
        self._roundtrip("iso-8859-3", b"foo\xa5bar", "foo\udca5bar")

    def test_latin1(self):
        # Issue6373: encoding direction only (latin-1 decoding never fails)
        self.assertEqual("\udce4\udceb\udcef\udcf6\udcfc".encode(
                             "latin-1", "surrogateescape"),
                         b"\xe4\xeb\xef\xf6\xfc")
2597
2598
class BomTest(unittest.TestCase):
    """Check when codecs.open() files emit the byte order mark (BOM)."""

    def test_seek0(self):
        # For each BOM-writing encoding, the BOM must be emitted exactly
        # once: on a write that happens at stream position 0.
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        self.addCleanup(support.unlink, support.TESTFN)
        for encoding in tests:
            # Check if the BOM is written only once
            with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
                f.write(data)
                f.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data * 2)
                f.seek(0)
                self.assertEqual(f.read(), data * 2)

            # Check that the BOM is written after a seek(0)
            with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
                f.write(data[0])
                self.assertNotEqual(f.tell(), 0)
                f.seek(0)
                f.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data)

            # (StreamWriter) Check that the BOM is written after a seek(0)
            # -- same scenario, driven through the underlying writer.
            with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
                f.writer.write(data[0])
                self.assertNotEqual(f.writer.tell(), 0)
                f.writer.seek(0)
                f.writer.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data)

            # Check that the BOM is not written after a seek() at a position
            # different than the start
            with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
                f.write(data)
                f.seek(f.tell())
                f.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data * 2)

            # (StreamWriter) Check that the BOM is not written after a seek()
            # at a position different than the start
            with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
                f.writer.write(data)
                f.writer.seek(f.writer.tell())
                f.writer.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data * 2)
2654
2655
# Binary->binary "transform" codecs exercised by TransformCodecTest;
# zlib_codec and bz2_codec are appended below only if their modules are
# available in this build.
bytes_transform_encodings = [
    "base64_codec",
    "uu_codec",
    "quopri_codec",
    "hex_codec",
]

# Alternate spellings that must resolve to the same CodecInfo
# (see http://bugs.python.org/issue7475).
transform_aliases = {
    "base64_codec": ["base64", "base_64"],
    "uu_codec": ["uu"],
    "quopri_codec": ["quopri", "quoted_printable", "quotedprintable"],
    "hex_codec": ["hex"],
    "rot_13": ["rot13"],
}

try:
    import zlib
except ImportError:
    # Keep the name bound so @skipUnless(zlib, ...) below still works.
    zlib = None
else:
    bytes_transform_encodings.append("zlib_codec")
    transform_aliases["zlib_codec"] = ["zip", "zlib"]
try:
    import bz2
except ImportError:
    pass
else:
    bytes_transform_encodings.append("bz2_codec")
    transform_aliases["bz2_codec"] = ["bz2"]
2685
2686
class TransformCodecTest(unittest.TestCase):
    """Tests for the binary->binary transform codecs (base64, uu, quopri,
    hex, and optionally zlib/bz2), which are reachable only through
    codecs.encode()/codecs.decode(), never via str.encode()/bytes.decode()."""

    def test_basics(self):
        # Round-trip every byte value through each transform codec.
        binput = bytes(range(256))
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                # generic codecs interface
                (o, size) = codecs.getencoder(encoding)(binput)
                self.assertEqual(size, len(binput))
                (i, size) = codecs.getdecoder(encoding)(o)
                self.assertEqual(size, len(o))
                self.assertEqual(i, binput)

    def test_read(self):
        # StreamReader.read() must decode transform-codec data.
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                sin = codecs.encode(b"\x80", encoding)
                reader = codecs.getreader(encoding)(io.BytesIO(sin))
                sout = reader.read()
                self.assertEqual(sout, b"\x80")

    def test_readline(self):
        # StreamReader.readline() must work for transform codecs too.
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                sin = codecs.encode(b"\x80", encoding)
                reader = codecs.getreader(encoding)(io.BytesIO(sin))
                sout = reader.readline()
                self.assertEqual(sout, b"\x80")

    def test_buffer_api_usage(self):
        # We check all the transform codecs accept memoryview input
        # for encoding and decoding
        # and also that they roundtrip correctly
        original = b"12345\x80"
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                data = original
                view = memoryview(data)
                data = codecs.encode(data, encoding)
                view_encoded = codecs.encode(view, encoding)
                self.assertEqual(view_encoded, data)
                view = memoryview(data)
                data = codecs.decode(data, encoding)
                self.assertEqual(data, original)
                view_decoded = codecs.decode(view, encoding)
                self.assertEqual(view_decoded, data)

    def test_text_to_binary_blacklists_binary_transforms(self):
        # Check binary -> binary codecs give a good error for str input
        bad_input = "bad input type"
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                fmt = (r"{!r} is not a text encoding; "
                       r"use codecs.encode\(\) to handle arbitrary codecs")
                msg = fmt.format(encoding)
                with self.assertRaisesRegex(LookupError, msg) as failure:
                    bad_input.encode(encoding)
                # The LookupError must not wrap another exception.
                self.assertIsNone(failure.exception.__cause__)

    def test_text_to_binary_blacklists_text_transforms(self):
        # Check str.encode gives a good error message for str -> str codecs
        msg = (r"^'rot_13' is not a text encoding; "
               r"use codecs.encode\(\) to handle arbitrary codecs")
        with self.assertRaisesRegex(LookupError, msg):
            "just an example message".encode("rot_13")

    def test_binary_to_text_blacklists_binary_transforms(self):
        # Check bytes.decode and bytearray.decode give a good error
        # message for binary -> binary codecs
        data = b"encode first to ensure we meet any format restrictions"
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                encoded_data = codecs.encode(data, encoding)
                fmt = (r"{!r} is not a text encoding; "
                       r"use codecs.decode\(\) to handle arbitrary codecs")
                msg = fmt.format(encoding)
                with self.assertRaisesRegex(LookupError, msg):
                    encoded_data.decode(encoding)
                with self.assertRaisesRegex(LookupError, msg):
                    bytearray(encoded_data).decode(encoding)

    def test_binary_to_text_blacklists_text_transforms(self):
        # Check str -> str codec gives a good error for binary input
        for bad_input in (b"immutable", bytearray(b"mutable")):
            with self.subTest(bad_input=bad_input):
                msg = (r"^'rot_13' is not a text encoding; "
                       r"use codecs.decode\(\) to handle arbitrary codecs")
                with self.assertRaisesRegex(LookupError, msg) as failure:
                    bad_input.decode("rot_13")
                self.assertIsNone(failure.exception.__cause__)

    @unittest.skipUnless(zlib, "Requires zlib support")
    def test_custom_zlib_error_is_wrapped(self):
        # Check zlib codec gives a good error for malformed input
        msg = "^decoding with 'zlib_codec' codec failed"
        with self.assertRaisesRegex(Exception, msg) as failure:
            codecs.decode(b"hello", "zlib_codec")
        # The codec machinery re-raises the same exception type, chained
        # to the original via __cause__.
        self.assertIsInstance(failure.exception.__cause__,
                                                type(failure.exception))

    def test_custom_hex_error_is_wrapped(self):
        # Check hex codec gives a good error for malformed input
        msg = "^decoding with 'hex_codec' codec failed"
        with self.assertRaisesRegex(Exception, msg) as failure:
            codecs.decode(b"hello", "hex_codec")
        self.assertIsInstance(failure.exception.__cause__,
                                                type(failure.exception))

    # Unfortunately, the bz2 module throws OSError, which the codec
    # machinery currently can't wrap :(

    # Ensure codec aliases from http://bugs.python.org/issue7475 work
    def test_aliases(self):
        for codec_name, aliases in transform_aliases.items():
            expected_name = codecs.lookup(codec_name).name
            for alias in aliases:
                with self.subTest(alias=alias):
                    info = codecs.lookup(alias)
                    self.assertEqual(info.name, expected_name)

    def test_quopri_stateless(self):
        # Should encode with quotetabs=True
        encoded = codecs.encode(b"space tab\teol \n", "quopri-codec")
        self.assertEqual(encoded, b"space=20tab=09eol=20\n")
        # But should still support unescaped tabs and spaces
        unescaped = b"space tab eol\n"
        self.assertEqual(codecs.decode(unescaped, "quopri-codec"), unescaped)

    def test_uu_invalid(self):
        # Missing "begin" line
        self.assertRaises(ValueError, codecs.decode, b"", "uu-codec")
2818
2819
2820# The codec system tries to wrap exceptions in order to ensure the error
2821# mentions the operation being performed and the codec involved. We
2822# currently *only* want this to happen for relatively stateless
2823# exceptions, where the only significant information they contain is their
2824# type and a single str argument.
2825
2826# Use a local codec registry to avoid appearing to leak objects when
2827# registering multiple search functions
# Maps normalized codec name -> CodecInfo for codecs created by
# ExceptionChainingTest; entries are added/removed per test case.
_TEST_CODECS = {}

def _get_test_codec(codec_name):
    # Codec search function: return the locally registered CodecInfo,
    # or None so the lookup machinery falls through to other searchers.
    return _TEST_CODECS.get(codec_name)
codecs.register(_get_test_codec) # Returns None, not usable as a decorator

try:
    # Issue #22166: Also need to clear the internal cache in CPython
    from _codecs import _forget_codec
except ImportError:
    # Other implementations may not expose a codec cache to clear.
    def _forget_codec(codec_name):
        pass
2840
2841
class ExceptionChainingTest(unittest.TestCase):
    """Check which exceptions the codec machinery wraps (re-raises with
    an augmented message, chained via __cause__) and which it lets
    propagate unchanged."""

    def setUp(self):
        # There's no way to unregister a codec search function, so we just
        # ensure we render this one fairly harmless after the test
        # case finishes by using the test case repr as the codec name
        # The codecs module normalizes codec names, although this doesn't
        # appear to be formally documented...
        # We also make sure we use a truly unique id for the custom codec
        # to avoid issues with the codec cache when running these tests
        # multiple times (e.g. when hunting for refleaks)
        unique_id = repr(self) + str(id(self))
        self.codec_name = encodings.normalize_encoding(unique_id).lower()

        # We store the object to raise on the instance because of a bad
        # interaction between the codec caching (which means we can't
        # recreate the codec entry) and regrtest refleak hunting (which
        # runs the same test instance multiple times). This means we
        # need to ensure the codecs call back in to the instance to find
        # out which exception to raise rather than binding them in a
        # closure to an object that may change on the next run
        self.obj_to_raise = RuntimeError

    def tearDown(self):
        # Remove this test's codec from the local registry and caches.
        _TEST_CODECS.pop(self.codec_name, None)
        # Issue #22166: Also pop from caches to avoid appearance of ref leaks
        encodings._cache.pop(self.codec_name, None)
        try:
            _forget_codec(self.codec_name)
        except KeyError:
            pass

    def set_codec(self, encode, decode):
        # Register *encode*/*decode* as this test's throwaway codec.
        codec_info = codecs.CodecInfo(encode, decode,
                                      name=self.codec_name)
        _TEST_CODECS[self.codec_name] = codec_info

    @contextlib.contextmanager
    def assertWrapped(self, operation, exc_type, msg):
        # Assert the with-block raises *exc_type* whose message names the
        # operation and codec, and whose __cause__ carries the original
        # exception (with its traceback).
        full_msg = r"{} with {!r} codec failed \({}: {}\)".format(
                  operation, self.codec_name, exc_type.__name__, msg)
        with self.assertRaisesRegex(exc_type, full_msg) as caught:
            yield caught
        self.assertIsInstance(caught.exception.__cause__, exc_type)
        self.assertIsNotNone(caught.exception.__cause__.__traceback__)

    def raise_obj(self, *args, **kwds):
        # Helper to dynamically change the object raised by a test codec
        raise self.obj_to_raise

    def check_wrapped(self, obj_to_raise, msg, exc_type=RuntimeError):
        # All four encode/decode entry points must wrap the exception.
        self.obj_to_raise = obj_to_raise
        self.set_codec(self.raise_obj, self.raise_obj)
        with self.assertWrapped("encoding", exc_type, msg):
            "str_input".encode(self.codec_name)
        with self.assertWrapped("encoding", exc_type, msg):
            codecs.encode("str_input", self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_raise_by_type(self):
        self.check_wrapped(RuntimeError, "")

    def test_raise_by_value(self):
        msg = "This should be wrapped"
        self.check_wrapped(RuntimeError(msg), msg)

    def test_raise_grandchild_subclass_exact_size(self):
        # A subclass with no extra state (empty __slots__) is still wrapped.
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            __slots__ = ()
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def test_raise_subclass_with_weakref_support(self):
        # A plain subclass (which gains __dict__/__weakref__) is wrapped too.
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            pass
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def check_not_wrapped(self, obj_to_raise, msg):
        # The exception must propagate unchanged from all four entry points.
        def raise_obj(*args, **kwds):
            raise obj_to_raise
        self.set_codec(raise_obj, raise_obj)
        with self.assertRaisesRegex(RuntimeError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_init_override_is_not_wrapped(self):
        # Wrapping only happens for "stateless" exceptions; a custom
        # __init__ means extra state, so no wrapping.
        class CustomInit(RuntimeError):
            def __init__(self):
                pass
        self.check_not_wrapped(CustomInit, "")

    def test_new_override_is_not_wrapped(self):
        # Likewise for a custom __new__.
        class CustomNew(RuntimeError):
            def __new__(cls):
                return super().__new__(cls)
        self.check_not_wrapped(CustomNew, "")

    def test_instance_attribute_is_not_wrapped(self):
        # Instance attributes would be lost by re-raising, so no wrapping.
        msg = "This should NOT be wrapped"
        exc = RuntimeError(msg)
        exc.attr = 1
        self.check_not_wrapped(exc, "^{}$".format(msg))

    def test_non_str_arg_is_not_wrapped(self):
        # Only a single str argument qualifies for wrapping.
        self.check_not_wrapped(RuntimeError(1), "1")

    def test_multiple_args_is_not_wrapped(self):
        msg_re = r"^\('a', 'b', 'c'\)$"
        self.check_not_wrapped(RuntimeError('a', 'b', 'c'), msg_re)

    # http://bugs.python.org/issue19609
    def test_codec_lookup_failure_not_wrapped(self):
        msg = "^unknown encoding: {}$".format(self.codec_name)
        # The initial codec lookup should not be wrapped
        with self.assertRaisesRegex(LookupError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_unflagged_non_text_codec_handling(self):
        # The stdlib non-text codecs are now marked so they're
        # pre-emptively skipped by the text model related methods
        # However, third party codecs won't be flagged, so we still make
        # sure the case where an inappropriate output type is produced is
        # handled appropriately
        def encode_to_str(*args, **kwds):
            return "not bytes!", 0
        def decode_to_bytes(*args, **kwds):
            return b"not str!", 0
        self.set_codec(encode_to_str, decode_to_bytes)
        # No input or output type checks on the codecs module functions
        encoded = codecs.encode(None, self.codec_name)
        self.assertEqual(encoded, "not bytes!")
        decoded = codecs.decode(None, self.codec_name)
        self.assertEqual(decoded, b"not str!")
        # Text model methods should complain
        fmt = (r"^{!r} encoder returned 'str' instead of 'bytes'; "
               r"use codecs.encode\(\) to encode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            "str_input".encode(self.codec_name)
        fmt = (r"^{!r} decoder returned 'bytes' instead of 'str'; "
               r"use codecs.decode\(\) to decode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            b"bytes input".decode(self.codec_name)
3001
3002
3003
@unittest.skipUnless(sys.platform == 'win32',
                     'code pages are specific to Windows')
class CodePageTest(unittest.TestCase):
    """Tests for the Windows code page codecs
    (codecs.code_page_encode / codecs.code_page_decode).
    """
    # CP_UTF8 is already tested by CP65001Test
    CP_UTF8 = 65001

    def test_invalid_code_page(self):
        """Invalid code page numbers are rejected with an exception."""
        # A negative code page number raises ValueError.
        self.assertRaises(ValueError, codecs.code_page_encode, -1, 'a')
        self.assertRaises(ValueError, codecs.code_page_decode, -1, b'a')
        # 123 is well-formed but raises OSError — presumably because no
        # such code page is installed on Windows (TODO confirm).
        self.assertRaises(OSError, codecs.code_page_encode, 123, 'a')
        self.assertRaises(OSError, codecs.code_page_decode, 123, b'a')

    def test_code_page_name(self):
        """The code page name appears in Unicode error messages."""
        self.assertRaisesRegex(UnicodeEncodeError, 'cp932',
            codecs.code_page_encode, 932, '\xff')
        self.assertRaisesRegex(UnicodeDecodeError, 'cp932',
            codecs.code_page_decode, 932, b'\x81\x00', 'strict', True)
        # Code page 65001 reports itself as 'CP_UTF8', not 'cp65001'.
        self.assertRaisesRegex(UnicodeDecodeError, 'CP_UTF8',
            codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True)

    def check_decode(self, cp, tests):
        """Run (raw, errors, expected) decode test cases against code page *cp*.

        An expected value of None means the decode is required to raise
        UnicodeDecodeError; otherwise the decoded text must equal *expected*.
        """
        for raw, errors, expected in tests:
            if expected is not None:
                try:
                    decoded = codecs.code_page_decode(cp, raw, errors, True)
                except UnicodeDecodeError as err:
                    self.fail('Unable to decode %a from "cp%s" with '
                              'errors=%r: %s' % (raw, cp, errors, err))
                self.assertEqual(decoded[0], expected,
                    '%a.decode("cp%s", %r)=%a != %a'
                    % (raw, cp, errors, decoded[0], expected))
                # assert 0 <= decoded[1] <= len(raw)
                self.assertGreaterEqual(decoded[1], 0)
                self.assertLessEqual(decoded[1], len(raw))
            else:
                self.assertRaises(UnicodeDecodeError,
                    codecs.code_page_decode, cp, raw, errors, True)

    def check_encode(self, cp, tests):
        """Run (text, errors, expected) encode test cases against code page *cp*.

        An expected value of None means the encode is required to raise
        UnicodeEncodeError; otherwise the encoded bytes must equal *expected*.
        """
        for text, errors, expected in tests:
            if expected is not None:
                try:
                    encoded = codecs.code_page_encode(cp, text, errors)
                except UnicodeEncodeError as err:
                    self.fail('Unable to encode %a to "cp%s" with '
                              'errors=%r: %s' % (text, cp, errors, err))
                self.assertEqual(encoded[0], expected,
                    '%a.encode("cp%s", %r)=%a != %a'
                    % (text, cp, errors, encoded[0], expected))
                # A successful encode must consume the whole input string.
                self.assertEqual(encoded[1], len(text))
            else:
                self.assertRaises(UnicodeEncodeError,
                    codecs.code_page_encode, cp, text, errors)

    def test_cp932(self):
        """Round trips and error-handler behavior for code page 932."""
        self.check_encode(932, (
            ('abc', 'strict', b'abc'),
            ('\uff44\u9a3e', 'strict', b'\x82\x84\xe9\x80'),
            # test error handlers
            ('\xff', 'strict', None),
            ('[\xff]', 'ignore', b'[]'),
            ('[\xff]', 'replace', b'[y]'),
            ('[\u20ac]', 'replace', b'[?]'),
            ('[\xff]', 'backslashreplace', b'[\\xff]'),
            ('[\xff]', 'namereplace',
             b'[\\N{LATIN SMALL LETTER Y WITH DIAERESIS}]'),
            ('[\xff]', 'xmlcharrefreplace', b'[&#255;]'),
            ('\udcff', 'strict', None),
            ('[\udcff]', 'surrogateescape', b'[\xff]'),
            ('[\udcff]', 'surrogatepass', None),
        ))
        self.check_decode(932, (
            (b'abc', 'strict', 'abc'),
            (b'\x82\x84\xe9\x80', 'strict', '\uff44\u9a3e'),
            # invalid bytes
            (b'[\xff]', 'strict', None),
            (b'[\xff]', 'ignore', '[]'),
            (b'[\xff]', 'replace', '[\ufffd]'),
            (b'[\xff]', 'backslashreplace', '[\\xff]'),
            (b'[\xff]', 'surrogateescape', '[\udcff]'),
            (b'[\xff]', 'surrogatepass', None),
            (b'\x81\x00abc', 'strict', None),
            (b'\x81\x00abc', 'ignore', '\x00abc'),
            (b'\x81\x00abc', 'replace', '\ufffd\x00abc'),
            (b'\x81\x00abc', 'backslashreplace', '\\x81\x00abc'),
        ))

    def test_cp1252(self):
        """Round trips and error-handler behavior for code page 1252."""
        self.check_encode(1252, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'\xe9\x80'),
            ('\xff', 'strict', b'\xff'),
            # test error handlers
            ('\u0141', 'strict', None),
            ('\u0141', 'ignore', b''),
            ('\u0141', 'replace', b'L'),
            ('\udc98', 'surrogateescape', b'\x98'),
            ('\udc98', 'surrogatepass', None),
        ))
        self.check_decode(1252, (
            (b'abc', 'strict', 'abc'),
            (b'\xe9\x80', 'strict', '\xe9\u20ac'),
            (b'\xff', 'strict', '\xff'),
        ))

    def test_cp_utf7(self):
        """Round trips for code page 65000 (the Windows UTF-7 code page)."""
        cp = 65000
        self.check_encode(cp, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'+AOkgrA-'),
            ('\U0010ffff', 'strict',  b'+2//f/w-'),
            ('\udc80', 'strict', b'+3IA-'),
            ('\ufffd', 'strict', b'+//0-'),
        ))
        self.check_decode(cp, (
            (b'abc', 'strict', 'abc'),
            (b'+AOkgrA-', 'strict', '\xe9\u20ac'),
            (b'+2//f/w-', 'strict', '\U0010ffff'),
            (b'+3IA-', 'strict', '\udc80'),
            (b'+//0-', 'strict', '\ufffd'),
            # invalid bytes
            (b'[+/]', 'strict', '[]'),
            (b'[\xff]', 'strict', '[\xff]'),
        ))

    def test_multibyte_encoding(self):
        """Error handlers on invalid bytes inside multibyte sequences."""
        self.check_decode(932, (
            (b'\x84\xe9\x80', 'ignore', '\u9a3e'),
            (b'\x84\xe9\x80', 'replace', '\ufffd\u9a3e'),
        ))
        self.check_decode(self.CP_UTF8, (
            (b'\xff\xf4\x8f\xbf\xbf', 'ignore', '\U0010ffff'),
            (b'\xff\xf4\x8f\xbf\xbf', 'replace', '\ufffd\U0010ffff'),
        ))
        self.check_encode(self.CP_UTF8, (
            ('[\U0010ffff\uDC80]', 'ignore', b'[\xf4\x8f\xbf\xbf]'),
            ('[\U0010ffff\uDC80]', 'replace', b'[\xf4\x8f\xbf\xbf?]'),
        ))

    def test_incremental(self):
        """With final=False, a trailing incomplete sequence is left unconsumed."""
        # A lone lead byte: nothing decoded, zero bytes consumed.
        decoded = codecs.code_page_decode(932, b'\x82', 'strict', False)
        self.assertEqual(decoded, ('', 0))

        # Complete pair plus a trailing lead byte: only the pair is consumed.
        decoded = codecs.code_page_decode(932,
                                          b'\xe9\x80\xe9', 'strict',
                                          False)
        self.assertEqual(decoded, ('\u9a3e', 2))

        decoded = codecs.code_page_decode(932,
                                          b'\xe9\x80\xe9\x80', 'strict',
                                          False)
        self.assertEqual(decoded, ('\u9a3e\u9a3e', 4))

        decoded = codecs.code_page_decode(932,
                                          b'abc', 'strict',
                                          False)
        self.assertEqual(decoded, ('abc', 3))

    def test_mbcs_alias(self):
        # Check that looking up our 'default' codepage will return
        # mbcs when we don't have a more specific one available
        import _bootlocale
        def _get_fake_codepage(*a):
            return 'cp123'
        # Monkeypatch the preferred-encoding hook, restoring it afterwards.
        old_getpreferredencoding = _bootlocale.getpreferredencoding
        _bootlocale.getpreferredencoding = _get_fake_codepage
        try:
            codec = codecs.lookup('cp123')
            self.assertEqual(codec.name, 'mbcs')
        finally:
            _bootlocale.getpreferredencoding = old_getpreferredencoding
3175
3176
class ASCIITest(unittest.TestCase):
    """Exercise the built-in 'ascii' codec and its error handlers."""

    def test_encode(self):
        """Pure ASCII text encodes to the identical byte sequence."""
        self.assertEqual('abc123'.encode('ascii'), b'abc123')

    def test_encode_error(self):
        """Each error handler maps non-encodable characters as documented."""
        cases = [
            ('[\x80\xff\u20ac]', 'ignore', b'[]'),
            ('[\x80\xff\u20ac]', 'replace', b'[???]'),
            ('[\x80\xff\u20ac]', 'xmlcharrefreplace', b'[&#128;&#255;&#8364;]'),
            ('[\x80\xff\u20ac\U000abcde]', 'backslashreplace',
             b'[\\x80\\xff\\u20ac\\U000abcde]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        ]
        for text, handler, want in cases:
            with self.subTest(data=text, error_handler=handler,
                              expected=want):
                self.assertEqual(text.encode('ascii', handler), want)

    def test_encode_surrogateescape_error(self):
        """surrogateescape handles the lone surrogate but not '\\xff'."""
        # the first character can be decoded, but not the second
        self.assertRaises(UnicodeEncodeError,
                          '\udc80\xff'.encode, 'ascii', 'surrogateescape')

    def test_decode(self):
        """Pure ASCII bytes decode to the identical text."""
        self.assertEqual(b'abc'.decode('ascii'), 'abc')

    def test_decode_error(self):
        """Each error handler maps non-ASCII bytes as documented."""
        cases = [
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        ]
        for raw, handler, want in cases:
            with self.subTest(data=raw, error_handler=handler,
                              expected=want):
                self.assertEqual(raw.decode('ascii', handler), want)
3214
3215
class Latin1Test(unittest.TestCase):
    """Exercise the built-in 'latin1' codec and its error handlers."""

    def test_encode(self):
        """Characters below U+0100 encode one-to-one into bytes."""
        cases = [
            ('abc', b'abc'),
            ('\x80\xe9\xff', b'\x80\xe9\xff'),
        ]
        for text, want in cases:
            with self.subTest(data=text, expected=want):
                self.assertEqual(text.encode('latin1'), want)

    def test_encode_errors(self):
        """Each error handler maps non-encodable characters as documented."""
        cases = [
            ('[\u20ac\udc80]', 'ignore', b'[]'),
            ('[\u20ac\udc80]', 'replace', b'[??]'),
            ('[\u20ac\U000abcde]', 'backslashreplace',
             b'[\\u20ac\\U000abcde]'),
            ('[\u20ac\udc80]', 'xmlcharrefreplace', b'[&#8364;&#56448;]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        ]
        for text, handler, want in cases:
            with self.subTest(data=text, error_handler=handler,
                              expected=want):
                self.assertEqual(text.encode('latin1', handler), want)

    def test_encode_surrogateescape_error(self):
        """surrogateescape handles the lone surrogate but not '\\u20ac'."""
        # the first character can be decoded, but not the second
        self.assertRaises(UnicodeEncodeError,
                          '\udc80\u20ac'.encode, 'latin1', 'surrogateescape')

    def test_decode(self):
        """Every byte value decodes one-to-one to the same code point."""
        cases = [
            (b'abc', 'abc'),
            (b'[\x80\xff]', '[\x80\xff]'),
        ]
        for raw, want in cases:
            with self.subTest(data=raw, expected=want):
                self.assertEqual(raw.decode('latin1'), want)
3251
3252
# Run the full test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
3255