utils_unittest.py revision 6631273af8b88842cbd6202cc4615daf050cc957
#!/usr/bin/python

import os, unittest, StringIO, socket, urllib2, shutil, subprocess

import common
from autotest_lib.client.common_lib import utils, autotemp
from autotest_lib.client.common_lib.test_utils import mock


class test_read_one_line(unittest.TestCase):
    """Tests for utils.read_one_line, run with utils.open stubbed out.

    NOTE(review): the ip/subnet tests below do not touch read_one_line or
    the stubbed open(); they are kept in this class so that existing test
    IDs do not change.
    """

    def setUp(self):
        self.god = mock.mock_god()
        self.god.stub_function(utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def test_ip_to_long(self):
        self.assertEqual(utils.ip_to_long('0.0.0.0'), 0)
        self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295)
        self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521)
        self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320)


    def test_long_to_ip(self):
        self.assertEqual(utils.long_to_ip(0), '0.0.0.0')
        self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255')
        self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1')
        self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8')


    def test_create_subnet_mask(self):
        self.assertEqual(utils.create_subnet_mask(0), 0)
        self.assertEqual(utils.create_subnet_mask(32), 4294967295)
        self.assertEqual(utils.create_subnet_mask(25), 4294967168)


    def test_format_ip_with_mask(self):
        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0),
                         '0.0.0.0/0')
        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32),
                         '192.168.0.1/32')
        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26),
                         '192.168.0.0/26')
        self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26),
                         '192.168.0.192/26')


    def create_test_file(self, contents):
        """Expect utils.open("filename", "r"); hand back `contents`.

        The stubbed open() returns a StringIO wrapping `contents`.
        """
        test_file = StringIO.StringIO(contents)
        utils.open.expect_call("filename", "r").and_return(test_file)


    def test_reads_one_line_file(self):
        self.create_test_file("abc\n")
        self.assertEqual("abc", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_strips_read_lines(self):
        self.create_test_file("abc \n")
        self.assertEqual("abc ", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_drops_extra_lines(self):
        self.create_test_file("line 1\nline 2\nline 3\n")
        self.assertEqual("line 1", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_works_on_empty_file(self):
        self.create_test_file("")
        self.assertEqual("", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_works_on_file_with_no_newlines(self):
        self.create_test_file("line but no newline")
        self.assertEqual("line but no newline",
                         utils.read_one_line("filename"))
        self.god.check_playback()


    def test_preserves_leading_whitespace(self):
        self.create_test_file(" has leading whitespace")
        self.assertEqual(" has leading whitespace",
                         utils.read_one_line("filename"))
        # Verify the expected open() call like every sibling test does.
        self.god.check_playback()


class test_write_one_line(unittest.TestCase):
    """Tests for utils.write_one_line, run with utils.open stubbed out."""

    def setUp(self):
        self.god = mock.mock_god()
        self.god.stub_function(utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def get_write_one_line_output(self, content):
        """Call utils.write_one_line("filename", content).

        Returns the `final_data` recorded by the mock file object that the
        stubbed open("filename", "w") hands out.
        """
        test_file = mock.SaveDataAfterCloseStringIO()
        utils.open.expect_call("filename", "w").and_return(test_file)
        utils.write_one_line("filename", content)
        self.god.check_playback()
        return test_file.final_data


    def test_writes_one_line_file(self):
        self.assertEqual("abc\n", self.get_write_one_line_output("abc"))


    def test_preserves_existing_newline(self):
        self.assertEqual("abc\n", self.get_write_one_line_output("abc\n"))


    def test_preserves_leading_whitespace(self):
        self.assertEqual(" abc\n", self.get_write_one_line_output(" abc"))


    def test_preserves_trailing_whitespace(self):
        self.assertEqual("abc \n", self.get_write_one_line_output("abc "))


    def test_handles_empty_input(self):
        self.assertEqual("\n", self.get_write_one_line_output(""))


class test_open_write_close(unittest.TestCase):
    """Tests for utils.open_write_close, run with utils.open stubbed out."""

    def setUp(self):
        self.god = mock.mock_god()
        self.god.stub_function(utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def test_simple_functionality(self):
        data = "\n\nwhee\n"
        test_file = mock.SaveDataAfterCloseStringIO()
        utils.open.expect_call("filename", "w").and_return(test_file)
        utils.open_write_close("filename", data)
        self.god.check_playback()
        self.assertEqual(data, test_file.final_data)


class test_read_keyval(unittest.TestCase):
    """Tests for utils.read_keyval.

    utils.open, os.path.isdir and os.path.exists are all stubbed, so no
    real files are touched.
    """

    def setUp(self):
        self.god = mock.mock_god()
        self.god.stub_function(utils, "open")
        self.god.stub_function(os.path, "isdir")
        self.god.stub_function(os.path, "exists")


    def tearDown(self):
        self.god.unstub_all()


    def create_test_file(self, filename, contents):
        """Expect an exists() check and an open() of `filename`.

        exists() reports True and open() returns a StringIO wrapping
        `contents`.
        """
        test_file = StringIO.StringIO(contents)
        os.path.exists.expect_call(filename).and_return(True)
        utils.open.expect_call(filename).and_return(test_file)


    def read_keyval(self, contents):
        """Return utils.read_keyval("file") for a fake file of `contents`.

        "file" is reported as a non-directory; playback is checked before
        the parsed dictionary is returned.
        """
        os.path.isdir.expect_call("file").and_return(False)
        self.create_test_file("file", contents)
        keyval = utils.read_keyval("file")
        self.god.check_playback()
        return keyval


    def test_returns_empty_when_file_doesnt_exist(self):
        os.path.isdir.expect_call("file").and_return(False)
        os.path.exists.expect_call("file").and_return(False)
        self.assertEqual({}, utils.read_keyval("file"))
        self.god.check_playback()


    def test_accesses_files_directly(self):
        os.path.isdir.expect_call("file").and_return(False)
        self.create_test_file("file", "")
        utils.read_keyval("file")
        self.god.check_playback()


    def test_accesses_directories_through_keyval_file(self):
        os.path.isdir.expect_call("dir").and_return(True)
        self.create_test_file("dir/keyval", "")
        utils.read_keyval("dir")
        self.god.check_playback()


    def test_values_are_rstripped(self):
        keyval = self.read_keyval("a=b \n")
        self.assertEqual(keyval, {"a": "b"})


    def test_comments_are_ignored(self):
        keyval = self.read_keyval("a=b # a comment\n")
        self.assertEqual(keyval, {"a": "b"})


    def test_integers_become_ints(self):
        keyval = self.read_keyval("a=1\n")
        self.assertEqual(keyval, {"a": 1})
        self.assertEqual(int, type(keyval["a"]))


    def test_float_values_become_floats(self):
        keyval = self.read_keyval("a=1.5\n")
        self.assertEqual(keyval, {"a": 1.5})
        self.assertEqual(float, type(keyval["a"]))


    def test_multiple_lines(self):
        keyval = self.read_keyval("a=one\nb=two\n")
        self.assertEqual(keyval, {"a": "one", "b": "two"})


    def test_the_last_duplicate_line_is_used(self):
        keyval = self.read_keyval("a=one\nb=two\na=three\n")
        self.assertEqual(keyval, {"a": "three", "b": "two"})


    def test_extra_equals_are_included_in_values(self):
        keyval = self.read_keyval("a=b=c\n")
        self.assertEqual(keyval, {"a": "b=c"})


    def test_non_alphanumeric_keynames_are_rejected(self):
        self.assertRaises(ValueError, self.read_keyval, "a$=one\n")


    def test_underscores_are_allowed_in_key_names(self):
        keyval = self.read_keyval("a_b=value\n")
        self.assertEqual(keyval, {"a_b": "value"})


    def test_dashes_are_allowed_in_key_names(self):
        keyval = self.read_keyval("a-b=value\n")
        self.assertEqual(keyval, {"a-b": "value"})


class test_write_keyval(unittest.TestCase):
    """Tests for utils.write_keyval.

    utils.open and os.path.isdir are stubbed, so no real files are
    touched.
    """

    def setUp(self):
        self.god = mock.mock_god()
        self.god.stub_function(utils, "open")
        self.god.stub_function(os.path, "isdir")


    def tearDown(self):
        self.god.unstub_all()


    def assertHasLines(self, value, lines):
        """Assert that `value` splits into exactly `lines`, in any order."""
        vlines = value.splitlines()
        vlines.sort()
        self.assertEqual(vlines, sorted(lines))


    def write_keyval(self, filename, dictionary, expected_filename=None,
                     type_tag=None):
        """Run utils.write_keyval and return the text written.

        `expected_filename` is the path the stubbed open() expects to see
        (in append mode); it defaults to `filename`.  `type_tag` is only
        passed through when it is not None.
        """
        if expected_filename is None:
            expected_filename = filename
        test_file = StringIO.StringIO()
        # close() is stubbed on the StringIO itself; presumably so that
        # getvalue() still works once write_keyval has closed the file.
        self.god.stub_function(test_file, "close")
        utils.open.expect_call(expected_filename, "a").and_return(test_file)
        test_file.close.expect_call()
        if type_tag is None:
            utils.write_keyval(filename, dictionary)
        else:
            utils.write_keyval(filename, dictionary, type_tag)
        return test_file.getvalue()


    def write_keyval_file(self, dictionary, type_tag=None):
        """write_keyval() against "file", reported as a non-directory."""
        os.path.isdir.expect_call("file").and_return(False)
        return self.write_keyval("file", dictionary, type_tag=type_tag)


    def test_accesses_files_directly(self):
        os.path.isdir.expect_call("file").and_return(False)
        result = self.write_keyval("file", {"a": "1"})
        self.assertEqual(result, "a=1\n")


    def test_accesses_directories_through_keyval_file(self):
        os.path.isdir.expect_call("dir").and_return(True)
        result = self.write_keyval("dir", {"b": "2"}, "dir/keyval")
        self.assertEqual(result, "b=2\n")


    def test_numbers_are_stringified(self):
        result = self.write_keyval_file({"c": 3})
        self.assertEqual(result, "c=3\n")


    def test_type_tags_are_excluded_by_default(self):
        result = self.write_keyval_file({"d": "a string"})
        self.assertEqual(result, "d=a string\n")
        self.assertRaises(ValueError, self.write_keyval_file,
                          {"d{perf}": "a string"})


    def test_perf_tags_are_allowed(self):
        result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2},
                                        type_tag="perf")
        self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"])
        self.assertRaises(ValueError, self.write_keyval_file,
                          {"a": 1, "b": 2}, type_tag="perf")


    def test_non_alphanumeric_keynames_are_rejected(self):
        self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0})


    def test_underscores_are_allowed_in_key_names(self):
        result = self.write_keyval_file({"a_b": "value"})
        self.assertEqual(result, "a_b=value\n")


    def test_dashes_are_allowed_in_key_names(self):
        result = self.write_keyval_file({"a-b": "value"})
        self.assertEqual(result, "a-b=value\n")


class test_is_url(unittest.TestCase):
    """Tests for utils.is_url."""

    def test_accepts_http(self):
        self.assertTrue(utils.is_url("http://example.com"))


    def test_accepts_ftp(self):
        self.assertTrue(utils.is_url("ftp://ftp.example.com"))


    def test_rejects_local_path(self):
        self.assertFalse(utils.is_url("/home/username/file"))


    def test_rejects_local_filename(self):
        self.assertFalse(utils.is_url("filename"))


    def test_rejects_relative_local_path(self):
        self.assertFalse(utils.is_url("somedir/somesubdir/file"))


    def test_rejects_local_path_containing_url(self):
        self.assertFalse(utils.is_url("somedir/http://path/file"))


class test_urlopen(unittest.TestCase):
    """Tests for utils.urlopen, with urllib2.urlopen replaced by a fake."""

    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def stub_urlopen_with_timeout_comparison(self, test_func, expected_return,
                                             *expected_args):
        """Replace urllib2.urlopen with a fake that inspects the timeout.

        The fake asserts it was called with `expected_args` (padded with
        None up to (url, data)), passes the current socket default timeout
        to `test_func`, and returns `expected_return`.
        """
        expected_args += (None,) * (2 - len(expected_args))
        def urlopen(url, data=None):
            self.assertEqual(expected_args, (url, data))
            test_func(socket.getdefaulttimeout())
            return expected_return
        self.god.stub_with(urllib2, "urlopen", urlopen)


    def stub_urlopen_with_timeout_check(self, expected_timeout,
                                        expected_return, *expected_args):
        """Like the comparison stub, requiring timeout == expected_timeout."""
        def test_func(timeout):
            self.assertEqual(timeout, expected_timeout)
        self.stub_urlopen_with_timeout_comparison(test_func, expected_return,
                                                  *expected_args)


    def test_timeout_set_during_call(self):
        self.stub_urlopen_with_timeout_check(30, "retval", "url")
        retval = utils.urlopen("url", timeout=30)
        self.assertEqual(retval, "retval")


    def test_timeout_reset_after_call(self):
        old_timeout = socket.getdefaulttimeout()
        self.stub_urlopen_with_timeout_check(30, None, "url")
        try:
            socket.setdefaulttimeout(1234)
            utils.urlopen("url", timeout=30)
            self.assertEqual(1234, socket.getdefaulttimeout())
        finally:
            # Put the process-wide default back whatever the outcome.
            socket.setdefaulttimeout(old_timeout)


    def test_timeout_set_by_default(self):
        def test_func(timeout):
            self.assertTrue(timeout is not None)
        self.stub_urlopen_with_timeout_comparison(test_func, None, "url")
        utils.urlopen("url")


    def test_args_are_untouched(self):
        self.stub_urlopen_with_timeout_check(30, None, "http://url",
                                             "POST data")
        utils.urlopen("http://url", timeout=30, data="POST data")


class test_urlretrieve(unittest.TestCase):
    """Tests for utils.urlretrieve, with its collaborators stubbed out."""

    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_urlopen_passed_arguments(self):
        self.god.stub_function(utils, "urlopen")
        self.god.stub_function(utils.shutil, "copyfileobj")
        self.god.stub_function(utils, "open")

        url = "url"
        dest = "somefile"
        data = object()
        timeout = 10

        src_file = self.god.create_mock_class(file, "file")
        dest_file = self.god.create_mock_class(file, "file")

        (utils.urlopen.expect_call(url, data=data, timeout=timeout)
         .and_return(src_file))
        utils.open.expect_call(dest, "wb").and_return(dest_file)
        utils.shutil.copyfileobj.expect_call(src_file, dest_file)
        dest_file.close.expect_call()
        src_file.close.expect_call()

        utils.urlretrieve(url, dest, data=data, timeout=timeout)
        self.god.check_playback()


class test_merge_trees(unittest.TestCase):
    """Tests for utils.merge_trees, run against two real temp directories."""

    # some path-handling helper functions
    def src(self, *path_segments):
        """Join `path_segments` onto the source temp directory."""
        return os.path.join(self.src_tree.name, *path_segments)


    def dest(self, *path_segments):
        """Join `path_segments` onto the destination temp directory."""
        return os.path.join(self.dest_tree.name, *path_segments)


    def paths(self, *path_segments):
        """Return the (src, dest) pair of paths for `path_segments`."""
        return self.src(*path_segments), self.dest(*path_segments)


    def _write_line(self, path, line):
        """Create or truncate `path` so it holds `line` plus a newline.

        The file is closed explicitly so the data is on disk before
        merge_trees looks at it, without relying on garbage collection.
        """
        out = open(path, "w")
        try:
            out.write(line + "\n")
        finally:
            out.close()


    def _read_file(self, path):
        """Return the full contents of `path`, closing the file afterwards."""
        infile = open(path)
        try:
            return infile.read()
        finally:
            infile.close()


    def assertFileEqual(self, *path_segments):
        """Assert src and dest both hold a regular file with equal content."""
        src, dest = self.paths(*path_segments)
        self.assertTrue(os.path.isfile(src))
        self.assertTrue(os.path.isfile(dest))
        self.assertEqual(os.path.getsize(src), os.path.getsize(dest))
        self.assertEqual(self._read_file(src), self._read_file(dest))


    def assertFileContents(self, contents, *path_segments):
        """Assert the dest file is a regular file holding `contents`."""
        dest = self.dest(*path_segments)
        self.assertTrue(os.path.isfile(dest))
        self.assertEqual(os.path.getsize(dest), len(contents))
        self.assertEqual(contents, self._read_file(dest))


    def setUp(self):
        self.src_tree = autotemp.tempdir(unique_id='utilsrc')
        self.dest_tree = autotemp.tempdir(unique_id='utilsdest')

        # empty subdirs
        os.mkdir(self.src("empty"))
        os.mkdir(self.dest("empty"))


    def tearDown(self):
        self.src_tree.clean()
        self.dest_tree.clean()


    def test_both_dont_exist(self):
        utils.merge_trees(*self.paths("empty"))


    def test_file_only_at_src(self):
        self._write_line(self.src("src_only"), "line 1")
        utils.merge_trees(*self.paths("src_only"))
        self.assertFileEqual("src_only")


    def test_file_only_at_dest(self):
        self._write_line(self.dest("dest_only"), "line 1")
        utils.merge_trees(*self.paths("dest_only"))
        self.assertFalse(os.path.exists(self.src("dest_only")))
        self.assertFileContents("line 1\n", "dest_only")


    def test_file_at_both(self):
        self._write_line(self.dest("in_both"), "line 1")
        self._write_line(self.src("in_both"), "line 2")
        utils.merge_trees(*self.paths("in_both"))
        self.assertFileContents("line 1\nline 2\n", "in_both")


    def test_directory_with_files_in_both(self):
        self._write_line(self.dest("in_both"), "line 1")
        self._write_line(self.src("in_both"), "line 3")
        utils.merge_trees(*self.paths())
        self.assertFileContents("line 1\nline 3\n", "in_both")


    def test_directory_with_mix_of_files(self):
        self._write_line(self.dest("in_dest"), "dest line")
        self._write_line(self.src("in_src"), "src line")
        utils.merge_trees(*self.paths())
        self.assertFileContents("dest line\n", "in_dest")
        self.assertFileContents("src line\n", "in_src")


    def test_directory_with_subdirectories(self):
        os.mkdir(self.src("src_subdir"))
        self._write_line(self.src("src_subdir", "subfile"), "subdir line")
        os.mkdir(self.src("both_subdir"))
        os.mkdir(self.dest("both_subdir"))
        self._write_line(self.src("both_subdir", "subfile"), "src line")
        self._write_line(self.dest("both_subdir", "subfile"), "dest line")
        utils.merge_trees(*self.paths())
        self.assertFileContents("subdir line\n", "src_subdir", "subfile")
        self.assertFileContents("dest line\nsrc line\n", "both_subdir",
                                "subfile")


class test_get_relative_path(unittest.TestCase):
    """Tests for utils.get_relative_path."""

    def test_not_absolute(self):
        self.assertRaises(AssertionError, utils.get_relative_path, "a", "b")

    def test_same_dir(self):
        self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c")

    def test_forward_dir(self):
        self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d")

    def test_previous_dir(self):
        self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..")

    def test_parallel_dir(self):
        self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"),
                         "../../../c/d")


class test_sh_escape(unittest.TestCase):
    """Tests for utils.sh_escape, round-tripping text through a real shell."""

    def _test_in_shell(self, text):
        """Assert that `echo "<escaped text>"` prints `text` back.

        The escaped text is placed inside double quotes in a shell command;
        the command must exit 0 and its output, minus the final character
        (echo's trailing newline), must equal `text`.
        """
        escaped_text = utils.sh_escape(text)
        # Close both devnull handles explicitly instead of leaking them.
        devnull_in = open(os.devnull, 'r')
        try:
            devnull_out = open(os.devnull, 'w')
            try:
                proc = subprocess.Popen('echo "%s"' % escaped_text,
                                        shell=True,
                                        stdin=devnull_in,
                                        stdout=subprocess.PIPE,
                                        stderr=devnull_out)
                stdout, _ = proc.communicate()
            finally:
                devnull_out.close()
        finally:
            devnull_in.close()
        self.assertEqual(proc.returncode, 0)
        self.assertEqual(stdout[:-1], text)


    def test_normal_string(self):
        self._test_in_shell('abcd')


    def test_spaced_string(self):
        self._test_in_shell('abcd efgh')


    def test_dollar(self):
        self._test_in_shell('$')


    def test_single_quote(self):
        self._test_in_shell('\'')


    def test_single_quoted_string(self):
        self._test_in_shell('\'efgh\'')


    def test_double_quote(self):
        self._test_in_shell('"')


    def test_double_quoted_string(self):
        self._test_in_shell('"abcd"')


    def test_backtick(self):
        self._test_in_shell('`')


    def test_backticked_string(self):
        self._test_in_shell('`jklm`')


    def test_backslash(self):
        self._test_in_shell('\\')


    def test_backslashed_special_characters(self):
        self._test_in_shell('\\$')
        self._test_in_shell('\\"')
        self._test_in_shell('\\\'')
        self._test_in_shell('\\`')


    def test_backslash_codes(self):
        self._test_in_shell('\\n')
        self._test_in_shell('\\r')
        self._test_in_shell('\\t')
        self._test_in_shell('\\v')
        self._test_in_shell('\\b')
        self._test_in_shell('\\a')
        self._test_in_shell('\\000')


if __name__ == "__main__":
    unittest.main()