1# -*- coding: iso-8859-1 -*-
2# Copyright (C) 2001,2002 Python Software Foundation
3# csv package unit tests
4
5import sys
6import os
7import unittest
8from StringIO import StringIO
9import tempfile
10import csv
11import gc
12import io
13from test import test_support
14
15class Test_Csv(unittest.TestCase):
16    """
17    Test the underlying C csv parser in ways that are not appropriate
18    from the high level interface. Further tests of this nature are done
19    in TestDialectRegistry.
20    """
21    def _test_arg_valid(self, ctor, arg):
22        self.assertRaises(TypeError, ctor)
23        self.assertRaises(TypeError, ctor, None)
24        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
25        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
26        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
27        self.assertRaises(csv.Error, ctor, arg, 'foo')
28        self.assertRaises(TypeError, ctor, arg, delimiter=None)
29        self.assertRaises(TypeError, ctor, arg, delimiter=1)
30        self.assertRaises(TypeError, ctor, arg, quotechar=1)
31        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
32        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
33        self.assertRaises(TypeError, ctor, arg, quoting=None)
34        self.assertRaises(TypeError, ctor, arg,
35                          quoting=csv.QUOTE_ALL, quotechar='')
36        self.assertRaises(TypeError, ctor, arg,
37                          quoting=csv.QUOTE_ALL, quotechar=None)
38
39    def test_reader_arg_valid(self):
40        self._test_arg_valid(csv.reader, [])
41
42    def test_writer_arg_valid(self):
43        self._test_arg_valid(csv.writer, StringIO())
44
45    def _test_default_attrs(self, ctor, *args):
46        obj = ctor(*args)
47        # Check defaults
48        self.assertEqual(obj.dialect.delimiter, ',')
49        self.assertEqual(obj.dialect.doublequote, True)
50        self.assertEqual(obj.dialect.escapechar, None)
51        self.assertEqual(obj.dialect.lineterminator, "\r\n")
52        self.assertEqual(obj.dialect.quotechar, '"')
53        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
54        self.assertEqual(obj.dialect.skipinitialspace, False)
55        self.assertEqual(obj.dialect.strict, False)
56        # Try deleting or changing attributes (they are read-only)
57        self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter')
58        self.assertRaises(TypeError, setattr, obj.dialect, 'delimiter', ':')
59        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
60        self.assertRaises(AttributeError, setattr, obj.dialect,
61                          'quoting', None)
62
63    def test_reader_attrs(self):
64        self._test_default_attrs(csv.reader, [])
65
66    def test_writer_attrs(self):
67        self._test_default_attrs(csv.writer, StringIO())
68
69    def _test_kw_attrs(self, ctor, *args):
70        # Now try with alternate options
71        kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
72                      lineterminator='\r', quotechar='*',
73                      quoting=csv.QUOTE_NONE, skipinitialspace=True,
74                      strict=True)
75        obj = ctor(*args, **kwargs)
76        self.assertEqual(obj.dialect.delimiter, ':')
77        self.assertEqual(obj.dialect.doublequote, False)
78        self.assertEqual(obj.dialect.escapechar, '\\')
79        self.assertEqual(obj.dialect.lineterminator, "\r")
80        self.assertEqual(obj.dialect.quotechar, '*')
81        self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
82        self.assertEqual(obj.dialect.skipinitialspace, True)
83        self.assertEqual(obj.dialect.strict, True)
84
85    def test_reader_kw_attrs(self):
86        self._test_kw_attrs(csv.reader, [])
87
88    def test_writer_kw_attrs(self):
89        self._test_kw_attrs(csv.writer, StringIO())
90
91    def _test_dialect_attrs(self, ctor, *args):
92        # Now try with dialect-derived options
93        class dialect:
94            delimiter='-'
95            doublequote=False
96            escapechar='^'
97            lineterminator='$'
98            quotechar='#'
99            quoting=csv.QUOTE_ALL
100            skipinitialspace=True
101            strict=False
102        args = args + (dialect,)
103        obj = ctor(*args)
104        self.assertEqual(obj.dialect.delimiter, '-')
105        self.assertEqual(obj.dialect.doublequote, False)
106        self.assertEqual(obj.dialect.escapechar, '^')
107        self.assertEqual(obj.dialect.lineterminator, "$")
108        self.assertEqual(obj.dialect.quotechar, '#')
109        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
110        self.assertEqual(obj.dialect.skipinitialspace, True)
111        self.assertEqual(obj.dialect.strict, False)
112
113    def test_reader_dialect_attrs(self):
114        self._test_dialect_attrs(csv.reader, [])
115
116    def test_writer_dialect_attrs(self):
117        self._test_dialect_attrs(csv.writer, StringIO())
118
119
120    def _write_test(self, fields, expect, **kwargs):
121        fd, name = tempfile.mkstemp()
122        fileobj = os.fdopen(fd, "w+b")
123        try:
124            writer = csv.writer(fileobj, **kwargs)
125            writer.writerow(fields)
126            fileobj.seek(0)
127            self.assertEqual(fileobj.read(),
128                             expect + writer.dialect.lineterminator)
129        finally:
130            fileobj.close()
131            os.unlink(name)
132
133    def test_write_arg_valid(self):
134        self.assertRaises(csv.Error, self._write_test, None, '')
135        self._write_test((), '')
136        self._write_test([None], '""')
137        self.assertRaises(csv.Error, self._write_test,
138                          [None], None, quoting = csv.QUOTE_NONE)
139        # Check that exceptions are passed up the chain
140        class BadList:
141            def __len__(self):
142                return 10;
143            def __getitem__(self, i):
144                if i > 2:
145                    raise IOError
146        self.assertRaises(IOError, self._write_test, BadList(), '')
147        class BadItem:
148            def __str__(self):
149                raise IOError
150        self.assertRaises(IOError, self._write_test, [BadItem()], '')
151
152    def test_write_bigfield(self):
153        # This exercises the buffer realloc functionality
154        bigstring = 'X' * 50000
155        self._write_test([bigstring,bigstring], '%s,%s' % \
156                         (bigstring, bigstring))
157
158    def test_write_quoting(self):
159        self._write_test(['a',1,'p,q'], 'a,1,"p,q"')
160        self.assertRaises(csv.Error,
161                          self._write_test,
162                          ['a',1,'p,q'], 'a,1,p,q',
163                          quoting = csv.QUOTE_NONE)
164        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
165                         quoting = csv.QUOTE_MINIMAL)
166        self._write_test(['a',1,'p,q'], '"a",1,"p,q"',
167                         quoting = csv.QUOTE_NONNUMERIC)
168        self._write_test(['a',1,'p,q'], '"a","1","p,q"',
169                         quoting = csv.QUOTE_ALL)
170        self._write_test(['a\nb',1], '"a\nb","1"',
171                         quoting = csv.QUOTE_ALL)
172
173    def test_write_escape(self):
174        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
175                         escapechar='\\')
176        self.assertRaises(csv.Error,
177                          self._write_test,
178                          ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
179                          escapechar=None, doublequote=False)
180        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
181                         escapechar='\\', doublequote = False)
182        self._write_test(['"'], '""""',
183                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
184        self._write_test(['"'], '\\"',
185                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
186                         doublequote = False)
187        self._write_test(['"'], '\\"',
188                         escapechar='\\', quoting = csv.QUOTE_NONE)
189        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
190                         escapechar='\\', quoting = csv.QUOTE_NONE)
191
192    def test_writerows(self):
193        class BrokenFile:
194            def write(self, buf):
195                raise IOError
196        writer = csv.writer(BrokenFile())
197        self.assertRaises(IOError, writer.writerows, [['a']])
198        fd, name = tempfile.mkstemp()
199        fileobj = os.fdopen(fd, "w+b")
200        try:
201            writer = csv.writer(fileobj)
202            self.assertRaises(TypeError, writer.writerows, None)
203            writer.writerows([['a','b'],['c','d']])
204            fileobj.seek(0)
205            self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
206        finally:
207            fileobj.close()
208            os.unlink(name)
209
210    def test_write_float(self):
211        # Issue 13573: loss of precision because csv.writer
212        # uses str() for floats instead of repr()
213        orig_row = [1.234567890123, 1.0/7.0, 'abc']
214        f = StringIO()
215        c = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
216        c.writerow(orig_row)
217        f.seek(0)
218        c = csv.reader(f, quoting=csv.QUOTE_NONNUMERIC)
219        new_row = next(c)
220        self.assertEqual(orig_row, new_row)
221
222    def _read_test(self, input, expect, **kwargs):
223        reader = csv.reader(input, **kwargs)
224        result = list(reader)
225        self.assertEqual(result, expect)
226
227    def test_read_oddinputs(self):
228        self._read_test([], [])
229        self._read_test([''], [[]])
230        self.assertRaises(csv.Error, self._read_test,
231                          ['"ab"c'], None, strict = 1)
232        # cannot handle null bytes for the moment
233        self.assertRaises(csv.Error, self._read_test,
234                          ['ab\0c'], None, strict = 1)
235        self._read_test(['"ab"c'], [['abc']], doublequote = 0)
236
237    def test_read_eol(self):
238        self._read_test(['a,b'], [['a','b']])
239        self._read_test(['a,b\n'], [['a','b']])
240        self._read_test(['a,b\r\n'], [['a','b']])
241        self._read_test(['a,b\r'], [['a','b']])
242        self.assertRaises(csv.Error, self._read_test, ['a,b\rc,d'], [])
243        self.assertRaises(csv.Error, self._read_test, ['a,b\nc,d'], [])
244        self.assertRaises(csv.Error, self._read_test, ['a,b\r\nc,d'], [])
245
246    def test_read_eof(self):
247        self._read_test(['a,"'], [['a', '']])
248        self._read_test(['"a'], [['a']])
249        self._read_test(['^'], [['\n']], escapechar='^')
250        self.assertRaises(csv.Error, self._read_test, ['a,"'], [], strict=True)
251        self.assertRaises(csv.Error, self._read_test, ['"a'], [], strict=True)
252        self.assertRaises(csv.Error, self._read_test,
253                          ['^'], [], escapechar='^', strict=True)
254
255    def test_read_escape(self):
256        self._read_test(['a,\\b,c'], [['a', 'b', 'c']], escapechar='\\')
257        self._read_test(['a,b\\,c'], [['a', 'b,c']], escapechar='\\')
258        self._read_test(['a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
259        self._read_test(['a,"b,\\c"'], [['a', 'b,c']], escapechar='\\')
260        self._read_test(['a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
261        self._read_test(['a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')
262
263    def test_read_quoting(self):
264        self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
265        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
266                        quotechar=None, escapechar='\\')
267        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
268                        quoting=csv.QUOTE_NONE, escapechar='\\')
269        # will this fail where locale uses comma for decimals?
270        self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
271                        quoting=csv.QUOTE_NONNUMERIC)
272        self._read_test(['"a\nb", 7'], [['a\nb', ' 7']])
273        self.assertRaises(ValueError, self._read_test,
274                          ['abc,3'], [[]],
275                          quoting=csv.QUOTE_NONNUMERIC)
276
277    def test_read_bigfield(self):
278        # This exercises the buffer realloc functionality and field size
279        # limits.
280        limit = csv.field_size_limit()
281        try:
282            size = 50000
283            bigstring = 'X' * size
284            bigline = '%s,%s' % (bigstring, bigstring)
285            self._read_test([bigline], [[bigstring, bigstring]])
286            csv.field_size_limit(size)
287            self._read_test([bigline], [[bigstring, bigstring]])
288            self.assertEqual(csv.field_size_limit(), size)
289            csv.field_size_limit(size-1)
290            self.assertRaises(csv.Error, self._read_test, [bigline], [])
291            self.assertRaises(TypeError, csv.field_size_limit, None)
292            self.assertRaises(TypeError, csv.field_size_limit, 1, None)
293        finally:
294            csv.field_size_limit(limit)
295
296    def test_read_linenum(self):
297        for r in (csv.reader(['line,1', 'line,2', 'line,3']),
298                  csv.DictReader(['line,1', 'line,2', 'line,3'],
299                                 fieldnames=['a', 'b', 'c'])):
300            self.assertEqual(r.line_num, 0)
301            r.next()
302            self.assertEqual(r.line_num, 1)
303            r.next()
304            self.assertEqual(r.line_num, 2)
305            r.next()
306            self.assertEqual(r.line_num, 3)
307            self.assertRaises(StopIteration, r.next)
308            self.assertEqual(r.line_num, 3)
309
310    def test_roundtrip_quoteed_newlines(self):
311        fd, name = tempfile.mkstemp()
312        fileobj = os.fdopen(fd, "w+b")
313        try:
314            writer = csv.writer(fileobj)
315            self.assertRaises(TypeError, writer.writerows, None)
316            rows = [['a\nb','b'],['c','x\r\nd']]
317            writer.writerows(rows)
318            fileobj.seek(0)
319            for i, row in enumerate(csv.reader(fileobj)):
320                self.assertEqual(row, rows[i])
321        finally:
322            fileobj.close()
323            os.unlink(name)
324
325class TestDialectRegistry(unittest.TestCase):
326    def test_registry_badargs(self):
327        self.assertRaises(TypeError, csv.list_dialects, None)
328        self.assertRaises(TypeError, csv.get_dialect)
329        self.assertRaises(csv.Error, csv.get_dialect, None)
330        self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
331        self.assertRaises(TypeError, csv.unregister_dialect)
332        self.assertRaises(csv.Error, csv.unregister_dialect, None)
333        self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
334        self.assertRaises(TypeError, csv.register_dialect, None)
335        self.assertRaises(TypeError, csv.register_dialect, None, None)
336        self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0)
337        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
338                          badargument=None)
339        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
340                          quoting=None)
341        self.assertRaises(TypeError, csv.register_dialect, [])
342
343    def test_registry(self):
344        class myexceltsv(csv.excel):
345            delimiter = "\t"
346        name = "myexceltsv"
347        expected_dialects = csv.list_dialects() + [name]
348        expected_dialects.sort()
349        csv.register_dialect(name, myexceltsv)
350        self.addCleanup(csv.unregister_dialect, name)
351        self.assertEqual(csv.get_dialect(name).delimiter, '\t')
352        got_dialects = sorted(csv.list_dialects())
353        self.assertEqual(expected_dialects, got_dialects)
354
355    def test_register_kwargs(self):
356        name = 'fedcba'
357        csv.register_dialect(name, delimiter=';')
358        self.addCleanup(csv.unregister_dialect, name)
359        self.assertEqual(csv.get_dialect(name).delimiter, ';')
360        self.assertEqual([['X', 'Y', 'Z']], list(csv.reader(['X;Y;Z'], name)))
361
362    def test_incomplete_dialect(self):
363        class myexceltsv(csv.Dialect):
364            delimiter = "\t"
365        self.assertRaises(csv.Error, myexceltsv)
366
367    def test_space_dialect(self):
368        class space(csv.excel):
369            delimiter = " "
370            quoting = csv.QUOTE_NONE
371            escapechar = "\\"
372
373        fd, name = tempfile.mkstemp()
374        fileobj = os.fdopen(fd, "w+b")
375        try:
376            fileobj.write("abc def\nc1ccccc1 benzene\n")
377            fileobj.seek(0)
378            rdr = csv.reader(fileobj, dialect=space())
379            self.assertEqual(rdr.next(), ["abc", "def"])
380            self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
381        finally:
382            fileobj.close()
383            os.unlink(name)
384
385    def test_dialect_apply(self):
386        class testA(csv.excel):
387            delimiter = "\t"
388        class testB(csv.excel):
389            delimiter = ":"
390        class testC(csv.excel):
391            delimiter = "|"
392
393        csv.register_dialect('testC', testC)
394        try:
395            fd, name = tempfile.mkstemp()
396            fileobj = os.fdopen(fd, "w+b")
397            try:
398                writer = csv.writer(fileobj)
399                writer.writerow([1,2,3])
400                fileobj.seek(0)
401                self.assertEqual(fileobj.read(), "1,2,3\r\n")
402            finally:
403                fileobj.close()
404                os.unlink(name)
405
406            fd, name = tempfile.mkstemp()
407            fileobj = os.fdopen(fd, "w+b")
408            try:
409                writer = csv.writer(fileobj, testA)
410                writer.writerow([1,2,3])
411                fileobj.seek(0)
412                self.assertEqual(fileobj.read(), "1\t2\t3\r\n")
413            finally:
414                fileobj.close()
415                os.unlink(name)
416
417            fd, name = tempfile.mkstemp()
418            fileobj = os.fdopen(fd, "w+b")
419            try:
420                writer = csv.writer(fileobj, dialect=testB())
421                writer.writerow([1,2,3])
422                fileobj.seek(0)
423                self.assertEqual(fileobj.read(), "1:2:3\r\n")
424            finally:
425                fileobj.close()
426                os.unlink(name)
427
428            fd, name = tempfile.mkstemp()
429            fileobj = os.fdopen(fd, "w+b")
430            try:
431                writer = csv.writer(fileobj, dialect='testC')
432                writer.writerow([1,2,3])
433                fileobj.seek(0)
434                self.assertEqual(fileobj.read(), "1|2|3\r\n")
435            finally:
436                fileobj.close()
437                os.unlink(name)
438
439            fd, name = tempfile.mkstemp()
440            fileobj = os.fdopen(fd, "w+b")
441            try:
442                writer = csv.writer(fileobj, dialect=testA, delimiter=';')
443                writer.writerow([1,2,3])
444                fileobj.seek(0)
445                self.assertEqual(fileobj.read(), "1;2;3\r\n")
446            finally:
447                fileobj.close()
448                os.unlink(name)
449
450        finally:
451            csv.unregister_dialect('testC')
452
453    def test_bad_dialect(self):
454        # Unknown parameter
455        self.assertRaises(TypeError, csv.reader, [], bad_attr = 0)
456        # Bad values
457        self.assertRaises(TypeError, csv.reader, [], delimiter = None)
458        self.assertRaises(TypeError, csv.reader, [], quoting = -1)
459        self.assertRaises(TypeError, csv.reader, [], quoting = 100)
460
class TestCsvBase(unittest.TestCase):
    # Shared helpers for the dialect test cases below.  Both helpers read
    # self.dialect and hand it to csv, so a class using them has to provide
    # that attribute.

    def readerAssertEqual(self, input, expected_result):
        """Write input to a temporary file, parse the file with
        self.dialect and compare all parsed rows with expected_result."""
        handle, path = tempfile.mkstemp()
        stream = os.fdopen(handle, "w+b")
        try:
            stream.write(input)
            stream.seek(0)
            parsed = [row for row in csv.reader(stream, dialect=self.dialect)]
            self.assertEqual(parsed, expected_result)
        finally:
            # Always release and remove the scratch file.
            stream.close()
            os.unlink(path)

    def writerAssertEqual(self, input, expected_result):
        """Write the rows in input to a temporary file with self.dialect
        and compare the file content with expected_result."""
        handle, path = tempfile.mkstemp()
        stream = os.fdopen(handle, "w+b")
        try:
            csv.writer(stream, dialect=self.dialect).writerows(input)
            stream.seek(0)
            produced = stream.read()
            self.assertEqual(produced, expected_result)
        finally:
            # Always release and remove the scratch file.
            stream.close()
            os.unlink(path)
486
class TestDialectExcel(TestCsvBase):
    # Reader and writer checks run against the registered 'excel' dialect.
    dialect = 'excel'

    # --- reading ---------------------------------------------------------

    def test_single(self):
        expected = [['abc']]
        self.readerAssertEqual('abc', expected)

    def test_simple(self):
        expected = [['1','2','3','4','5']]
        self.readerAssertEqual('1,2,3,4,5', expected)

    def test_blankline(self):
        # Empty input produces no rows at all.
        expected = []
        self.readerAssertEqual('', expected)

    def test_empty_fields(self):
        expected = [['', '']]
        self.readerAssertEqual(',', expected)

    def test_singlequoted(self):
        expected = [['']]
        self.readerAssertEqual('""', expected)

    def test_singlequoted_left_empty(self):
        expected = [['','']]
        self.readerAssertEqual('"",', expected)

    def test_singlequoted_right_empty(self):
        expected = [['','']]
        self.readerAssertEqual(',""', expected)

    def test_single_quoted_quote(self):
        expected = [['"']]
        self.readerAssertEqual('""""', expected)

    def test_quoted_quotes(self):
        expected = [['""']]
        self.readerAssertEqual('""""""', expected)

    def test_inline_quote(self):
        expected = [['a""b']]
        self.readerAssertEqual('a""b', expected)

    def test_inline_quotes(self):
        expected = [['a"b"c']]
        self.readerAssertEqual('a"b"c', expected)

    def test_quotes_and_more(self):
        # Excel would never write a field containing '"a"b', but when
        # reading one, it will return 'ab'.
        expected = [['ab']]
        self.readerAssertEqual('"a"b', expected)

    def test_lone_quote(self):
        expected = [['a"b']]
        self.readerAssertEqual('a"b', expected)

    def test_quote_and_quote(self):
        # Excel would never write a field containing '"a" "b"', but when
        # reading one, it will return 'a "b"'.
        expected = [['a "b"']]
        self.readerAssertEqual('"a" "b"', expected)

    def test_space_and_quote(self):
        expected = [[' "a"']]
        self.readerAssertEqual(' "a"', expected)

    def test_quoted(self):
        expected = [['1', '2', '3', 'I think, therefore I am', '5', '6']]
        self.readerAssertEqual('1,2,3,"I think, therefore I am",5,6',
                               expected)

    def test_quoted_quote(self):
        expected = [['1', '2', '3',
                     '"I see," said the blind man',
                     'as he picked up his hammer and saw']]
        self.readerAssertEqual('1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"',
                               expected)

    def test_quoted_nl(self):
        # Newlines inside quoted fields belong to the field; the unquoted
        # newline before the last line starts a second row.
        text = '''\
1,2,3,"""I see,""
said the blind man","as he picked up his
hammer and saw"
9,8,7,6'''
        first_row = ['1', '2', '3',
                     '"I see,"\nsaid the blind man',
                     'as he picked up his\nhammer and saw']
        second_row = ['9','8','7','6']
        self.readerAssertEqual(text, [first_row, second_row])

    def test_dubious_quote(self):
        expected = [['12', '12', '1"', '']]
        self.readerAssertEqual('12,12,1",', expected)

    # --- writing ---------------------------------------------------------

    def test_null(self):
        # No rows in, nothing written.
        rows = []
        self.writerAssertEqual(rows, '')

    def test_single_writer(self):
        rows = [['abc']]
        self.writerAssertEqual(rows, 'abc\r\n')

    def test_simple_writer(self):
        rows = [[1, 2, 'abc', 3, 4]]
        self.writerAssertEqual(rows, '1,2,abc,3,4\r\n')

    def test_quotes(self):
        rows = [[1, 2, 'a"bc"', 3, 4]]
        self.writerAssertEqual(rows, '1,2,"a""bc""",3,4\r\n')

    def test_quote_fieldsep(self):
        rows = [['abc,def']]
        self.writerAssertEqual(rows, '"abc,def"\r\n')

    def test_newlines(self):
        rows = [[1, 2, 'a\nbc', 3, 4]]
        self.writerAssertEqual(rows, '1,2,"a\nbc",3,4\r\n')
583
class EscapedExcel(csv.excel):
    # csv.excel with quoting switched off and a backslash as escapechar.
    escapechar = '\\'
    quoting = csv.QUOTE_NONE
587
class TestEscapedExcel(TestCsvBase):
    # Runs the shared reader/writer helpers with an EscapedExcel instance.
    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        # Writing: the embedded delimiter comes out behind the escapechar.
        rows = [['abc,def']]
        self.writerAssertEqual(rows, 'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        # Reading: the escaped delimiter stays inside the single field.
        expected = [['abc,def']]
        self.readerAssertEqual('abc\\,def\r\n', expected)
596
class QuotedEscapedExcel(csv.excel):
    # csv.excel with QUOTE_NONNUMERIC quoting and a backslash as escapechar.
    escapechar = '\\'
    quoting = csv.QUOTE_NONNUMERIC
600
class TestQuotedEscapedExcel(TestCsvBase):
    # Runs the shared reader/writer helpers with a QuotedEscapedExcel
    # instance.
    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        # Writing: the field is quoted, so the delimiter needs no escape.
        rows = [['abc,def']]
        self.writerAssertEqual(rows, '"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        # Reading: an escaped delimiter inside quotes yields the delimiter.
        expected = [['abc,def']]
        self.readerAssertEqual('"abc\\,def"\r\n', expected)
609
610class TestDictFields(unittest.TestCase):
611    ### "long" means the row is longer than the number of fieldnames
612    ### "short" means there are fewer elements in the row than fieldnames
613    def test_write_simple_dict(self):
614        fd, name = tempfile.mkstemp()
615        fileobj = io.open(fd, 'w+b')
616        try:
617            writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
618            writer.writeheader()
619            fileobj.seek(0)
620            self.assertEqual(fileobj.readline(), "f1,f2,f3\r\n")
621            writer.writerow({"f1": 10, "f3": "abc"})
622            fileobj.seek(0)
623            fileobj.readline() # header
624            self.assertEqual(fileobj.read(), "10,,abc\r\n")
625        finally:
626            fileobj.close()
627            os.unlink(name)
628
629    def test_write_no_fields(self):
630        fileobj = StringIO()
631        self.assertRaises(TypeError, csv.DictWriter, fileobj)
632
633    def test_read_dict_fields(self):
634        fd, name = tempfile.mkstemp()
635        fileobj = os.fdopen(fd, "w+b")
636        try:
637            fileobj.write("1,2,abc\r\n")
638            fileobj.seek(0)
639            reader = csv.DictReader(fileobj,
640                                    fieldnames=["f1", "f2", "f3"])
641            self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
642        finally:
643            fileobj.close()
644            os.unlink(name)
645
646    def test_read_dict_no_fieldnames(self):
647        fd, name = tempfile.mkstemp()
648        fileobj = os.fdopen(fd, "w+b")
649        try:
650            fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
651            fileobj.seek(0)
652            reader = csv.DictReader(fileobj)
653            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
654            self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
655        finally:
656            fileobj.close()
657            os.unlink(name)
658
659    # Two test cases to make sure existing ways of implicitly setting
660    # fieldnames continue to work.  Both arise from discussion in issue3436.
661    def test_read_dict_fieldnames_from_file(self):
662        fd, name = tempfile.mkstemp()
663        f = os.fdopen(fd, "w+b")
664        try:
665            f.write("f1,f2,f3\r\n1,2,abc\r\n")
666            f.seek(0)
667            reader = csv.DictReader(f, fieldnames=csv.reader(f).next())
668            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
669            self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
670        finally:
671            f.close()
672            os.unlink(name)
673
674    def test_read_dict_fieldnames_chain(self):
675        import itertools
676        fd, name = tempfile.mkstemp()
677        f = os.fdopen(fd, "w+b")
678        try:
679            f.write("f1,f2,f3\r\n1,2,abc\r\n")
680            f.seek(0)
681            reader = csv.DictReader(f)
682            first = next(reader)
683            for row in itertools.chain([first], reader):
684                self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
685                self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'})
686        finally:
687            f.close()
688            os.unlink(name)
689
690    def test_read_long(self):
691        fd, name = tempfile.mkstemp()
692        fileobj = os.fdopen(fd, "w+b")
693        try:
694            fileobj.write("1,2,abc,4,5,6\r\n")
695            fileobj.seek(0)
696            reader = csv.DictReader(fileobj,
697                                    fieldnames=["f1", "f2"])
698            self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
699                                             None: ["abc", "4", "5", "6"]})
700        finally:
701            fileobj.close()
702            os.unlink(name)
703
704    def test_read_long_with_rest(self):
705        fd, name = tempfile.mkstemp()
706        fileobj = os.fdopen(fd, "w+b")
707        try:
708            fileobj.write("1,2,abc,4,5,6\r\n")
709            fileobj.seek(0)
710            reader = csv.DictReader(fileobj,
711                                    fieldnames=["f1", "f2"], restkey="_rest")
712            self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
713                                             "_rest": ["abc", "4", "5", "6"]})
714        finally:
715            fileobj.close()
716            os.unlink(name)
717
718    def test_read_long_with_rest_no_fieldnames(self):
719        fd, name = tempfile.mkstemp()
720        fileobj = os.fdopen(fd, "w+b")
721        try:
722            fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
723            fileobj.seek(0)
724            reader = csv.DictReader(fileobj, restkey="_rest")
725            self.assertEqual(reader.fieldnames, ["f1", "f2"])
726            self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
727                                             "_rest": ["abc", "4", "5", "6"]})
728        finally:
729            fileobj.close()
730            os.unlink(name)
731
732    def test_read_short(self):
733        fd, name = tempfile.mkstemp()
734        fileobj = os.fdopen(fd, "w+b")
735        try:
736            fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
737            fileobj.seek(0)
738            reader = csv.DictReader(fileobj,
739                                    fieldnames="1 2 3 4 5 6".split(),
740                                    restval="DEFAULT")
741            self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
742                                             "4": '4', "5": '5', "6": '6'})
743            self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
744                                             "4": 'DEFAULT', "5": 'DEFAULT',
745                                             "6": 'DEFAULT'})
746        finally:
747            fileobj.close()
748            os.unlink(name)
749
750    def test_read_multi(self):
751        sample = [
752            '2147483648,43.0e12,17,abc,def\r\n',
753            '147483648,43.0e2,17,abc,def\r\n',
754            '47483648,43.0,170,abc,def\r\n'
755            ]
756
757        reader = csv.DictReader(sample,
758                                fieldnames="i1 float i2 s1 s2".split())
759        self.assertEqual(reader.next(), {"i1": '2147483648',
760                                         "float": '43.0e12',
761                                         "i2": '17',
762                                         "s1": 'abc',
763                                         "s2": 'def'})
764
765    def test_read_with_blanks(self):
766        reader = csv.DictReader(["1,2,abc,4,5,6\r\n","\r\n",
767                                 "1,2,abc,4,5,6\r\n"],
768                                fieldnames="1 2 3 4 5 6".split())
769        self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
770                                         "4": '4', "5": '5', "6": '6'})
771        self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
772                                         "4": '4', "5": '5', "6": '6'})
773
774    def test_read_semi_sep(self):
775        reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
776                                fieldnames="1 2 3 4 5 6".split(),
777                                delimiter=';')
778        self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
779                                         "4": '4', "5": '5', "6": '6'})
780
781class TestArrayWrites(unittest.TestCase):
782    def test_int_write(self):
783        import array
784        contents = [(20-i) for i in range(20)]
785        a = array.array('i', contents)
786
787        fd, name = tempfile.mkstemp()
788        fileobj = os.fdopen(fd, "w+b")
789        try:
790            writer = csv.writer(fileobj, dialect="excel")
791            writer.writerow(a)
792            expected = ",".join([str(i) for i in a])+"\r\n"
793            fileobj.seek(0)
794            self.assertEqual(fileobj.read(), expected)
795        finally:
796            fileobj.close()
797            os.unlink(name)
798
799    def test_double_write(self):
800        import array
801        contents = [(20-i)*0.1 for i in range(20)]
802        a = array.array('d', contents)
803        fd, name = tempfile.mkstemp()
804        fileobj = os.fdopen(fd, "w+b")
805        try:
806            writer = csv.writer(fileobj, dialect="excel")
807            writer.writerow(a)
808            expected = ",".join([repr(i) for i in a])+"\r\n"
809            fileobj.seek(0)
810            self.assertEqual(fileobj.read(), expected)
811        finally:
812            fileobj.close()
813            os.unlink(name)
814
815    def test_float_write(self):
816        import array
817        contents = [(20-i)*0.1 for i in range(20)]
818        a = array.array('f', contents)
819        fd, name = tempfile.mkstemp()
820        fileobj = os.fdopen(fd, "w+b")
821        try:
822            writer = csv.writer(fileobj, dialect="excel")
823            writer.writerow(a)
824            expected = ",".join([repr(i) for i in a])+"\r\n"
825            fileobj.seek(0)
826            self.assertEqual(fileobj.read(), expected)
827        finally:
828            fileobj.close()
829            os.unlink(name)
830
831    def test_char_write(self):
832        import array, string
833        a = array.array('c', string.letters)
834        fd, name = tempfile.mkstemp()
835        fileobj = os.fdopen(fd, "w+b")
836        try:
837            writer = csv.writer(fileobj, dialect="excel")
838            writer.writerow(a)
839            expected = ",".join(a)+"\r\n"
840            fileobj.seek(0)
841            self.assertEqual(fileobj.read(), expected)
842        finally:
843            fileobj.close()
844            os.unlink(name)
845
class TestDialectValidity(unittest.TestCase):
    """Check which attribute values a csv.Dialect subclass accepts.

    Each test starts from a valid dialect class, changes one class
    attribute at a time and instantiates the class again: instantiation
    must either succeed or raise csv.Error.
    """

    def _fresh_dialect(self):
        """Return a new, valid Dialect subclass that a test may mutate.

        A new class is built on every call because the tests modify
        class attributes in place.
        """
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        return mydialect

    def test_quoting(self):
        dialect_cls = self._fresh_dialect()
        # The untouched dialect must be accepted.
        dialect_cls()

        dialect_cls.quoting = None
        self.assertRaises(csv.Error, dialect_cls)

        # Switch to a valid QUOTE_ALL configuration ...
        dialect_cls.doublequote = True
        dialect_cls.quoting = csv.QUOTE_ALL
        dialect_cls.quotechar = '"'
        dialect_cls()

        # ... then break the quotechar: too long, and not a string.
        for bad_quotechar in ("''", 4):
            dialect_cls.quotechar = bad_quotechar
            self.assertRaises(csv.Error, dialect_cls)

    def test_delimiter(self):
        dialect_cls = self._fresh_dialect()
        dialect_cls()

        # Multi-character and non-string delimiters are rejected.
        for bad_delimiter in (":::", 4):
            dialect_cls.delimiter = bad_delimiter
            self.assertRaises(csv.Error, dialect_cls)

    def test_lineterminator(self):
        dialect_cls = self._fresh_dialect()
        dialect_cls()

        # A multi-character line terminator is accepted ...
        dialect_cls.lineterminator = ":::"
        dialect_cls()

        # ... but a non-string one is not.
        dialect_cls.lineterminator = 4
        self.assertRaises(csv.Error, dialect_cls)
902
903
class TestSniffer(unittest.TestCase):
    """Exercise csv.Sniffer: header detection and dialect guessing."""

    # Comma separated, a space after each comma, no quoting.
    sample1 = """\
Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
Shark City, Glendale Heights, IL, 12/28/02, Prezence
Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
"""
    # Colon separated, single-quoted, embedded quotes doubled.
    sample2 = """\
'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
'Stonecutters ''Seafood'' and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
"""
    header = '''\
"venue","city","state","date","performers"
'''
    # Three identical lines in which several characters repeat regularly.
    sample3 = '''\
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
'''

    sample4 = '''\
2147483648;43.0e12;17;abc;def
147483648;43.0e2;17;abc;def
47483648;43.0;170;abc;def
'''

    sample5 = "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
    sample6 = "a|b|c\r\nd|e|f\r\n"
    sample7 = "'a'|'b'|'c'\r\n'd'|e|f\r\n"

    def test_has_header(self):
        detector = csv.Sniffer()
        without_header = self.sample1
        with_header = self.header+self.sample1
        self.assertEqual(detector.has_header(without_header), False)
        self.assertEqual(detector.has_header(with_header), True)

    def test_sniff(self):
        detector = csv.Sniffer()
        # (text, delimiter, quotechar, skipinitialspace)
        cases = (
            (self.sample1, ",", '"', True),
            (self.sample2, ":", "'", False),
        )
        for text, delimiter, quotechar, skipinitialspace in cases:
            guessed = detector.sniff(text)
            self.assertEqual(guessed.delimiter, delimiter)
            self.assertEqual(guessed.quotechar, quotechar)
            self.assertEqual(guessed.skipinitialspace, skipinitialspace)

    def test_delimiters(self):
        detector = csv.Sniffer()
        # All three lines of sample3 are equal, so without a hint any
        # character of the sample may be guessed as the delimiter
        # (depending on dictionary order); only membership is checked.
        self.assertIn(detector.sniff(self.sample3).delimiter, self.sample3)

        # With a list of candidate delimiters the guess is pinned down.
        for candidates, wanted in (("?,", "?"), ("/,", "/")):
            guessed = detector.sniff(self.sample3, delimiters=candidates)
            self.assertEqual(guessed.delimiter, wanted)

        for text, wanted in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
            self.assertEqual(detector.sniff(text).delimiter, wanted)

        guessed = detector.sniff(self.sample7)
        self.assertEqual(guessed.delimiter, "|")
        self.assertEqual(guessed.quotechar, "'")

    def test_doublequote(self):
        detector = csv.Sniffer()
        self.assertFalse(detector.sniff(self.header).doublequote)
        self.assertTrue(detector.sniff(self.sample2).doublequote)
980
# The leak tests below need sys.gettotalrefcount(); they are only defined
# when the running interpreter provides it.  Otherwise a notice is printed
# (in verbose mode) and no TestLeaks class exists.
if not hasattr(sys, "gettotalrefcount"):
    if test_support.verbose: print "*** skipping leakage tests ***"
else:
    # Write-only sink: write() and writelines() accept any arguments and
    # discard them.
    class NUL:
        def write(s, *args):
            pass
        writelines = write

    # Each test repeats the same operation 20 times and samples the total
    # reference count at the start of every iteration, after a gc.collect()
    # that must leave gc.garbage empty.  `delta` is the difference between
    # two consecutive samples; because the sample is taken before the
    # iteration's own work, the final delta reflects what the previous
    # iteration left behind.
    class TestLeaks(unittest.TestCase):
        def test_create_read(self):
            # Creating (and immediately dropping) three readers.
            delta = 0
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                csv.reader(["a,b,c\r\n"])
                csv.reader(["a,b,c\r\n"])
                csv.reader(["a,b,c\r\n"])
                delta = rc-lastrc
                lastrc = rc
            # if csv.reader() leaks, last delta should be 3 or more
            self.assertEqual(delta < 3, True)

        def test_create_write(self):
            # Creating (and immediately dropping) three writers on one sink.
            delta = 0
            lastrc = sys.gettotalrefcount()
            s = NUL()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                csv.writer(s)
                csv.writer(s)
                csv.writer(s)
                delta = rc-lastrc
                lastrc = rc
            # if csv.writer() leaks, last delta should be 3 or more
            self.assertEqual(delta < 3, True)

        def test_read(self):
            # Reading five rows through a fresh reader.
            delta = 0
            rows = ["a,b,c\r\n"]*5
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                rdr = csv.reader(rows)
                for row in rdr:
                    pass
                delta = rc-lastrc
                lastrc = rc
            # if reader leaks during read, last delta should be 5 or more
            self.assertEqual(delta < 5, True)

        def test_write(self):
            # Writing five rows through a fresh writer into the NUL sink.
            delta = 0
            rows = [[1,2,3]]*5
            s = NUL()
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                writer = csv.writer(s)
                for row in rows:
                    writer.writerow(row)
                delta = rc-lastrc
                lastrc = rc
            # if writer leaks during write, last delta should be 5 or more
            self.assertEqual(delta < 5, True)
1053
1054# commented out for now - csv module doesn't yet support Unicode
1055## class TestUnicode(unittest.TestCase):
1056##     def test_unicode_read(self):
1057##         import codecs
##         f = codecs.EncodedFile(StringIO("Martin von Löwis,"
##                                         "Marc André Lemburg,"
##                                         "Guido van Rossum,"
##                                         "François Pinard\r\n"),
##                                data_encoding='iso-8859-1')
##         reader = csv.reader(f)
##         self.assertEqual(list(reader), [[u"Martin von Löwis",
##                                          u"Marc André Lemburg",
##                                          u"Guido van Rossum",
##                                          u"François Pinard"]])
1068
def test_main():
    """Run every attribute of this module whose name starts with 'Test'.

    The attributes are looked up on the module object registered in
    sys.modules and handed, in dir() order, to test_support.run_unittest().
    """
    this_module = sys.modules[__name__]
    selected = []
    for attr_name in dir(this_module):
        if attr_name.startswith('Test'):
            selected.append(getattr(this_module, attr_name))
    test_support.run_unittest(*selected)
1074
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
1077