1# -*- coding: iso-8859-1 -*-
2# Copyright (C) 2001,2002 Python Software Foundation
3# csv package unit tests
4
5import sys
6import os
7import unittest
8from StringIO import StringIO
9import tempfile
10import csv
11import gc
12import io
13from test import test_support
14
class Test_Csv(unittest.TestCase):
    """
    Test the underlying C csv parser in ways that are not appropriate
    from the high level interface. Further tests of this nature are done
    in TestDialectRegistry.
    """
    def _test_arg_valid(self, ctor, arg):
        """Assert that ctor rejects missing, unknown and ill-typed arguments.

        ctor is the factory under test (the callers pass csv.reader and
        csv.writer); arg is a first argument that ctor accepts on its own.
        """
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
        # The second positional argument names a dialect; 'foo' is expected
        # to be reported as csv.Error rather than TypeError.
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        # QUOTE_ALL combined with an empty or missing quotechar is rejected.
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)

    def test_reader_arg_valid(self):
        self._test_arg_valid(csv.reader, [])

    def test_writer_arg_valid(self):
        self._test_arg_valid(csv.writer, StringIO())

    def _test_default_attrs(self, ctor, *args):
        """Check the dialect attributes of ctor(*args) and their immutability."""
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertEqual(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertEqual(obj.dialect.skipinitialspace, False)
        self.assertEqual(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only)
        self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter')
        self.assertRaises(TypeError, setattr, obj.dialect, 'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)

    def test_reader_attrs(self):
        self._test_default_attrs(csv.reader, [])

    def test_writer_attrs(self):
        self._test_default_attrs(csv.writer, StringIO())

    def _test_kw_attrs(self, ctor, *args):
        """Check that keyword options passed to ctor show up on obj.dialect."""
        # Now try with alternate options
        kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
                      lineterminator='\r', quotechar='*',
                      quoting=csv.QUOTE_NONE, skipinitialspace=True,
                      strict=True)
        obj = ctor(*args, **kwargs)
        self.assertEqual(obj.dialect.delimiter, ':')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '\\')
        self.assertEqual(obj.dialect.lineterminator, "\r")
        self.assertEqual(obj.dialect.quotechar, '*')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, True)

    def test_reader_kw_attrs(self):
        self._test_kw_attrs(csv.reader, [])

    def test_writer_kw_attrs(self):
        self._test_kw_attrs(csv.writer, StringIO())

    def _test_dialect_attrs(self, ctor, *args):
        """Check that options taken from a dialect class show up on obj.dialect.

        The dialect is a plain class (not a csv.Dialect subclass) passed as
        the last positional argument to ctor.
        """
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)

    def test_reader_dialect_attrs(self):
        self._test_dialect_attrs(csv.reader, [])

    def test_writer_dialect_attrs(self):
        self._test_dialect_attrs(csv.writer, StringIO())


    def _write_test(self, fields, expect, **kwargs):
        """Write fields as one row and compare the file contents with expect.

        The row is written to a temporary file through
        csv.writer(fileobj, **kwargs); the file must then contain expect
        followed by the writer's line terminator.  The temporary file is
        always closed and removed.
        """
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, **kwargs)
            writer.writerow(fields)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(),
                             expect + writer.dialect.lineterminator)
        finally:
            fileobj.close()
            os.unlink(name)

    def test_write_arg_valid(self):
        self.assertRaises(csv.Error, self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting = csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10
            def __getitem__(self, i):
                # Items 0..2 are None; anything beyond raises.
                if i > 2:
                    raise IOError
        self.assertRaises(IOError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise IOError
        self.assertRaises(IOError, self._write_test, [BadItem()], '')

    def test_write_bigfield(self):
        # This exercises the buffer realloc functionality
        bigstring = 'X' * 50000
        self._write_test([bigstring,bigstring], '%s,%s' % \
                         (bigstring, bigstring))

    def test_write_quoting(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,q'], 'a,1,p,q',
                          quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         quoting = csv.QUOTE_MINIMAL)
        self._write_test(['a',1,'p,q'], '"a",1,"p,q"',
                         quoting = csv.QUOTE_NONNUMERIC)
        self._write_test(['a',1,'p,q'], '"a","1","p,q"',
                         quoting = csv.QUOTE_ALL)
        self._write_test(['a\nb',1], '"a\nb","1"',
                         quoting = csv.QUOTE_ALL)

    def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)

    def test_writerows(self):
        # Errors raised by the file object's write() must propagate.
        class BrokenFile:
            def write(self, buf):
                raise IOError
        writer = csv.writer(BrokenFile())
        self.assertRaises(IOError, writer.writerows, [['a']])
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            writer.writerows([['a','b'],['c','d']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
        finally:
            fileobj.close()
            os.unlink(name)

    def _read_test(self, input, expect, **kwargs):
        """Parse input with csv.reader(input, **kwargs) and compare with expect.

        input is an iterable of lines; expect is the list of rows the
        reader is supposed to produce.
        """
        reader = csv.reader(input, **kwargs)
        result = list(reader)
        self.assertEqual(result, expect)

    def test_read_oddinputs(self):
        self._read_test([], [])
        self._read_test([''], [[]])
        self.assertRaises(csv.Error, self._read_test,
                          ['"ab"c'], None, strict = 1)
        # cannot handle null bytes for the moment
        self.assertRaises(csv.Error, self._read_test,
                          ['ab\0c'], None, strict = 1)
        self._read_test(['"ab"c'], [['abc']], doublequote = 0)

    def test_read_eol(self):
        self._read_test(['a,b'], [['a','b']])
        self._read_test(['a,b\n'], [['a','b']])
        self._read_test(['a,b\r\n'], [['a','b']])
        self._read_test(['a,b\r'], [['a','b']])
        # A line terminator followed by more data within one input item
        # is an error.
        self.assertRaises(csv.Error, self._read_test, ['a,b\rc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\nc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\r\nc,d'], [])

    def test_read_escape(self):
        self._read_test(['a,\\b,c'], [['a', 'b', 'c']], escapechar='\\')
        self._read_test(['a,b\\,c'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,\\c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
        self._read_test(['a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')

    def test_read_quoting(self):
        self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quotechar=None, escapechar='\\')
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quoting=csv.QUOTE_NONE, escapechar='\\')
        # will this fail where locale uses comma for decimals?
        self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
                        quoting=csv.QUOTE_NONNUMERIC)
        self._read_test(['"a\nb", 7'], [['a\nb', ' 7']])
        self.assertRaises(ValueError, self._read_test,
                          ['abc,3'], [[]],
                          quoting=csv.QUOTE_NONNUMERIC)

    def test_read_bigfield(self):
        # This exercises the buffer realloc functionality and field size
        # limits.
        limit = csv.field_size_limit()
        try:
            size = 50000
            bigstring = 'X' * size
            bigline = '%s,%s' % (bigstring, bigstring)
            self._read_test([bigline], [[bigstring, bigstring]])
            csv.field_size_limit(size)
            self._read_test([bigline], [[bigstring, bigstring]])
            self.assertEqual(csv.field_size_limit(), size)
            csv.field_size_limit(size-1)
            self.assertRaises(csv.Error, self._read_test, [bigline], [])
            self.assertRaises(TypeError, csv.field_size_limit, None)
            self.assertRaises(TypeError, csv.field_size_limit, 1, None)
        finally:
            # Restore the module-wide limit for the other tests.
            csv.field_size_limit(limit)

    def test_read_linenum(self):
        # line_num must advance by one per row read, for csv.reader and
        # csv.DictReader alike, and must not change once exhausted.
        for r in (csv.reader(['line,1', 'line,2', 'line,3']),
                  csv.DictReader(['line,1', 'line,2', 'line,3'],
                                 fieldnames=['a', 'b', 'c'])):
            self.assertEqual(r.line_num, 0)
            next(r)
            self.assertEqual(r.line_num, 1)
            next(r)
            self.assertEqual(r.line_num, 2)
            next(r)
            self.assertEqual(r.line_num, 3)
            self.assertRaises(StopIteration, next, r)
            self.assertEqual(r.line_num, 3)

    def test_roundtrip_quoteed_newlines(self):
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            rows = [['a\nb','b'],['c','x\r\nd']]
            writer.writerows(rows)
            fileobj.seek(0)
            # Compare the complete result: a row-by-row loop would pass
            # silently if the reader produced fewer rows than were written.
            self.assertEqual(list(csv.reader(fileobj)), rows)
        finally:
            fileobj.close()
            os.unlink(name)
303
class TestDialectRegistry(unittest.TestCase):
    """Tests for the dialect registry and for how dialects reach readers/writers."""

    def test_registry_badargs(self):
        self.assertRaises(TypeError, csv.list_dialects, None)
        self.assertRaises(TypeError, csv.get_dialect)
        self.assertRaises(csv.Error, csv.get_dialect, None)
        self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.unregister_dialect)
        self.assertRaises(csv.Error, csv.unregister_dialect, None)
        self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.register_dialect, None)
        self.assertRaises(TypeError, csv.register_dialect, None, None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          badargument=None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          quoting=None)
        self.assertRaises(TypeError, csv.register_dialect, [])

    def test_registry(self):
        class myexceltsv(csv.excel):
            delimiter = "\t"
        name = "myexceltsv"
        # Computed before registering so it reflects whatever else is
        # registered at this point plus the new name.
        expected_dialects = csv.list_dialects() + [name]
        expected_dialects.sort()
        csv.register_dialect(name, myexceltsv)
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, '\t')
        got_dialects = sorted(csv.list_dialects())
        self.assertEqual(expected_dialects, got_dialects)

    def test_register_kwargs(self):
        name = 'fedcba'
        csv.register_dialect(name, delimiter=';')
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, ';')
        self.assertEqual([['X', 'Y', 'Z']], list(csv.reader(['X;Y;Z'], name)))

    def test_incomplete_dialect(self):
        # Instantiating a csv.Dialect subclass that only sets the delimiter
        # is expected to fail.
        class myexceltsv(csv.Dialect):
            delimiter = "\t"
        self.assertRaises(csv.Error, myexceltsv)

    def test_space_dialect(self):
        class space(csv.excel):
            delimiter = " "
            quoting = csv.QUOTE_NONE
            escapechar = "\\"

        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("abc def\nc1ccccc1 benzene\n")
            fileobj.seek(0)
            rdr = csv.reader(fileobj, dialect=space())
            self.assertEqual(next(rdr), ["abc", "def"])
            self.assertEqual(next(rdr), ["c1ccccc1", "benzene"])
        finally:
            fileobj.close()
            os.unlink(name)

    def _written_row(self, row, *args, **kwargs):
        """Return what csv.writer(fileobj, *args, **kwargs) writes for row.

        The row is written to a freshly created temporary file, which is
        read back in full and then closed and removed before returning.
        """
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, *args, **kwargs)
            writer.writerow(row)
            fileobj.seek(0)
            return fileobj.read()
        finally:
            fileobj.close()
            os.unlink(name)

    def test_dialect_apply(self):
        class testA(csv.excel):
            delimiter = "\t"
        class testB(csv.excel):
            delimiter = ":"
        class testC(csv.excel):
            delimiter = "|"

        csv.register_dialect('testC', testC)
        self.addCleanup(csv.unregister_dialect, 'testC')

        # No dialect given at all.
        self.assertEqual(self._written_row([1,2,3]), "1,2,3\r\n")
        # Dialect class passed positionally.
        self.assertEqual(self._written_row([1,2,3], testA),
                         "1\t2\t3\r\n")
        # Dialect instance passed by keyword.
        self.assertEqual(self._written_row([1,2,3], dialect=testB()),
                         "1:2:3\r\n")
        # Registered dialect referred to by name.
        self.assertEqual(self._written_row([1,2,3], dialect='testC'),
                         "1|2|3\r\n")
        # An explicit keyword option overrides the dialect's own value.
        self.assertEqual(self._written_row([1,2,3], dialect=testA,
                                           delimiter=';'),
                         "1;2;3\r\n")

    def test_bad_dialect(self):
        # Unknown parameter
        self.assertRaises(TypeError, csv.reader, [], bad_attr = 0)
        # Bad values
        self.assertRaises(TypeError, csv.reader, [], delimiter = None)
        self.assertRaises(TypeError, csv.reader, [], quoting = -1)
        self.assertRaises(TypeError, csv.reader, [], quoting = 100)
439
class TestCsvBase(unittest.TestCase):
    """Reader/writer assertion helpers shared by dialect-specific test cases.

    Both helpers read self.dialect, which this class does not define itself.
    """
    def _scratch(self):
        # Create a temporary file and wrap its descriptor in a "w+b" stream.
        handle, path = tempfile.mkstemp()
        return os.fdopen(handle, "w+b"), path

    def readerAssertEqual(self, input, expected_result):
        """Write input to a temporary file, parse it and compare the rows."""
        stream, path = self._scratch()
        try:
            stream.write(input)
            stream.seek(0)
            rows = list(csv.reader(stream, dialect = self.dialect))
            self.assertEqual(rows, expected_result)
        finally:
            stream.close()
            os.unlink(path)

    def writerAssertEqual(self, input, expected_result):
        """Write the rows in input to a temporary file and compare its text."""
        stream, path = self._scratch()
        try:
            csv.writer(stream, dialect = self.dialect).writerows(input)
            stream.seek(0)
            produced = stream.read()
            self.assertEqual(produced, expected_result)
        finally:
            stream.close()
            os.unlink(path)
465
class TestDialectExcel(TestCsvBase):
    """Reader and writer checks run with the 'excel' dialect."""
    dialect = 'excel'

    # ---- reading ----

    def test_single(self):
        rows = [['abc']]
        self.readerAssertEqual('abc', rows)

    def test_simple(self):
        rows = [['1','2','3','4','5']]
        self.readerAssertEqual('1,2,3,4,5', rows)

    def test_blankline(self):
        # Empty input produces no rows at all.
        self.readerAssertEqual('', [])

    def test_empty_fields(self):
        rows = [['', '']]
        self.readerAssertEqual(',', rows)

    def test_singlequoted(self):
        rows = [['']]
        self.readerAssertEqual('""', rows)

    def test_singlequoted_left_empty(self):
        rows = [['','']]
        self.readerAssertEqual('"",', rows)

    def test_singlequoted_right_empty(self):
        rows = [['','']]
        self.readerAssertEqual(',""', rows)

    def test_single_quoted_quote(self):
        rows = [['"']]
        self.readerAssertEqual('""""', rows)

    def test_quoted_quotes(self):
        rows = [['""']]
        self.readerAssertEqual('""""""', rows)

    def test_inline_quote(self):
        rows = [['a""b']]
        self.readerAssertEqual('a""b', rows)

    def test_inline_quotes(self):
        rows = [['a"b"c']]
        self.readerAssertEqual('a"b"c', rows)

    def test_quotes_and_more(self):
        # Excel would never write a field containing '"a"b', but when
        # reading one, it will return 'ab'.
        rows = [['ab']]
        self.readerAssertEqual('"a"b', rows)

    def test_lone_quote(self):
        rows = [['a"b']]
        self.readerAssertEqual('a"b', rows)

    def test_quote_and_quote(self):
        # Excel would never write a field containing '"a" "b"', but when
        # reading one, it will return 'a "b"'.
        rows = [['a "b"']]
        self.readerAssertEqual('"a" "b"', rows)

    def test_space_and_quote(self):
        rows = [[' "a"']]
        self.readerAssertEqual(' "a"', rows)

    def test_quoted(self):
        text = '1,2,3,"I think, therefore I am",5,6'
        rows = [['1', '2', '3', 'I think, therefore I am', '5', '6']]
        self.readerAssertEqual(text, rows)

    def test_quoted_quote(self):
        text = '1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"'
        rows = [['1', '2', '3',
                 '"I see," said the blind man',
                 'as he picked up his hammer and saw']]
        self.readerAssertEqual(text, rows)

    def test_quoted_nl(self):
        # Newlines inside quoted fields belong to the field.
        text = '''\
1,2,3,"""I see,""
said the blind man","as he picked up his
hammer and saw"
9,8,7,6'''
        first = ['1', '2', '3',
                 '"I see,"\nsaid the blind man',
                 'as he picked up his\nhammer and saw']
        second = ['9','8','7','6']
        self.readerAssertEqual(text, [first, second])

    def test_dubious_quote(self):
        rows = [['12', '12', '1"', '']]
        self.readerAssertEqual('12,12,1",', rows)

    # ---- writing ----

    def test_null(self):
        # Writing no rows produces no output.
        self.writerAssertEqual([], '')

    def test_single_writer(self):
        rows = [['abc']]
        self.writerAssertEqual(rows, 'abc\r\n')

    def test_simple_writer(self):
        rows = [[1, 2, 'abc', 3, 4]]
        self.writerAssertEqual(rows, '1,2,abc,3,4\r\n')

    def test_quotes(self):
        rows = [[1, 2, 'a"bc"', 3, 4]]
        self.writerAssertEqual(rows, '1,2,"a""bc""",3,4\r\n')

    def test_quote_fieldsep(self):
        rows = [['abc,def']]
        self.writerAssertEqual(rows, '"abc,def"\r\n')

    def test_newlines(self):
        rows = [[1, 2, 'a\nbc', 3, 4]]
        self.writerAssertEqual(rows, '1,2,"a\nbc",3,4\r\n')
562
class EscapedExcel(csv.excel):
    """csv.excel variant with quoting disabled and backslash as escapechar."""
    escapechar = '\\'
    quoting = csv.QUOTE_NONE
566
class TestEscapedExcel(TestCsvBase):
    """Checks the delimiter is backslash-escaped on write and unescaped on read."""
    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        rows = [['abc,def']]
        self.writerAssertEqual(rows, 'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        text = 'abc\\,def\r\n'
        self.readerAssertEqual(text, [['abc,def']])
575
class QuotedEscapedExcel(csv.excel):
    """csv.excel variant with QUOTE_NONNUMERIC quoting and backslash escapechar."""
    escapechar = '\\'
    quoting = csv.QUOTE_NONNUMERIC
579
class TestQuotedEscapedExcel(TestCsvBase):
    """Checks a field containing the delimiter is quoted on write and unescaped on read."""
    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        rows = [['abc,def']]
        self.writerAssertEqual(rows, '"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        text = '"abc\\,def"\r\n'
        self.readerAssertEqual(text, [['abc,def']])
588
class TestDictFields(unittest.TestCase):
    ### "long" means the row is longer than the number of fieldnames
    ### "short" means there are fewer elements in the row than fieldnames
    def _temp_csv(self, contents):
        """Return a temporary "w+b" file holding contents, rewound to 0.

        Closing and deleting the file are registered with addCleanup(), so
        the calling test needs no try/finally of its own.
        """
        fd, name = tempfile.mkstemp()
        # Cleanups run last-in first-out, so registering the unlink first
        # makes sure the file is closed before it is removed.
        self.addCleanup(os.unlink, name)
        fileobj = os.fdopen(fd, "w+b")
        self.addCleanup(fileobj.close)
        fileobj.write(contents)
        fileobj.seek(0)
        return fileobj

    def test_write_simple_dict(self):
        # This test opens its file through io.open() rather than
        # os.fdopen(), so it keeps its own scaffolding.
        fd, name = tempfile.mkstemp()
        fileobj = io.open(fd, 'w+b')
        try:
            writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
            writer.writeheader()
            fileobj.seek(0)
            self.assertEqual(fileobj.readline(), "f1,f2,f3\r\n")
            writer.writerow({"f1": 10, "f3": "abc"})
            fileobj.seek(0)
            fileobj.readline() # header
            self.assertEqual(fileobj.read(), "10,,abc\r\n")
        finally:
            fileobj.close()
            os.unlink(name)

    def test_write_no_fields(self):
        fileobj = StringIO()
        self.assertRaises(TypeError, csv.DictWriter, fileobj)

    def test_read_dict_fields(self):
        fileobj = self._temp_csv("1,2,abc\r\n")
        reader = csv.DictReader(fileobj,
                                fieldnames=["f1", "f2", "f3"])
        self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})

    def test_read_dict_no_fieldnames(self):
        # Without explicit fieldnames the first row supplies them.
        fileobj = self._temp_csv("f1,f2,f3\r\n1,2,abc\r\n")
        reader = csv.DictReader(fileobj)
        self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
        self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})

    # Two test cases to make sure existing ways of implicitly setting
    # fieldnames continue to work.  Both arise from discussion in issue3436.
    def test_read_dict_fieldnames_from_file(self):
        f = self._temp_csv("f1,f2,f3\r\n1,2,abc\r\n")
        reader = csv.DictReader(f, fieldnames=next(csv.reader(f)))
        self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
        self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})

    def test_read_dict_fieldnames_chain(self):
        import itertools
        f = self._temp_csv("f1,f2,f3\r\n1,2,abc\r\n")
        reader = csv.DictReader(f)
        first = next(reader)
        for row in itertools.chain([first], reader):
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'})

    def test_read_long(self):
        # Surplus values are collected in a list under the key None.
        fileobj = self._temp_csv("1,2,abc,4,5,6\r\n")
        reader = csv.DictReader(fileobj,
                                fieldnames=["f1", "f2"])
        self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                        None: ["abc", "4", "5", "6"]})

    def test_read_long_with_rest(self):
        # With restkey given, surplus values are stored under that key.
        fileobj = self._temp_csv("1,2,abc,4,5,6\r\n")
        reader = csv.DictReader(fileobj,
                                fieldnames=["f1", "f2"], restkey="_rest")
        self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                        "_rest": ["abc", "4", "5", "6"]})

    def test_read_long_with_rest_no_fieldnames(self):
        fileobj = self._temp_csv("f1,f2\r\n1,2,abc,4,5,6\r\n")
        reader = csv.DictReader(fileobj, restkey="_rest")
        self.assertEqual(reader.fieldnames, ["f1", "f2"])
        self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                        "_rest": ["abc", "4", "5", "6"]})

    def test_read_short(self):
        # Missing trailing values are filled in with restval.
        fileobj = self._temp_csv("1,2,abc,4,5,6\r\n1,2,abc\r\n")
        reader = csv.DictReader(fileobj,
                                fieldnames="1 2 3 4 5 6".split(),
                                restval="DEFAULT")
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": 'DEFAULT', "5": 'DEFAULT',
                                        "6": 'DEFAULT'})

    def test_read_multi(self):
        sample = [
            '2147483648,43.0e12,17,abc,def\r\n',
            '147483648,43.0e2,17,abc,def\r\n',
            '47483648,43.0,170,abc,def\r\n'
            ]

        reader = csv.DictReader(sample,
                                fieldnames="i1 float i2 s1 s2".split())
        # Only the first row is checked; values come back as strings.
        self.assertEqual(next(reader), {"i1": '2147483648',
                                        "float": '43.0e12',
                                        "i2": '17',
                                        "s1": 'abc',
                                        "s2": 'def'})

    def test_read_with_blanks(self):
        # The blank line between the two data rows yields no dict.
        reader = csv.DictReader(["1,2,abc,4,5,6\r\n","\r\n",
                                 "1,2,abc,4,5,6\r\n"],
                                fieldnames="1 2 3 4 5 6".split())
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})

    def test_read_semi_sep(self):
        reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
                                fieldnames="1 2 3 4 5 6".split(),
                                delimiter=';')
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})
759
760class TestArrayWrites(unittest.TestCase):
761    def test_int_write(self):
762        import array
763        contents = [(20-i) for i in range(20)]
764        a = array.array('i', contents)
765
766        fd, name = tempfile.mkstemp()
767        fileobj = os.fdopen(fd, "w+b")
768        try:
769            writer = csv.writer(fileobj, dialect="excel")
770            writer.writerow(a)
771            expected = ",".join([str(i) for i in a])+"\r\n"
772            fileobj.seek(0)
773            self.assertEqual(fileobj.read(), expected)
774        finally:
775            fileobj.close()
776            os.unlink(name)
777
778    def test_double_write(self):
779        import array
780        contents = [(20-i)*0.1 for i in range(20)]
781        a = array.array('d', contents)
782        fd, name = tempfile.mkstemp()
783        fileobj = os.fdopen(fd, "w+b")
784        try:
785            writer = csv.writer(fileobj, dialect="excel")
786            writer.writerow(a)
787            expected = ",".join([str(i) for i in a])+"\r\n"
788            fileobj.seek(0)
789            self.assertEqual(fileobj.read(), expected)
790        finally:
791            fileobj.close()
792            os.unlink(name)
793
794    def test_float_write(self):
795        import array
796        contents = [(20-i)*0.1 for i in range(20)]
797        a = array.array('f', contents)
798        fd, name = tempfile.mkstemp()
799        fileobj = os.fdopen(fd, "w+b")
800        try:
801            writer = csv.writer(fileobj, dialect="excel")
802            writer.writerow(a)
803            expected = ",".join([str(i) for i in a])+"\r\n"
804            fileobj.seek(0)
805            self.assertEqual(fileobj.read(), expected)
806        finally:
807            fileobj.close()
808            os.unlink(name)
809
810    def test_char_write(self):
811        import array, string
812        a = array.array('c', string.letters)
813        fd, name = tempfile.mkstemp()
814        fileobj = os.fdopen(fd, "w+b")
815        try:
816            writer = csv.writer(fileobj, dialect="excel")
817            writer.writerow(a)
818            expected = ",".join(a)+"\r\n"
819            fileobj.seek(0)
820            self.assertEqual(fileobj.read(), expected)
821        finally:
822            fileobj.close()
823            os.unlink(name)
824
class TestDialectValidity(unittest.TestCase):
    """
    Check that instantiating a csv.Dialect subclass validates its
    attributes and reports bad ones as csv.Error.
    """

    @staticmethod
    def _new_dialect_class():
        # Hand back a brand-new, valid Dialect subclass on every call so
        # each test can overwrite class attributes without affecting others.
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        return mydialect

    def test_quoting(self):
        dialect_cls = self._new_dialect_class()
        # The untouched dialect must instantiate without error.
        dialect_cls()

        dialect_cls.quoting = None
        self.assertRaises(csv.Error, dialect_cls)

        # Switch to a valid QUOTE_ALL configuration ...
        dialect_cls.doublequote = True
        dialect_cls.quoting = csv.QUOTE_ALL
        dialect_cls.quotechar = '"'
        dialect_cls()

        # ... then break only the quote character: first with a
        # two-character string, then with a non-string.
        for bad_quotechar in ("''", 4):
            dialect_cls.quotechar = bad_quotechar
            self.assertRaises(csv.Error, dialect_cls)

    def test_delimiter(self):
        dialect_cls = self._new_dialect_class()
        dialect_cls()

        # A multi-character string and a non-string are both rejected.
        for bad_delimiter in (":::", 4):
            dialect_cls.delimiter = bad_delimiter
            self.assertRaises(csv.Error, dialect_cls)

    def test_lineterminator(self):
        dialect_cls = self._new_dialect_class()
        dialect_cls()

        # Unlike the delimiter, a multi-character terminator is accepted.
        dialect_cls.lineterminator = ":::"
        dialect_cls()

        dialect_cls.lineterminator = 4
        self.assertRaises(csv.Error, dialect_cls)
881
882
class TestSniffer(unittest.TestCase):
    """
    Exercise csv.Sniffer: header detection plus guessing of delimiter,
    quote character, skipinitialspace and doublequote from sample text.
    """

    # Comma separated, unquoted, one space after every delimiter.
    sample1 = """\
Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
Shark City, Glendale Heights, IL, 12/28/02, Prezence
Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
"""
    # Colon separated, single-quoted, embedded quotes doubled.
    sample2 = """\
'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
'Stonecutters ''Seafood'' and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
"""
    # Double-quoted column titles matching the five columns of sample1.
    header = '''\
"venue","city","state","date","performers"
'''
    # Three identical lines in which several characters repeat regularly.
    sample3 = '''\
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
'''

    # Semicolon separated mix of numbers and words.
    sample4 = '''\
2147483648;43.0e12;17;abc;def
147483648;43.0e2;17;abc;def
47483648;43.0;170;abc;def
'''

    sample5 = "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
    sample6 = "a|b|c\r\nd|e|f\r\n"
    sample7 = "'a'|'b'|'c'\r\n'd'|e|f\r\n"

    def test_has_header(self):
        # Bare data has no header; prepending the title row gives it one.
        detector = csv.Sniffer()
        cases = (
            (self.sample1, False),
            (self.header+self.sample1, True),
        )
        for text, verdict in cases:
            self.assertEqual(detector.has_header(text), verdict)

    def test_sniff(self):
        # Columns: sample text, delimiter, quotechar, skipinitialspace.
        detector = csv.Sniffer()
        cases = (
            (self.sample1, ",", '"', True),
            (self.sample2, ":", "'", False),
        )
        for text, delimiter, quotechar, skipspace in cases:
            guessed = detector.sniff(text)
            self.assertEqual(guessed.delimiter, delimiter)
            self.assertEqual(guessed.quotechar, quotechar)
            self.assertEqual(guessed.skipinitialspace, skipspace)

    def test_delimiters(self):
        detector = csv.Sniffer()

        # given that all three lines in sample3 are equal,
        # I think that any character could have been 'guessed' as the
        # delimiter, depending on dictionary order
        guessed = detector.sniff(self.sample3)
        self.assertIn(guessed.delimiter, self.sample3)

        # Restricting the candidate set settles the ambiguity.
        for candidates, winner in (("?,", "?"), ("/,", "/")):
            guessed = detector.sniff(self.sample3, delimiters=candidates)
            self.assertEqual(guessed.delimiter, winner)

        # Unrestricted sniffing of samples with a single plausible delimiter.
        unambiguous = (
            (self.sample4, ";"),
            (self.sample5, "\t"),
            (self.sample6, "|"),
        )
        for text, winner in unambiguous:
            guessed = detector.sniff(text)
            self.assertEqual(guessed.delimiter, winner)

        # Partially quoted input: both delimiter and quote char are found.
        guessed = detector.sniff(self.sample7)
        self.assertEqual(guessed.delimiter, "|")
        self.assertEqual(guessed.quotechar, "'")

    def test_doublequote(self):
        # Only sample2 contains doubled quote characters inside fields.
        detector = csv.Sniffer()
        guessed = detector.sniff(self.header)
        self.assertFalse(guessed.doublequote)
        guessed = detector.sniff(self.sample2)
        self.assertTrue(guessed.doublequote)
959
960if not hasattr(sys, "gettotalrefcount"):
961    if test_support.verbose: print "*** skipping leakage tests ***"
962else:
963    class NUL:
964        def write(s, *args):
965            pass
966        writelines = write
967
968    class TestLeaks(unittest.TestCase):
969        def test_create_read(self):
970            delta = 0
971            lastrc = sys.gettotalrefcount()
972            for i in xrange(20):
973                gc.collect()
974                self.assertEqual(gc.garbage, [])
975                rc = sys.gettotalrefcount()
976                csv.reader(["a,b,c\r\n"])
977                csv.reader(["a,b,c\r\n"])
978                csv.reader(["a,b,c\r\n"])
979                delta = rc-lastrc
980                lastrc = rc
981            # if csv.reader() leaks, last delta should be 3 or more
982            self.assertEqual(delta < 3, True)
983
984        def test_create_write(self):
985            delta = 0
986            lastrc = sys.gettotalrefcount()
987            s = NUL()
988            for i in xrange(20):
989                gc.collect()
990                self.assertEqual(gc.garbage, [])
991                rc = sys.gettotalrefcount()
992                csv.writer(s)
993                csv.writer(s)
994                csv.writer(s)
995                delta = rc-lastrc
996                lastrc = rc
997            # if csv.writer() leaks, last delta should be 3 or more
998            self.assertEqual(delta < 3, True)
999
1000        def test_read(self):
1001            delta = 0
1002            rows = ["a,b,c\r\n"]*5
1003            lastrc = sys.gettotalrefcount()
1004            for i in xrange(20):
1005                gc.collect()
1006                self.assertEqual(gc.garbage, [])
1007                rc = sys.gettotalrefcount()
1008                rdr = csv.reader(rows)
1009                for row in rdr:
1010                    pass
1011                delta = rc-lastrc
1012                lastrc = rc
1013            # if reader leaks during read, delta should be 5 or more
1014            self.assertEqual(delta < 5, True)
1015
1016        def test_write(self):
1017            delta = 0
1018            rows = [[1,2,3]]*5
1019            s = NUL()
1020            lastrc = sys.gettotalrefcount()
1021            for i in xrange(20):
1022                gc.collect()
1023                self.assertEqual(gc.garbage, [])
1024                rc = sys.gettotalrefcount()
1025                writer = csv.writer(s)
1026                for row in rows:
1027                    writer.writerow(row)
1028                delta = rc-lastrc
1029                lastrc = rc
1030            # if writer leaks during write, last delta should be 5 or more
1031            self.assertEqual(delta < 5, True)
1032
1033# commented out for now - csv module doesn't yet support Unicode
1034## class TestUnicode(unittest.TestCase):
1035##     def test_unicode_read(self):
1036##         import codecs
##         f = codecs.EncodedFile(StringIO("Martin von Löwis,"
##                                         "Marc André Lemburg,"
##                                         "Guido van Rossum,"
##                                         "François Pinard\r\n"),
##                                data_encoding='iso-8859-1')
##         reader = csv.reader(f)
##         self.assertEqual(list(reader), [[u"Martin von Löwis",
##                                          u"Marc André Lemburg",
##                                          u"Guido van Rossum",
##                                          u"François Pinard"]])
1047
def test_main():
    """
    Hand every attribute of this module whose name starts with 'Test'
    to test_support.run_unittest().
    """
    this_module = sys.modules[__name__]
    selected = []
    for attr_name in dir(this_module):
        if attr_name.startswith('Test'):
            selected.append(getattr(this_module, attr_name))
    test_support.run_unittest(*selected)
1053
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
1056