utils_unittest.py revision 0248035d3d970e9d32334610aeba11f853390858
#!/usr/bin/python

import StringIO
import errno
import logging
import os
import select
import shutil
import socket
import subprocess
import time
import unittest
import urllib2

import common
from autotest_lib.client.common_lib import autotemp
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.test_utils import mock

metrics = utils.metrics_mock


class test_read_one_line(unittest.TestCase):
    """Tests for utils.read_one_line() and the IP/subnet helper functions.

    utils.open is stubbed so that no real file is touched; each test queues
    the single open() call it expects via create_test_file().
    """

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def test_ip_to_long(self):
        self.assertEqual(utils.ip_to_long('0.0.0.0'), 0)
        self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295)
        self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521)
        self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320)


    def test_long_to_ip(self):
        self.assertEqual(utils.long_to_ip(0), '0.0.0.0')
        self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255')
        self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1')
        self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8')


    def test_create_subnet_mask(self):
        self.assertEqual(utils.create_subnet_mask(0), 0)
        self.assertEqual(utils.create_subnet_mask(32), 4294967295)
        self.assertEqual(utils.create_subnet_mask(25), 4294967168)


    def test_format_ip_with_mask(self):
        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0),
                         '0.0.0.0/0')
        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32),
                         '192.168.0.1/32')
        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26),
                         '192.168.0.0/26')
        self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26),
                         '192.168.0.192/26')


    def create_test_file(self, contents):
        """Queue an expected utils.open("filename", "r") returning contents.

        @param contents: String the fake file object will yield when read.
        """
        test_file = StringIO.StringIO(contents)
        utils.open.expect_call("filename", "r").and_return(test_file)


    def test_reads_one_line_file(self):
        self.create_test_file("abc\n")
        self.assertEqual("abc", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_strips_read_lines(self):
        self.create_test_file("abc \n")
        self.assertEqual("abc ", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_drops_extra_lines(self):
        self.create_test_file("line 1\nline 2\nline 3\n")
        self.assertEqual("line 1", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_works_on_empty_file(self):
        self.create_test_file("")
        self.assertEqual("", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_works_on_file_with_no_newlines(self):
        self.create_test_file("line but no newline")
        self.assertEqual("line but no newline",
                         utils.read_one_line("filename"))
        self.god.check_playback()


    def test_preserves_leading_whitespace(self):
        self.create_test_file(" has leading whitespace")
        self.assertEqual(" has leading whitespace",
                         utils.read_one_line("filename"))
        # Verify the queued open() call was consumed, like every sibling test.
        self.god.check_playback()


class test_write_one_line(unittest.TestCase):
    """Tests for utils.write_one_line() with utils.open stubbed out."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def get_write_one_line_output(self, content):
        """Run utils.write_one_line() and return what was written.

        @param content: String passed to utils.write_one_line().
        @return The data held by the fake file after it was closed.
        """
        test_file = mock.SaveDataAfterCloseStringIO()
        utils.open.expect_call("filename", "w").and_return(test_file)
        utils.write_one_line("filename", content)
        self.god.check_playback()
        return test_file.final_data


    def test_writes_one_line_file(self):
        self.assertEqual("abc\n", self.get_write_one_line_output("abc"))


    def test_preserves_existing_newline(self):
        self.assertEqual("abc\n", self.get_write_one_line_output("abc\n"))


    def test_preserves_leading_whitespace(self):
        self.assertEqual(" abc\n", self.get_write_one_line_output(" abc"))


    def test_preserves_trailing_whitespace(self):
        self.assertEqual("abc \n", self.get_write_one_line_output("abc "))


    def test_handles_empty_input(self):
        self.assertEqual("\n", self.get_write_one_line_output(""))


class test_open_write_close(unittest.TestCase):
    """Tests for utils.open_write_close() with utils.open stubbed out."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def test_simple_functionality(self):
        data = "\n\nwhee\n"
        test_file = mock.SaveDataAfterCloseStringIO()
        utils.open.expect_call("filename", "w").and_return(test_file)
        utils.open_write_close("filename", data)
        self.god.check_playback()
        self.assertEqual(data, test_file.final_data)


class test_read_keyval(unittest.TestCase):
    """Tests for utils.read_keyval().

    utils.open, os.path.isdir and os.path.exists are stubbed so the tests
    control exactly which filesystem queries are answered and how.
    """

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")
        self.god.stub_function(os.path, "isdir")
        self.god.stub_function(os.path, "exists")


    def tearDown(self):
        self.god.unstub_all()


    def create_test_file(self, filename, contents):
        """Queue the exists() and open() calls expected for filename.

        @param filename: Path the code under test is expected to open.
        @param contents: String the fake file object will yield when read.
        """
        test_file = StringIO.StringIO(contents)
        os.path.exists.expect_call(filename).and_return(True)
        utils.open.expect_call(filename).and_return(test_file)


    def read_keyval(self, contents):
        """Parse contents through utils.read_keyval("file").

        @param contents: Text of the fake keyval file.
        @return The dictionary returned by utils.read_keyval().
        """
        os.path.isdir.expect_call("file").and_return(False)
        self.create_test_file("file", contents)
        keyval = utils.read_keyval("file")
        self.god.check_playback()
        return keyval


    def test_returns_empty_when_file_doesnt_exist(self):
        os.path.isdir.expect_call("file").and_return(False)
        os.path.exists.expect_call("file").and_return(False)
        self.assertEqual({}, utils.read_keyval("file"))
        self.god.check_playback()


    def test_accesses_files_directly(self):
        os.path.isdir.expect_call("file").and_return(False)
        self.create_test_file("file", "")
        utils.read_keyval("file")
        self.god.check_playback()


    def test_accesses_directories_through_keyval_file(self):
        os.path.isdir.expect_call("dir").and_return(True)
        self.create_test_file("dir/keyval", "")
        utils.read_keyval("dir")
        self.god.check_playback()


    def test_values_are_rstripped(self):
        keyval = self.read_keyval("a=b \n")
        self.assertEqual(keyval, {"a": "b"})


    def test_comments_are_ignored(self):
        keyval = self.read_keyval("a=b # a comment\n")
        self.assertEqual(keyval, {"a": "b"})


    def test_integers_become_ints(self):
        keyval = self.read_keyval("a=1\n")
        self.assertEqual(keyval, {"a": 1})
        self.assertEqual(int, type(keyval["a"]))


    def test_float_values_become_floats(self):
        keyval = self.read_keyval("a=1.5\n")
        self.assertEqual(keyval, {"a": 1.5})
        self.assertEqual(float, type(keyval["a"]))


    def test_multiple_lines(self):
        keyval = self.read_keyval("a=one\nb=two\n")
        self.assertEqual(keyval, {"a": "one", "b": "two"})


    def test_the_last_duplicate_line_is_used(self):
        keyval = self.read_keyval("a=one\nb=two\na=three\n")
        self.assertEqual(keyval, {"a": "three", "b": "two"})


    def test_extra_equals_are_included_in_values(self):
        keyval = self.read_keyval("a=b=c\n")
        self.assertEqual(keyval, {"a": "b=c"})


    def test_non_alphanumeric_keynames_are_rejected(self):
        self.assertRaises(ValueError, self.read_keyval, "a$=one\n")


    def test_underscores_are_allowed_in_key_names(self):
        keyval = self.read_keyval("a_b=value\n")
        self.assertEqual(keyval, {"a_b": "value"})


    def test_dashes_are_allowed_in_key_names(self):
        keyval = self.read_keyval("a-b=value\n")
        self.assertEqual(keyval, {"a-b": "value"})

    def test_empty_value_is_allowed(self):
        keyval = self.read_keyval("a=\n")
        self.assertEqual(keyval, {"a": ""})


class test_write_keyval(unittest.TestCase):
    """Tests for utils.write_keyval() with open() and isdir() stubbed out."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")
        self.god.stub_function(os.path, "isdir")


    def tearDown(self):
        self.god.unstub_all()


    def assertHasLines(self, value, lines):
        """Assert that value consists of exactly lines, in any order.

        @param value: Multi-line string produced by the code under test.
        @param lines: Iterable of expected lines (without newlines).
        """
        self.assertEqual(sorted(value.splitlines()), sorted(lines))


    def write_keyval(self, filename, dictionary, expected_filename=None,
                     type_tag=None):
        """Run utils.write_keyval() against a fake file and return its text.

        @param filename: Path passed to utils.write_keyval().
        @param dictionary: Keyvals passed to utils.write_keyval().
        @param expected_filename: Path open() is expected to be called with;
                defaults to filename.
        @param type_tag: Optional type tag forwarded to utils.write_keyval();
                omitted from the call entirely when None.
        @return Everything written to the fake file.
        """
        if expected_filename is None:
            expected_filename = filename
        test_file = StringIO.StringIO()
        # close() is stubbed so getvalue() still works after the write.
        self.god.stub_function(test_file, "close")
        utils.open.expect_call(expected_filename, "a").and_return(test_file)
        test_file.close.expect_call()
        if type_tag is None:
            utils.write_keyval(filename, dictionary)
        else:
            utils.write_keyval(filename, dictionary, type_tag)
        return test_file.getvalue()


    def write_keyval_file(self, dictionary, type_tag=None):
        """Shortcut for write_keyval() targeting the plain file "file"."""
        os.path.isdir.expect_call("file").and_return(False)
        return self.write_keyval("file", dictionary, type_tag=type_tag)


    def test_accesses_files_directly(self):
        os.path.isdir.expect_call("file").and_return(False)
        result = self.write_keyval("file", {"a": "1"})
        self.assertEqual(result, "a=1\n")


    def test_accesses_directories_through_keyval_file(self):
        os.path.isdir.expect_call("dir").and_return(True)
        result = self.write_keyval("dir", {"b": "2"}, "dir/keyval")
        self.assertEqual(result, "b=2\n")


    def test_numbers_are_stringified(self):
        result = self.write_keyval_file({"c": 3})
        self.assertEqual(result, "c=3\n")


    def test_type_tags_are_excluded_by_default(self):
        result = self.write_keyval_file({"d": "a string"})
        self.assertEqual(result, "d=a string\n")
        self.assertRaises(ValueError, self.write_keyval_file,
                          {"d{perf}": "a string"})


    def test_perf_tags_are_allowed(self):
        result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2},
                                        type_tag="perf")
        self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"])
        self.assertRaises(ValueError, self.write_keyval_file,
                          {"a": 1, "b": 2}, type_tag="perf")


    def test_non_alphanumeric_keynames_are_rejected(self):
        self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0})


    def test_underscores_are_allowed_in_key_names(self):
        result = self.write_keyval_file({"a_b": "value"})
        self.assertEqual(result, "a_b=value\n")


    def test_dashes_are_allowed_in_key_names(self):
        result = self.write_keyval_file({"a-b": "value"})
        self.assertEqual(result, "a-b=value\n")


class test_is_url(unittest.TestCase):
    """Tests for utils.is_url()."""

    def test_accepts_http(self):
        self.assertTrue(utils.is_url("http://example.com"))


    def test_accepts_ftp(self):
        self.assertTrue(utils.is_url("ftp://ftp.example.com"))


    def test_rejects_local_path(self):
        self.assertFalse(utils.is_url("/home/username/file"))


    def test_rejects_local_filename(self):
        self.assertFalse(utils.is_url("filename"))


    def test_rejects_relative_local_path(self):
        self.assertFalse(utils.is_url("somedir/somesubdir/file"))


    def test_rejects_local_path_containing_url(self):
        self.assertFalse(utils.is_url("somedir/http://path/file"))


class test_urlopen(unittest.TestCase):
    """Tests for utils.urlopen() with urllib2.urlopen replaced by a fake."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)


    def tearDown(self):
        self.god.unstub_all()


    def stub_urlopen_with_timeout_comparison(self, test_func, expected_return,
                                             *expected_args):
        """Replace urllib2.urlopen with a fake that inspects the timeout.

        @param test_func: Callable invoked with socket.getdefaulttimeout()
                as observed while the fake urlopen is running.
        @param expected_return: Value the fake urlopen returns.
        @param expected_args: Expected (url, data) arguments; missing
                trailing entries are padded with None.
        """
        expected_args += (None,) * (2 - len(expected_args))
        def urlopen(url, data=None):
            self.assertEqual(expected_args, (url, data))
            test_func(socket.getdefaulttimeout())
            return expected_return
        self.god.stub_with(urllib2, "urlopen", urlopen)


    def stub_urlopen_with_timeout_check(self, expected_timeout,
                                        expected_return, *expected_args):
        """Stub urlopen and assert the default timeout seen during the call.

        @param expected_timeout: Timeout expected while urlopen runs.
        @param expected_return: Value the fake urlopen returns.
        @param expected_args: Expected (url, data) arguments.
        """
        def test_func(timeout):
            self.assertEqual(timeout, expected_timeout)
        self.stub_urlopen_with_timeout_comparison(test_func, expected_return,
                                                  *expected_args)


    def test_timeout_set_during_call(self):
        self.stub_urlopen_with_timeout_check(30, "retval", "url")
        retval = utils.urlopen("url", timeout=30)
        self.assertEqual(retval, "retval")


    def test_timeout_reset_after_call(self):
        old_timeout = socket.getdefaulttimeout()
        self.stub_urlopen_with_timeout_check(30, None, "url")
        try:
            socket.setdefaulttimeout(1234)
            utils.urlopen("url", timeout=30)
            self.assertEqual(1234, socket.getdefaulttimeout())
        finally:
            socket.setdefaulttimeout(old_timeout)


    def test_timeout_set_by_default(self):
        def test_func(timeout):
            self.assertTrue(timeout is not None)
        self.stub_urlopen_with_timeout_comparison(test_func, None, "url")
        utils.urlopen("url")


    def test_args_are_untouched(self):
        self.stub_urlopen_with_timeout_check(30, None, "http://url",
                                             "POST data")
        utils.urlopen("http://url", timeout=30, data="POST data")


class test_urlretrieve(unittest.TestCase):
    """Tests for utils.urlretrieve() with its collaborators mocked out."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)


    def tearDown(self):
        self.god.unstub_all()


    def test_urlopen_passed_arguments(self):
        self.god.stub_function(utils, "urlopen")
        self.god.stub_function(utils.shutil, "copyfileobj")
        self.god.stub_function(utils, "open")

        url = "url"
        dest = "somefile"
        data = object()
        timeout = 10

        src_file = self.god.create_mock_class(file, "file")
        dest_file = self.god.create_mock_class(file, "file")

        (utils.urlopen.expect_call(url, data=data, timeout=timeout)
                .and_return(src_file))
        utils.open.expect_call(dest, "wb").and_return(dest_file)
        utils.shutil.copyfileobj.expect_call(src_file, dest_file)
        dest_file.close.expect_call()
        src_file.close.expect_call()

        utils.urlretrieve(url, dest, data=data, timeout=timeout)
        self.god.check_playback()


class test_merge_trees(unittest.TestCase):
    """Tests for utils.merge_trees() using real temporary directories."""

    # some path-handling helper functions
    def src(self, *path_segments):
        """Return a path below the temporary source tree."""
        return os.path.join(self.src_tree.name, *path_segments)


    def dest(self, *path_segments):
        """Return a path below the temporary destination tree."""
        return os.path.join(self.dest_tree.name, *path_segments)


    def paths(self, *path_segments):
        """Return the (source, destination) pair for the same relative path."""
        return self.src(*path_segments), self.dest(*path_segments)


    def _write_line(self, path, line):
        """Create (or truncate) path so that it holds line plus a newline.

        The file is closed before returning so the data is on disk by the
        time merge_trees() reads it.
        """
        with open(path, "w") as f:
            f.write(line + "\n")


    def _read_file(self, path):
        """Return the full contents of path, closing the file afterwards."""
        with open(path) as f:
            return f.read()


    def assertFileEqual(self, *path_segments):
        """Assert the file exists in both trees with identical contents."""
        src, dest = self.paths(*path_segments)
        self.assertEqual(True, os.path.isfile(src))
        self.assertEqual(True, os.path.isfile(dest))
        self.assertEqual(os.path.getsize(src), os.path.getsize(dest))
        self.assertEqual(self._read_file(src), self._read_file(dest))


    def assertFileContents(self, contents, *path_segments):
        """Assert the destination file exists and holds exactly contents."""
        dest = self.dest(*path_segments)
        self.assertEqual(True, os.path.isfile(dest))
        self.assertEqual(os.path.getsize(dest), len(contents))
        self.assertEqual(contents, self._read_file(dest))


    def setUp(self):
        self.src_tree = autotemp.tempdir(unique_id='utilsrc')
        self.dest_tree = autotemp.tempdir(unique_id='utilsdest')

        # empty subdirs
        os.mkdir(self.src("empty"))
        os.mkdir(self.dest("empty"))


    def tearDown(self):
        self.src_tree.clean()
        self.dest_tree.clean()


    def test_both_dont_exist(self):
        utils.merge_trees(*self.paths("empty"))


    def test_file_only_at_src(self):
        self._write_line(self.src("src_only"), "line 1")
        utils.merge_trees(*self.paths("src_only"))
        self.assertFileEqual("src_only")


    def test_file_only_at_dest(self):
        self._write_line(self.dest("dest_only"), "line 1")
        utils.merge_trees(*self.paths("dest_only"))
        self.assertEqual(False, os.path.exists(self.src("dest_only")))
        self.assertFileContents("line 1\n", "dest_only")


    def test_file_at_both(self):
        self._write_line(self.dest("in_both"), "line 1")
        self._write_line(self.src("in_both"), "line 2")
        utils.merge_trees(*self.paths("in_both"))
        self.assertFileContents("line 1\nline 2\n", "in_both")


    def test_directory_with_files_in_both(self):
        self._write_line(self.dest("in_both"), "line 1")
        self._write_line(self.src("in_both"), "line 3")
        utils.merge_trees(*self.paths())
        self.assertFileContents("line 1\nline 3\n", "in_both")


    def test_directory_with_mix_of_files(self):
        self._write_line(self.dest("in_dest"), "dest line")
        self._write_line(self.src("in_src"), "src line")
        utils.merge_trees(*self.paths())
        self.assertFileContents("dest line\n", "in_dest")
        self.assertFileContents("src line\n", "in_src")


    def test_directory_with_subdirectories(self):
        os.mkdir(self.src("src_subdir"))
        self._write_line(self.src("src_subdir", "subfile"), "subdir line")
        os.mkdir(self.src("both_subdir"))
        os.mkdir(self.dest("both_subdir"))
        self._write_line(self.src("both_subdir", "subfile"), "src line")
        self._write_line(self.dest("both_subdir", "subfile"), "dest line")
        utils.merge_trees(*self.paths())
        self.assertFileContents("subdir line\n", "src_subdir", "subfile")
        self.assertFileContents("dest line\nsrc line\n", "both_subdir",
                                "subfile")


class test_get_relative_path(unittest.TestCase):
    """Tests for utils.get_relative_path()."""

    def test_not_absolute(self):
        self.assertRaises(AssertionError, utils.get_relative_path, "a", "b")

    def test_same_dir(self):
        self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c")

    def test_forward_dir(self):
        self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d")

    def test_previous_dir(self):
        self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..")

    def test_parallel_dir(self):
        self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"),
                         "../../../c/d")


class test_sh_escape(unittest.TestCase):
    """Tests for utils.sh_escape(), verified by round-tripping via a shell."""

    def _test_in_shell(self, text):
        """Echo the escaped text through a shell and expect text back.

        @param text: Raw string that must survive escaping plus shell
                evaluation inside double quotes.
        """
        escaped_text = utils.sh_escape(text)
        # Open /dev/null through context managers so the handles are closed
        # as soon as the child process has finished.
        with open(os.devnull, 'r') as devnull_in, \
                open(os.devnull, 'w') as devnull_out:
            proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True,
                                    stdin=devnull_in,
                                    stdout=subprocess.PIPE,
                                    stderr=devnull_out)
            stdout, _ = proc.communicate()
        self.assertEqual(proc.returncode, 0)
        # Drop the trailing newline added by echo.
        self.assertEqual(stdout[:-1], text)


    def test_normal_string(self):
        self._test_in_shell('abcd')


    def test_spaced_string(self):
        self._test_in_shell('abcd efgh')


    def test_dollar(self):
        self._test_in_shell('$')


    def test_single_quote(self):
        self._test_in_shell('\'')


    def test_single_quoted_string(self):
        self._test_in_shell('\'efgh\'')


    def test_string_with_single_quote(self):
        self._test_in_shell("a'b")


    def test_string_with_escaped_single_quote(self):
        self._test_in_shell(r"a\'b")


    def test_double_quote(self):
        self._test_in_shell('"')


    def test_double_quoted_string(self):
        self._test_in_shell('"abcd"')


    def test_backtick(self):
        self._test_in_shell('`')


    def test_backticked_string(self):
        self._test_in_shell('`jklm`')


    def test_backslash(self):
        self._test_in_shell('\\')


    def test_backslashed_special_characters(self):
        self._test_in_shell('\\$')
        self._test_in_shell('\\"')
        self._test_in_shell('\\\'')
        self._test_in_shell('\\`')


    def test_backslash_codes(self):
        self._test_in_shell('\\n')
        self._test_in_shell('\\r')
        self._test_in_shell('\\t')
        self._test_in_shell('\\v')
        self._test_in_shell('\\b')
        self._test_in_shell('\\a')
        self._test_in_shell('\\000')

    def test_real_newline(self):
        self._test_in_shell('\n')
        self._test_in_shell('\\\n')


class test_sh_quote_word(test_sh_escape):
    """Run tests on sh_quote_word.

    Inherit from test_sh_escape to get the same tests to run on both.
    """

    def _test_in_shell(self, text):
        quoted_word = utils.sh_quote_word(text)
        echoed_value = subprocess.check_output('echo %s' % quoted_word,
                                               shell=True)
        self.assertEqual(echoed_value, text + '\n')


class test_nested_sh_quote_word(test_sh_quote_word):
    """Run nested tests on sh_quote_word.

    Inherit from test_sh_quote_word to get the same tests to run on both.
    """

    def _test_in_shell(self, text):
        command = 'echo ' + utils.sh_quote_word(text)
        nested_command = 'echo ' + utils.sh_quote_word(command)
        produced_command = subprocess.check_output(nested_command, shell=True)
        echoed_value = subprocess.check_output(produced_command, shell=True)
        self.assertEqual(echoed_value, text + '\n')


class test_run(unittest.TestCase):
    """
    Test the utils.run() function.

    Note: This test runs simple external commands to test the utils.run()
    API without assuming implementation details.
    """
    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils.logging, 'warning')
        self.god.stub_function(utils.logging, 'debug')


    def tearDown(self):
        self.god.unstub_all()


    def __check_result(self, result, command, exit_status=0, stdout='',
                       stderr=''):
        """Assert every field of a command result object.

        @param result: Result object returned (or raised) by utils.run().
        @param command: Expected command string.
        @param exit_status: Expected exit status.
        @param stdout: Expected captured standard output.
        @param stderr: Expected captured standard error.
        """
        self.assertEqual(result.command, command)
        self.assertEqual(result.exit_status, exit_status)
        self.assertEqual(result.stdout, stdout)
        self.assertEqual(result.stderr, stderr)


    def test_default_simple(self):
        cmd = 'echo "hello world"'
        # expect some kind of logging.debug() call but don't care about args
        utils.logging.debug.expect_any_call()
        self.__check_result(utils.run(cmd), cmd, stdout='hello world\n')


    def test_default_failure(self):
        cmd = 'exit 11'
        try:
            utils.run(cmd, verbose=False)
        except utils.error.CmdError as err:
            self.__check_result(err.result_obj, cmd, exit_status=11)
        else:
            # Without this the test would pass when no error is raised.
            self.fail('utils.run(%r) did not raise CmdError' % cmd)


    def test_ignore_status(self):
        cmd = 'echo error >&2 && exit 11'
        self.__check_result(utils.run(cmd, ignore_status=True, verbose=False),
                            cmd, exit_status=11, stderr='error\n')


    def test_timeout(self):
        # we expect a logging.warning() message, don't care about the contents
        utils.logging.warning.expect_any_call()
        cmd = 'echo -n output && sleep 10'
        try:
            utils.run(cmd, timeout=1, verbose=False)
        except utils.error.CmdError as err:
            self.assertEqual(err.result_obj.stdout, 'output')
        else:
            # Without this the test would pass when the timeout is ignored.
            self.fail('utils.run(%r) did not raise CmdError' % cmd)


    def test_stdout_stderr_tee(self):
        cmd = 'echo output && echo error >&2'
        stdout_tee = StringIO.StringIO()
        stderr_tee = StringIO.StringIO()

        self.__check_result(utils.run(
                cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee,
                verbose=False), cmd, stdout='output\n', stderr='error\n')
        self.assertEqual(stdout_tee.getvalue(), 'output\n')
        self.assertEqual(stderr_tee.getvalue(), 'error\n')


    def test_stdin_string(self):
        cmd = 'cat'
        self.__check_result(utils.run(cmd, verbose=False, stdin='hi!\n'),
                            cmd, stdout='hi!\n')


    def test_safe_args(self):
        # NOTE: The string in expected_quoted_cmd depends on the internal
        # implementation of shell quoting which is used by utils.run(),
        # in this case, sh_quote_word().
        expected_quoted_cmd = "echo 'hello \"world' again"
        self.__check_result(utils.run(
                'echo', verbose=False, args=('hello "world', 'again')),
                expected_quoted_cmd, stdout='hello "world again\n')


    def test_safe_args_given_string(self):
        self.assertRaises(TypeError, utils.run, 'echo', args='hello')


    def test_wait_interrupt(self):
        """Test that we actually select twice if the first one returns EINTR."""
        utils.logging.debug.expect_any_call()

        bg_job = utils.BgJob('echo "hello world"')
        bg_job.result.exit_status = 0
        self.god.stub_function(utils.select, 'select')

        utils.select.select.expect_any_call().and_raises(
                select.error(errno.EINTR, 'Select interrupted'))
        utils.logging.warning.expect_any_call()

        utils.select.select.expect_any_call().and_return(
                ([bg_job.sp.stdout, bg_job.sp.stderr], [], None))
        utils.logging.warning.expect_any_call()

        self.assertFalse(
                utils._wait_for_commands([bg_job], time.time(), None))


class test_compare_versions(unittest.TestCase):
    """Tests for utils.compare_versions()."""

    def test_zerofill(self):
        self.assertEqual(utils.compare_versions('1.7', '1.10'), -1)
        self.assertEqual(utils.compare_versions('1.222', '1.3'), 1)
        self.assertEqual(utils.compare_versions('1.03', '1.3'), 0)


    def test_unequal_len(self):
        self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1)
        self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1)


    def test_dash_delimited(self):
        self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1)
        self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1)
        self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0)


    def test_alphabets(self):
        self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1)
        self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1)
        self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0)


    def test_mix_symbols(self):
        self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1)
        self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1)
        self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0)

        self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1)
        self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1)
        self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0)


class test_args_to_dict(unittest.TestCase):
    """Tests for utils.args_to_dict()."""

    def test_no_args(self):
        result = utils.args_to_dict([])
        self.assertEqual({}, result)


    def test_matches(self):
        result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:',
                                     'F__o0O=', 'B8r:=:=', '_bAZ_=:=:'])
        self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'',
                                  'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'})


    def test_unmatches(self):
        # Temporarily shut warning messages from args_to_dict() when an argument
        # doesn't match its pattern.
        logger = logging.getLogger()
        saved_level = logger.level
        logger.setLevel(logging.ERROR)

        try:
            result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b',
                                         ':VAL', '=VVV', 'WORD'])
            self.assertEqual({}, result)
        finally:
            # Restore level.
            logger.setLevel(saved_level)


class test_get_random_port(unittest.TestCase):
    """Tests for utils.get_unused_port()."""

    def do_bind(self, port, socket_type, socket_proto):
        """Bind a new IPv4 socket to port on all interfaces.

        @param port: Port number to bind to.
        @param socket_type: Socket type, e.g. socket.SOCK_STREAM.
        @param socket_proto: Socket protocol, e.g. socket.IPPROTO_TCP.
        @return The bound socket; the caller is responsible for closing it.
        """
        s = socket.socket(socket.AF_INET, socket_type, socket_proto)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(('', port))
        except socket.error:
            # Do not leak the descriptor when the bind itself fails.
            s.close()
            raise
        return s


    def test_get_port(self):
        protocols = ((socket.SOCK_STREAM, socket.IPPROTO_TCP),
                     (socket.SOCK_DGRAM, socket.IPPROTO_UDP))
        for _ in xrange(100):
            p = utils.get_unused_port()
            for socket_type, socket_proto in protocols:
                s = self.do_bind(p, socket_type, socket_proto)
                try:
                    self.assertTrue(s.getsockname())
                finally:
                    # Release the port instead of leaking the socket.
                    s.close()


def test_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
    """Test global function.
    """


class TestClass(object):
    """Test class.
    """

    def test_instance_function(self, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test instance function.
        """


    @classmethod
    def test_class_function(cls, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test class function.
        """


    @staticmethod
    def test_static_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test static function.
        """


class GetFunctionArgUnittest(unittest.TestCase):
    """Tests for method get_function_arg_value."""

    def run_test(self, func, insert_arg):
        """Run test.

        @param func: Function being called with given arguments.
        @param insert_arg: Set to True to insert an object in the argument list.
                           This is to mock instance/class object.
        """
        if insert_arg:
            args = (None, 1, 2, 3)
        else:
            args = (1, 2, 3)
        for i in range(1, 7):
            self.assertEqual(utils.get_function_arg_value(
                    func, 'arg%d'%i, args, {}), i)

        self.assertEqual(utils.get_function_arg_value(
                func, 'arg7', args, {'arg7': 7}), 7)
        self.assertRaises(
                KeyError, utils.get_function_arg_value,
                func, 'arg3', args[:-1], {})


    def test_global_function(self):
        """Test global function.
        """
        self.run_test(test_function, False)


    def test_instance_function(self):
        """Test instance function.
        """
        self.run_test(TestClass().test_instance_function, True)


    def test_class_function(self):
        """Test class function.
        """
        self.run_test(TestClass.test_class_function, True)


    def test_static_function(self):
        """Test static function.
        """
        self.run_test(TestClass.test_static_function, False)


class VersionMatchUnittest(unittest.TestCase):
    """Test version_match function."""

    def test_version_match(self):
        """Test version_match function."""
        canary_build = 'lumpy-release/R43-6803.0.0'
        canary_release = '6803.0.0'
        cq_build = 'lumpy-release/R43-6803.0.0-rc1'
        cq_release = '6803.0.0-rc1'
        trybot_paladin_build = 'trybot-lumpy-paladin/R43-6803.0.0-b123'
        trybot_paladin_release = '6803.0.2015_03_12_2103'
        trybot_pre_cq_build = 'trybot-wifi-pre-cq/R43-7000.0.0-b36'
        trybot_pre_cq_release = '7000.0.2016_03_12_2103'
        trybot_toolchain_build = 'trybot-nyan_big-gcc-toolchain/R56-8936.0.0-b14'
        trybot_toolchain_release = '8936.0.2016_10_26_1403'


        builds = [canary_build, cq_build, trybot_paladin_build,
                  trybot_pre_cq_build, trybot_toolchain_build]
        releases = [canary_release, cq_release, trybot_paladin_release,
                    trybot_pre_cq_release, trybot_toolchain_release]
        # Build i is expected to match release i and no other release.
        for i, build in enumerate(builds):
            for j, release in enumerate(releases):
                self.assertEqual(
                        utils.version_match(build, release), i == j,
                        'Build version %s should%s match release version %s.' %
                        (build, '' if i == j else ' not', release))


class IsInSameSubnetUnittest(unittest.TestCase):
    """Test is_in_same_subnet function."""

    def test_is_in_same_subnet(self):
        """Test is_in_same_subnet function."""
        self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
                                                23))
        self.assertFalse(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
                                                 24))
        self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.0.255',
                                                24))
        self.assertFalse(utils.is_in_same_subnet('191.168.0.0', '192.168.0.0',
                                                 24))


class GetWirelessSsidUnittest(unittest.TestCase):
    """Test get_wireless_ssid function."""

    DEFAULT_SSID = 'default'
    SSID_1 = 'ssid_1'
    SSID_2 = 'ssid_2'
    SSID_3 = 'ssid_3'

    def test_get_wireless_ssid(self):
        """Test get_wireless_ssid function."""
        god = mock.mock_god()
        try:
            god.stub_function_to_return(utils.CONFIG, 'get_config_value',
                                        self.DEFAULT_SSID)
            god.stub_function_to_return(utils.CONFIG, 'get_config_value_regex',
                                        {'wireless_ssid_1.2.3.4/24': self.SSID_1,
                                         'wireless_ssid_4.3.2.1/16': self.SSID_2,
                                         'wireless_ssid_4.3.2.111/32': self.SSID_3})
            self.assertEqual(self.SSID_1, utils.get_wireless_ssid('1.2.3.100'))
            self.assertEqual(self.SSID_2, utils.get_wireless_ssid('4.3.2.100'))
            self.assertEqual(self.SSID_3, utils.get_wireless_ssid('4.3.2.111'))
            self.assertEqual(self.DEFAULT_SSID,
                             utils.get_wireless_ssid('100.0.0.100'))
        finally:
            # Restore utils.CONFIG so the stubs do not leak into other tests.
            god.unstub_all()


class LaunchControlBuildParseUnittest(unittest.TestCase):
    """Test various parsing functions related to Launch Control builds and
    devices.
    """

    def test_parse_launch_control_target(self):
        """Test parse_launch_control_target function."""
        # A list of (target, expected result) pairs rather than a dict keyed
        # by result: two targets share the (None, None) result, and a dict
        # would silently keep only one of them.
        target_tests = [
                ('shamu-userdebug', ('shamu', 'userdebug')),
                ('shamu-eng', ('shamu', 'eng')),
                ('shamu-board-eng', ('shamu-board', 'eng')),
                ('bad_target', (None, None)),
                ('target', (None, None))]
        for target, result in target_tests:
            self.assertEqual(result, utils.parse_launch_control_target(target))


class GetOffloaderUriTest(unittest.TestCase):
    """Test get_offload_gsuri function."""
    _IMAGE_STORAGE_SERVER = 'gs://test_image_bucket'

    def setUp(self):
        self.god = mock.mock_god()

    def tearDown(self):
        self.god.unstub_all()

    def test_get_default_lab_offload_gsuri(self):
        """Test default lab offload gsuri ."""
        self.god.mock_up(utils.CONFIG, 'CONFIG')
        self.god.stub_function_to_return(utils, 'is_moblab', False)
        self.assertEqual(utils.DEFAULT_OFFLOAD_GSURI,
                         utils.get_offload_gsuri())

        self.god.check_playback()

    def test_get_default_moblab_offload_gsuri(self):
        """Test moblab offload gsuri when no default gsuri is configured."""
        self.god.mock_up(utils.CONFIG, 'CONFIG')
        self.god.stub_function_to_return(utils, 'is_moblab', True)
        utils.CONFIG.get_config_value.expect_call(
                'CROS', 'image_storage_server').and_return(
                        self._IMAGE_STORAGE_SERVER)
        self.god.stub_function_to_return(utils,
                'get_interface_mac_address', 'test_mac')
        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
        expected_gsuri = '%sresults/%s/%s/' % (
                self._IMAGE_STORAGE_SERVER, 'test_mac', 'test_id')
        cached_gsuri = utils.DEFAULT_OFFLOAD_GSURI
        utils.DEFAULT_OFFLOAD_GSURI = None
        try:
            gsuri = utils.get_offload_gsuri()
        finally:
            # Always restore the module global, even if the call raises.
            utils.DEFAULT_OFFLOAD_GSURI = cached_gsuri
        self.assertEqual(expected_gsuri, gsuri)

        self.god.check_playback()

    def test_get_moblab_offload_gsuri(self):
        """Test moblab offload gsuri built on the default offload gsuri."""
        self.god.mock_up(utils.CONFIG, 'CONFIG')
        self.god.stub_function_to_return(utils, 'is_moblab', True)
        self.god.stub_function_to_return(utils,
                'get_interface_mac_address', 'test_mac')
        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
        gsuri = '%s%s/%s/' % (
                utils.DEFAULT_OFFLOAD_GSURI, 'test_mac', 'test_id')
        self.assertEqual(gsuri, utils.get_offload_gsuri())

        self.god.check_playback()


class GetBuiltinEthernetNicNameTest(unittest.TestCase):
    """Test get moblab public network interface name."""

    def setUp(self):
        self.god = mock.mock_god()

    def tearDown(self):
        self.god.unstub_all()

    def test_is_eth0(self):
        """Test when public network interface is eth0."""
        run_func = self.god.create_mock_function('run_func')
        self.god.stub_with(utils, 'run', run_func)
        run_func.expect_call('readlink -f /sys/class/net/eth0').and_return(
                utils.CmdResult(exit_status=0, stdout='not_u_s_b'))
        eth = utils.get_built_in_ethernet_nic_name()
        self.assertEqual('eth0', eth)
        self.god.check_playback()

    def test_readlin_fails(self):
        """Test when readlink does not work."""
        run_func = self.god.create_mock_function('run_func')
        self.god.stub_with(utils, 'run', run_func)
        run_func.expect_call('readlink -f /sys/class/net/eth0').and_return(
                utils.CmdResult(exit_status=-1, stdout='blah'))
        eth = utils.get_built_in_ethernet_nic_name()
        self.assertEqual('eth0', eth)
        self.god.check_playback()

    def test_no_readlink(self):
        """Test when readlink does not exist."""
        run_func = self.god.create_mock_function('run_func')
        self.god.stub_with(utils, 'run', run_func)
        run_func.expect_call('readlink -f /sys/class/net/eth0').and_raises(
                error.CmdError('readlink', 'no such command'))
        eth = utils.get_built_in_ethernet_nic_name()
        self.assertEqual('eth0', eth)
        self.god.check_playback()

1123 def test_is_eth1(self): 1124 """Test when public network interface is eth1.""" 1125 run_func = self.god.create_mock_function('run_func') 1126 self.god.stub_with(utils, 'run', run_func) 1127 run_func.expect_call('readlink -f /sys/class/net/eth0').and_return( 1128 utils.CmdResult(exit_status=0, stdout='is usb')) 1129 run_func.expect_call('readlink -f /sys/class/net/eth1').and_return( 1130 utils.CmdResult(exit_status=0, stdout='not_u_s_b')) 1131 eth = utils.get_built_in_ethernet_nic_name() 1132 self.assertEqual('eth1', eth) 1133 self.god.check_playback() 1134 1135 1136class MockMetricsTest(unittest.TestCase): 1137 """Test metrics mock class can handle various metrics calls.""" 1138 1139 def test_Counter(self): 1140 """Test the mock class can create an instance and call any method. 1141 """ 1142 c = metrics.Counter('counter') 1143 c.increment(fields={'key': 1}) 1144 1145 1146 def test_Context(self): 1147 """Test the mock class can handle context class. 1148 """ 1149 test_value = None 1150 with metrics.SecondsTimer('context') as t: 1151 test_value = 'called_in_context' 1152 t['random_key'] = 'pass' 1153 self.assertEqual('called_in_context', test_value) 1154 1155 1156 def test_decorator(self): 1157 """Test the mock class can handle decorator. 1158 """ 1159 class TestClass(object): 1160 1161 def __init__(self): 1162 self.value = None 1163 1164 test_value = TestClass() 1165 test_value.value = None 1166 @metrics.SecondsTimerDecorator('decorator') 1167 def test(arg): 1168 arg.value = 'called_in_decorator' 1169 1170 test(test_value) 1171 self.assertEqual('called_in_decorator', test_value.value) 1172 1173 1174 def test_setitem(self): 1175 """Test the mock class can handle set item call. 1176 """ 1177 timer = metrics.SecondsTimer('name') 1178 timer['random_key'] = 'pass' 1179 1180 1181 1182if __name__ == "__main__": 1183 unittest.main() 1184