# -*- coding: iso-8859-1 -*-
# Copyright (C) 2001,2002 Python Software Foundation
# csv package unit tests

import sys
import os
import unittest
from StringIO import StringIO
import tempfile
import csv
import gc
import io
from test import test_support

class Test_Csv(unittest.TestCase):
    """
    Test the underlying C csv parser in ways that are not appropriate
    from the high level interface. Further tests of this nature are done
    in TestDialectRegistry.
    """
    def _test_arg_valid(self, ctor, arg):
        """Check that *ctor* rejects missing, unknown and badly typed args.

        *ctor* is csv.reader or csv.writer; *arg* is a valid first
        positional argument for it.
        """
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr=0)
        self.assertRaises(TypeError, ctor, arg, delimiter=0)
        self.assertRaises(TypeError, ctor, arg, delimiter='XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)

    def test_reader_arg_valid(self):
        self._test_arg_valid(csv.reader, [])

    def test_writer_arg_valid(self):
        self._test_arg_valid(csv.writer, StringIO())

    def _test_default_attrs(self, ctor, *args):
        """Check the default dialect attributes of ctor(*args)."""
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertEqual(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertEqual(obj.dialect.skipinitialspace, False)
        self.assertEqual(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only)
        self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter')
        self.assertRaises(TypeError, setattr, obj.dialect, 'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)

    def test_reader_attrs(self):
        self._test_default_attrs(csv.reader, [])

    def test_writer_attrs(self):
        self._test_default_attrs(csv.writer, StringIO())

    def _test_kw_attrs(self, ctor, *args):
        """Check that keyword options show up on the resulting dialect."""
        # Now try with alternate options
        kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
                      lineterminator='\r', quotechar='*',
                      quoting=csv.QUOTE_NONE, skipinitialspace=True,
                      strict=True)
        obj = ctor(*args, **kwargs)
        self.assertEqual(obj.dialect.delimiter, ':')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '\\')
        self.assertEqual(obj.dialect.lineterminator, "\r")
        self.assertEqual(obj.dialect.quotechar, '*')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, True)

    def test_reader_kw_attrs(self):
        self._test_kw_attrs(csv.reader, [])

    def test_writer_kw_attrs(self):
        self._test_kw_attrs(csv.writer, StringIO())

    def _test_dialect_attrs(self, ctor, *args):
        """Check that options taken from a dialect class are applied."""
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)

    def test_reader_dialect_attrs(self):
        self._test_dialect_attrs(csv.reader, [])

    def test_writer_dialect_attrs(self):
        self._test_dialect_attrs(csv.writer, StringIO())


    def _write_test(self, fields, expect, **kwargs):
        """Write *fields* as one row and compare the file with *expect*.

        The row is written to a temporary file through
        csv.writer(**kwargs); the file must then contain *expect*
        followed by the writer's line terminator.  The temporary file is
        always closed and removed.
        """
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, **kwargs)
            writer.writerow(fields)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(),
                             expect + writer.dialect.lineterminator)
        finally:
            fileobj.close()
            os.unlink(name)

    def test_write_arg_valid(self):
        self.assertRaises(csv.Error, self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting=csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self.assertRaises(IOError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise IOError
        self.assertRaises(IOError, self._write_test, [BadItem()], '')

    def test_write_bigfield(self):
        # This exercises the buffer realloc functionality
        bigstring = 'X' * 50000
        self._write_test([bigstring, bigstring],
                         '%s,%s' % (bigstring, bigstring))

    def test_write_quoting(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,q'], 'a,1,p,q',
                          quoting=csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         quoting=csv.QUOTE_MINIMAL)
        self._write_test(['a',1,'p,q'], '"a",1,"p,q"',
                         quoting=csv.QUOTE_NONNUMERIC)
        self._write_test(['a',1,'p,q'], '"a","1","p,q"',
                         quoting=csv.QUOTE_ALL)
        self._write_test(['a\nb',1], '"a\nb","1"',
                         quoting=csv.QUOTE_ALL)

    def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote=False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                         doublequote=False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting=csv.QUOTE_NONE)

    def test_writerows(self):
        class BrokenFile:
            def write(self, buf):
                raise IOError
        writer = csv.writer(BrokenFile())
        self.assertRaises(IOError, writer.writerows, [['a']])
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            writer.writerows([['a','b'],['c','d']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
        finally:
            fileobj.close()
            os.unlink(name)

    def _read_test(self, input, expect, **kwargs):
        """Parse *input* with csv.reader(**kwargs); compare rows to *expect*."""
        reader = csv.reader(input, **kwargs)
        result = list(reader)
        self.assertEqual(result, expect)

    def test_read_oddinputs(self):
        self._read_test([], [])
        self._read_test([''], [[]])
        self.assertRaises(csv.Error, self._read_test,
                          ['"ab"c'], None, strict=1)
        # cannot handle null bytes for the moment
        self.assertRaises(csv.Error, self._read_test,
                          ['ab\0c'], None, strict=1)
        self._read_test(['"ab"c'], [['abc']], doublequote=0)

    def test_read_eol(self):
        self._read_test(['a,b'], [['a','b']])
        self._read_test(['a,b\n'], [['a','b']])
        self._read_test(['a,b\r\n'], [['a','b']])
        self._read_test(['a,b\r'], [['a','b']])
        self.assertRaises(csv.Error, self._read_test, ['a,b\rc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\nc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\r\nc,d'], [])

    def test_read_escape(self):
        self._read_test(['a,\\b,c'], [['a', 'b', 'c']], escapechar='\\')
        self._read_test(['a,b\\,c'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,\\c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
        self._read_test(['a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')

    def test_read_quoting(self):
        self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quotechar=None, escapechar='\\')
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quoting=csv.QUOTE_NONE, escapechar='\\')
        # will this fail where locale uses comma for decimals?
        self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
                        quoting=csv.QUOTE_NONNUMERIC)
        self._read_test(['"a\nb", 7'], [['a\nb', ' 7']])
        self.assertRaises(ValueError, self._read_test,
                          ['abc,3'], [[]],
                          quoting=csv.QUOTE_NONNUMERIC)

    def test_read_bigfield(self):
        # This exercises the buffer realloc functionality and field size
        # limits.
        limit = csv.field_size_limit()
        try:
            size = 50000
            bigstring = 'X' * size
            bigline = '%s,%s' % (bigstring, bigstring)
            self._read_test([bigline], [[bigstring, bigstring]])
            csv.field_size_limit(size)
            self._read_test([bigline], [[bigstring, bigstring]])
            self.assertEqual(csv.field_size_limit(), size)
            csv.field_size_limit(size-1)
            self.assertRaises(csv.Error, self._read_test, [bigline], [])
            self.assertRaises(TypeError, csv.field_size_limit, None)
            self.assertRaises(TypeError, csv.field_size_limit, 1, None)
        finally:
            # Restore the module-wide limit for the other tests.
            csv.field_size_limit(limit)

    def test_read_linenum(self):
        for r in (csv.reader(['line,1', 'line,2', 'line,3']),
                  csv.DictReader(['line,1', 'line,2', 'line,3'],
                                 fieldnames=['a', 'b', 'c'])):
            self.assertEqual(r.line_num, 0)
            next(r)
            self.assertEqual(r.line_num, 1)
            next(r)
            self.assertEqual(r.line_num, 2)
            next(r)
            self.assertEqual(r.line_num, 3)
            self.assertRaises(StopIteration, next, r)
            self.assertEqual(r.line_num, 3)

    def test_roundtrip_quoteed_newlines(self):
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            rows = [['a\nb','b'],['c','x\r\nd']]
            writer.writerows(rows)
            fileobj.seek(0)
            for i, row in enumerate(csv.reader(fileobj)):
                self.assertEqual(row, rows[i])
        finally:
            fileobj.close()
            os.unlink(name)

class TestDialectRegistry(unittest.TestCase):
    """Tests for registering, looking up and applying csv dialects."""

    def test_registry_badargs(self):
        self.assertRaises(TypeError, csv.list_dialects, None)
        self.assertRaises(TypeError, csv.get_dialect)
        self.assertRaises(csv.Error, csv.get_dialect, None)
        self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.unregister_dialect)
        self.assertRaises(csv.Error, csv.unregister_dialect, None)
        self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.register_dialect, None)
        self.assertRaises(TypeError, csv.register_dialect, None, None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          badargument=None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          quoting=None)
        self.assertRaises(TypeError, csv.register_dialect, [])

    def test_registry(self):
        class myexceltsv(csv.excel):
            delimiter = "\t"
        name = "myexceltsv"
        expected_dialects = csv.list_dialects() + [name]
        expected_dialects.sort()
        csv.register_dialect(name, myexceltsv)
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, '\t')
        got_dialects = sorted(csv.list_dialects())
        self.assertEqual(expected_dialects, got_dialects)

    def test_register_kwargs(self):
        name = 'fedcba'
        csv.register_dialect(name, delimiter=';')
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, ';')
        self.assertEqual([['X', 'Y', 'Z']], list(csv.reader(['X;Y;Z'], name)))

    def test_incomplete_dialect(self):
        class myexceltsv(csv.Dialect):
            delimiter = "\t"
        self.assertRaises(csv.Error, myexceltsv)

    def test_space_dialect(self):
        class space(csv.excel):
            delimiter = " "
            quoting = csv.QUOTE_NONE
            escapechar = "\\"

        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("abc def\nc1ccccc1 benzene\n")
            fileobj.seek(0)
            rdr = csv.reader(fileobj, dialect=space())
            self.assertEqual(next(rdr), ["abc", "def"])
            self.assertEqual(next(rdr), ["c1ccccc1", "benzene"])
        finally:
            fileobj.close()
            os.unlink(name)

    def test_dialect_apply(self):
        class testA(csv.excel):
            delimiter = "\t"
        class testB(csv.excel):
            delimiter = ":"
        class testC(csv.excel):
            delimiter = "|"

        csv.register_dialect('testC', testC)
        try:
            # No dialect given: the default delimiter is used.
            fd, name = tempfile.mkstemp()
            fileobj = os.fdopen(fd, "w+b")
            try:
                writer = csv.writer(fileobj)
                writer.writerow([1,2,3])
                fileobj.seek(0)
                self.assertEqual(fileobj.read(), "1,2,3\r\n")
            finally:
                fileobj.close()
                os.unlink(name)

            # Dialect passed positionally as a class.
            fd, name = tempfile.mkstemp()
            fileobj = os.fdopen(fd, "w+b")
            try:
                writer = csv.writer(fileobj, testA)
                writer.writerow([1,2,3])
                fileobj.seek(0)
                self.assertEqual(fileobj.read(), "1\t2\t3\r\n")
            finally:
                fileobj.close()
                os.unlink(name)

            # Dialect passed by keyword as an instance.
            fd, name = tempfile.mkstemp()
            fileobj = os.fdopen(fd, "w+b")
            try:
                writer = csv.writer(fileobj, dialect=testB())
                writer.writerow([1,2,3])
                fileobj.seek(0)
                self.assertEqual(fileobj.read(), "1:2:3\r\n")
            finally:
                fileobj.close()
                os.unlink(name)

            # Dialect passed by registered name.
            fd, name = tempfile.mkstemp()
            fileobj = os.fdopen(fd, "w+b")
            try:
                writer = csv.writer(fileobj, dialect='testC')
                writer.writerow([1,2,3])
                fileobj.seek(0)
                self.assertEqual(fileobj.read(), "1|2|3\r\n")
            finally:
                fileobj.close()
                os.unlink(name)

            # An explicit keyword overrides the dialect's attribute.
            fd, name = tempfile.mkstemp()
            fileobj = os.fdopen(fd, "w+b")
            try:
                writer = csv.writer(fileobj, dialect=testA, delimiter=';')
                writer.writerow([1,2,3])
                fileobj.seek(0)
                self.assertEqual(fileobj.read(), "1;2;3\r\n")
            finally:
                fileobj.close()
                os.unlink(name)

        finally:
            csv.unregister_dialect('testC')

    def test_bad_dialect(self):
        # Unknown parameter
        self.assertRaises(TypeError, csv.reader, [], bad_attr=0)
        # Bad values
        self.assertRaises(TypeError, csv.reader, [], delimiter=None)
        self.assertRaises(TypeError, csv.reader, [], quoting=-1)
        self.assertRaises(TypeError, csv.reader, [], quoting=100)

class TestCsvBase(unittest.TestCase):
    """Shared helpers that run data through a temp file using self.dialect.

    Subclasses are expected to define a ``dialect`` class attribute.
    """

    def readerAssertEqual(self, input, expected_result):
        """Write *input* to a temp file, read it back with csv.reader and
        compare the list of rows with *expected_result*."""
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write(input)
            fileobj.seek(0)
            reader = csv.reader(fileobj, dialect=self.dialect)
            fields = list(reader)
            self.assertEqual(fields, expected_result)
        finally:
            fileobj.close()
            os.unlink(name)

    def writerAssertEqual(self, input, expected_result):
        """Write the rows in *input* with csv.writer to a temp file and
        compare the file contents with *expected_result*."""
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, dialect=self.dialect)
            writer.writerows(input)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected_result)
        finally:
            fileobj.close()
            os.unlink(name)

class TestDialectExcel(TestCsvBase):
    """Reader and writer behaviour of the registered 'excel' dialect."""
    dialect = 'excel'

    def test_single(self):
        self.readerAssertEqual('abc', [['abc']])

    def test_simple(self):
        self.readerAssertEqual('1,2,3,4,5', [['1','2','3','4','5']])

    def test_blankline(self):
        self.readerAssertEqual('', [])

    def test_empty_fields(self):
        self.readerAssertEqual(',', [['', '']])

    def test_singlequoted(self):
        self.readerAssertEqual('""', [['']])

    def test_singlequoted_left_empty(self):
        self.readerAssertEqual('"",', [['','']])

    def test_singlequoted_right_empty(self):
        self.readerAssertEqual(',""', [['','']])

    def test_single_quoted_quote(self):
        self.readerAssertEqual('""""', [['"']])

    def test_quoted_quotes(self):
        self.readerAssertEqual('""""""', [['""']])

    def test_inline_quote(self):
        self.readerAssertEqual('a""b', [['a""b']])

    def test_inline_quotes(self):
        self.readerAssertEqual('a"b"c', [['a"b"c']])

    def test_quotes_and_more(self):
        # Excel would never write a field containing '"a"b', but when
        # reading one, it will return 'ab'.
        self.readerAssertEqual('"a"b', [['ab']])

    def test_lone_quote(self):
        self.readerAssertEqual('a"b', [['a"b']])

    def test_quote_and_quote(self):
        # Excel would never write a field containing '"a" "b"', but when
        # reading one, it will return 'a "b"'.
        self.readerAssertEqual('"a" "b"', [['a "b"']])

    def test_space_and_quote(self):
        self.readerAssertEqual(' "a"', [[' "a"']])

    def test_quoted(self):
        self.readerAssertEqual('1,2,3,"I think, therefore I am",5,6',
                               [['1', '2', '3',
                                 'I think, therefore I am',
                                 '5', '6']])

    def test_quoted_quote(self):
        self.readerAssertEqual('1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"',
                               [['1', '2', '3',
                                 '"I see," said the blind man',
                                 'as he picked up his hammer and saw']])

    def test_quoted_nl(self):
        input = '''\
1,2,3,"""I see,""
said the blind man","as he picked up his
hammer and saw"
9,8,7,6'''
        self.readerAssertEqual(input,
                               [['1', '2', '3',
                                 '"I see,"\nsaid the blind man',
                                 'as he picked up his\nhammer and saw'],
                                ['9','8','7','6']])

    def test_dubious_quote(self):
        self.readerAssertEqual('12,12,1",', [['12', '12', '1"', '']])

    def test_null(self):
        self.writerAssertEqual([], '')

    def test_single_writer(self):
        self.writerAssertEqual([['abc']], 'abc\r\n')

    def test_simple_writer(self):
        self.writerAssertEqual([[1, 2, 'abc', 3, 4]], '1,2,abc,3,4\r\n')

    def test_quotes(self):
        self.writerAssertEqual([[1, 2, 'a"bc"', 3, 4]], '1,2,"a""bc""",3,4\r\n')

    def test_quote_fieldsep(self):
        self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')

    def test_newlines(self):
        self.writerAssertEqual([[1, 2, 'a\nbc', 3, 4]], '1,2,"a\nbc",3,4\r\n')

# excel dialect variant: no quoting, backslash as escape character.
class EscapedExcel(csv.excel):
    quoting = csv.QUOTE_NONE
    escapechar = '\\'

class TestEscapedExcel(TestCsvBase):
    """Round-trip tests for the EscapedExcel dialect."""
    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        self.writerAssertEqual([['abc,def']], 'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        self.readerAssertEqual('abc\\,def\r\n', [['abc,def']])

# excel dialect variant: QUOTE_NONNUMERIC quoting plus a backslash escape.
class QuotedEscapedExcel(csv.excel):
    quoting = csv.QUOTE_NONNUMERIC
    escapechar = '\\'

class TestQuotedEscapedExcel(TestCsvBase):
    """Round-trip tests for the QuotedEscapedExcel dialect."""
    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        self.readerAssertEqual('"abc\\,def"\r\n', [['abc,def']])

class TestDictFields(unittest.TestCase):
    """Tests for csv.DictReader and csv.DictWriter."""
    ### "long"  means the row is longer than the number of fieldnames
    ### "short" means there are fewer elements in the row than fieldnames
    def test_write_simple_dict(self):
        fd, name = tempfile.mkstemp()
        # NOTE: unlike the other tests, this one opens the descriptor
        # with io.open rather than os.fdopen.
        fileobj = io.open(fd, 'w+b')
        try:
            writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
            writer.writeheader()
            fileobj.seek(0)
            self.assertEqual(fileobj.readline(), "f1,f2,f3\r\n")
            writer.writerow({"f1": 10, "f3": "abc"})
            fileobj.seek(0)
            fileobj.readline() # header
            self.assertEqual(fileobj.read(), "10,,abc\r\n")
        finally:
            fileobj.close()
            os.unlink(name)

    def test_write_no_fields(self):
        fileobj = StringIO()
        self.assertRaises(TypeError, csv.DictWriter, fileobj)

    def test_read_dict_fields(self):
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2", "f3"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_dict_no_fieldnames(self):
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj)
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            fileobj.close()
            os.unlink(name)

    # Two test cases to make sure existing ways of implicitly setting
    # fieldnames continue to work.  Both arise from discussion in issue3436.
    def test_read_dict_fieldnames_from_file(self):
        fd, name = tempfile.mkstemp()
        f = os.fdopen(fd, "w+b")
        try:
            f.write("f1,f2,f3\r\n1,2,abc\r\n")
            f.seek(0)
            reader = csv.DictReader(f, fieldnames=next(csv.reader(f)))
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            f.close()
            os.unlink(name)

    def test_read_dict_fieldnames_chain(self):
        import itertools
        fd, name = tempfile.mkstemp()
        f = os.fdopen(fd, "w+b")
        try:
            f.write("f1,f2,f3\r\n1,2,abc\r\n")
            f.seek(0)
            reader = csv.DictReader(f)
            first = next(reader)
            for row in itertools.chain([first], reader):
                self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
                self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            f.close()
            os.unlink(name)

    def test_read_long(self):
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            None: ["abc", "4", "5", "6"]})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_long_with_rest(self):
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"], restkey="_rest")
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_long_with_rest_no_fieldnames(self):
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj, restkey="_rest")
            self.assertEqual(reader.fieldnames, ["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_short(self):
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames="1 2 3 4 5 6".split(),
                                    restval="DEFAULT")
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": '4', "5": '5', "6": '6'})
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": 'DEFAULT', "5": 'DEFAULT',
                                            "6": 'DEFAULT'})
        finally:
            fileobj.close()
            os.unlink(name)

    def test_read_multi(self):
        sample = [
            '2147483648,43.0e12,17,abc,def\r\n',
            '147483648,43.0e2,17,abc,def\r\n',
            '47483648,43.0,170,abc,def\r\n'
            ]

        reader = csv.DictReader(sample,
                                fieldnames="i1 float i2 s1 s2".split())
        self.assertEqual(next(reader), {"i1": '2147483648',
                                        "float": '43.0e12',
                                        "i2": '17',
                                        "s1": 'abc',
                                        "s2": 'def'})

    def test_read_with_blanks(self):
        reader = csv.DictReader(["1,2,abc,4,5,6\r\n","\r\n",
                                 "1,2,abc,4,5,6\r\n"],
                                fieldnames="1 2 3 4 5 6".split())
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})

    def test_read_semi_sep(self):
        reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
                                fieldnames="1 2 3 4 5 6".split(),
                                delimiter=';')
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})

class TestArrayWrites(unittest.TestCase):
    """Check that csv.writer.writerow accepts array.array rows."""

    def test_int_write(self):
        import array
        contents = [(20-i) for i in range(20)]
        a = array.array('i', contents)

        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(a)
            expected = ",".join([str(i) for i in a])+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
        finally:
            fileobj.close()
            os.unlink(name)

    def test_double_write(self):
        import array
        contents = [(20-i)*0.1 for i in range(20)]
        a = array.array('d', contents)
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(a)
            expected = ",".join([str(i) for i in a])+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
        finally:
            fileobj.close()
            os.unlink(name)

    def test_float_write(self):
        import array
        contents = [(20-i)*0.1 for i in range(20)]
        a = array.array('f', contents)
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(a)
            expected = ",".join([str(i) for i in a])+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
        finally:
            fileobj.close()
            os.unlink(name)

    def test_char_write(self):
        import array
        import string
        a = array.array('c', string.letters)
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(a)
            expected = ",".join(a)+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
        finally:
            fileobj.close()
            os.unlink(name)

class TestDialectValidity(unittest.TestCase):
    """Check the validation performed when a csv.Dialect is instantiated."""

    def test_quoting(self):
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        # Instantiating must succeed for the valid settings above.
        d = mydialect()

        mydialect.quoting = None
        self.assertRaises(csv.Error, mydialect)

        mydialect.doublequote = True
        mydialect.quoting = csv.QUOTE_ALL
        mydialect.quotechar = '"'
        d = mydialect()

        mydialect.quotechar = "''"
        self.assertRaises(csv.Error, mydialect)

        mydialect.quotechar = 4
        self.assertRaises(csv.Error, mydialect)

    def test_delimiter(self):
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        d = mydialect()

        mydialect.delimiter = ":::"
        self.assertRaises(csv.Error, mydialect)

        mydialect.delimiter = 4
        self.assertRaises(csv.Error, mydialect)

    def test_lineterminator(self):
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        d = mydialect()

        # A multi-character line terminator is accepted.
        mydialect.lineterminator = ":::"
        d = mydialect()

        mydialect.lineterminator = 4
        self.assertRaises(csv.Error, mydialect)


class TestSniffer(unittest.TestCase):
    """Tests for csv.Sniffer dialect and header detection."""
    sample1 = """\
Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
Shark City, Glendale Heights, IL, 12/28/02, Prezence
Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
"""
    sample2 = """\
'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
'Stonecutters ''Seafood'' and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
"""
    header = '''\
"venue","city","state","date","performers"
'''
    sample3 = '''\
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
'''

    sample4 = '''\
2147483648;43.0e12;17;abc;def
147483648;43.0e2;17;abc;def
47483648;43.0;170;abc;def
'''

    sample5 = "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
    sample6 = "a|b|c\r\nd|e|f\r\n"
    sample7 = "'a'|'b'|'c'\r\n'd'|e|f\r\n"

    def test_has_header(self):
        sniffer = csv.Sniffer()
        self.assertEqual(sniffer.has_header(self.sample1), False)
        self.assertEqual(sniffer.has_header(self.header+self.sample1), True)

    def test_sniff(self):
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(self.sample1)
        self.assertEqual(dialect.delimiter, ",")
        self.assertEqual(dialect.quotechar, '"')
        self.assertEqual(dialect.skipinitialspace, True)

        dialect = sniffer.sniff(self.sample2)
        self.assertEqual(dialect.delimiter, ":")
        self.assertEqual(dialect.quotechar, "'")
        self.assertEqual(dialect.skipinitialspace, False)

    def test_delimiters(self):
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(self.sample3)
        # given that all three lines in sample3 are equal,
        # I think that any character could have been 'guessed' as the
        # delimiter, depending on dictionary order
        self.assertIn(dialect.delimiter, self.sample3)
        dialect = sniffer.sniff(self.sample3, delimiters="?,")
        self.assertEqual(dialect.delimiter, "?")
        dialect = sniffer.sniff(self.sample3, delimiters="/,")
        self.assertEqual(dialect.delimiter, "/")
        dialect = sniffer.sniff(self.sample4)
        self.assertEqual(dialect.delimiter, ";")
        dialect = sniffer.sniff(self.sample5)
        self.assertEqual(dialect.delimiter, "\t")
        dialect = sniffer.sniff(self.sample6)
        self.assertEqual(dialect.delimiter, "|")
        dialect = sniffer.sniff(self.sample7)
        self.assertEqual(dialect.delimiter, "|")
        self.assertEqual(dialect.quotechar, "'")

    def test_doublequote(self):
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(self.header)
        self.assertFalse(dialect.doublequote)
        dialect = sniffer.sniff(self.sample2)
        self.assertTrue(dialect.doublequote)

# The leak tests rely on sys.gettotalrefcount(), which is not present in
# every interpreter build; they are only defined when it is available.
if not hasattr(sys, "gettotalrefcount"):
    if test_support.verbose:
        # Parenthesised so the line parses on both Python 2 and 3; the
        # output on Python 2 is unchanged.
        print("*** skipping leakage tests ***")
else:
    class NUL:
        """File-like sink whose write methods discard their arguments."""
        def write(s, *args):
            pass
        writelines = write

    class TestLeaks(unittest.TestCase):
        """Watch the total reference count for growth across iterations."""

        def test_create_read(self):
            delta = 0
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                csv.reader(["a,b,c\r\n"])
                csv.reader(["a,b,c\r\n"])
                csv.reader(["a,b,c\r\n"])
                delta = rc-lastrc
                lastrc = rc
            # if csv.reader() leaks, last delta should be 3 or more
            self.assertLess(delta, 3)

        def test_create_write(self):
            delta = 0
            lastrc = sys.gettotalrefcount()
            s = NUL()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                csv.writer(s)
                csv.writer(s)
                csv.writer(s)
                delta = rc-lastrc
                lastrc = rc
            # if csv.writer() leaks, last delta should be 3 or more
            self.assertLess(delta, 3)

        def test_read(self):
            delta = 0
            rows = ["a,b,c\r\n"]*5
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                rdr = csv.reader(rows)
                for row in rdr:
                    pass
                delta = rc-lastrc
                lastrc = rc
            # if reader leaks during read, delta should be 5 or more
            self.assertLess(delta, 5)

        def test_write(self):
            delta = 0
            rows = [[1,2,3]]*5
            s = NUL()
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                writer = csv.writer(s)
                for row in rows:
                    writer.writerow(row)
                delta = rc-lastrc
                lastrc = rc
            # if writer leaks during write, last delta should be 5 or more
            self.assertLess(delta, 5)

# commented out for now - csv module doesn't yet support Unicode
## class TestUnicode(unittest.TestCase):
##     def test_unicode_read(self):
##         import codecs
##         f = codecs.EncodedFile(StringIO("Martin von L\xf6wis,"
##                                         "Marc Andr\xe9 Lemburg,"
##                                         "Guido van Rossum,"
##                                         "Fran\xe7ois Pinard\r\n"),
##                                data_encoding='iso-8859-1')
##         reader = csv.reader(f)
##         self.assertEqual(list(reader), [[u"Martin von L\xf6wis",
##                                          u"Marc Andr\xe9 Lemburg",
##                                          u"Guido van Rossum",
##                                          u"Fran\xe7ois Pinard"]])

def test_main():
    """Run every module-level object whose name starts with 'Test'."""
    mod = sys.modules[__name__]
    test_support.run_unittest(
        *[getattr(mod, name) for name in dir(mod) if name.startswith('Test')]
    )

if __name__ == '__main__':
    test_main()