#!/usr/bin/python
"""Unit tests for autotest_lib.client.common_lib.base_utils."""

import errno
import logging
import os
import select
import shutil
import socket
import StringIO
import subprocess
import time
import unittest
import urllib2

import common
from autotest_lib.client.common_lib import base_utils, autotemp
from autotest_lib.client.common_lib.test_utils import mock


class test_read_one_line(unittest.TestCase):
    """Tests for the IP helpers and read_one_line() in base_utils.

    base_utils.open is stubbed so that no real file is touched.
    """

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(base_utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def test_ip_to_long(self):
        self.assertEqual(base_utils.ip_to_long('0.0.0.0'), 0)
        self.assertEqual(base_utils.ip_to_long('255.255.255.255'), 4294967295)
        self.assertEqual(base_utils.ip_to_long('192.168.0.1'), 3232235521)
        self.assertEqual(base_utils.ip_to_long('1.2.4.8'), 16909320)


    def test_long_to_ip(self):
        self.assertEqual(base_utils.long_to_ip(0), '0.0.0.0')
        self.assertEqual(base_utils.long_to_ip(4294967295), '255.255.255.255')
        self.assertEqual(base_utils.long_to_ip(3232235521), '192.168.0.1')
        self.assertEqual(base_utils.long_to_ip(16909320), '1.2.4.8')


    def test_create_subnet_mask(self):
        self.assertEqual(base_utils.create_subnet_mask(0), 0)
        self.assertEqual(base_utils.create_subnet_mask(32), 4294967295)
        self.assertEqual(base_utils.create_subnet_mask(25), 4294967168)


    def test_format_ip_with_mask(self):
        self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 0),
                         '0.0.0.0/0')
        self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 32),
                         '192.168.0.1/32')
        self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 26),
                         '192.168.0.0/26')
        self.assertEqual(base_utils.format_ip_with_mask('192.168.0.255', 26),
                         '192.168.0.192/26')


    def create_test_file(self, contents):
        """Expect one open("filename", "r") call that yields `contents`."""
        test_file = StringIO.StringIO(contents)
        base_utils.open.expect_call("filename", "r").and_return(test_file)


    def test_reads_one_line_file(self):
        self.create_test_file("abc\n")
        self.assertEqual("abc", base_utils.read_one_line("filename"))
        self.god.check_playback()


    def test_strips_read_lines(self):
        """Only the trailing newline is removed; trailing spaces are kept."""
        self.create_test_file("abc \n")
        self.assertEqual("abc ", base_utils.read_one_line("filename"))
        self.god.check_playback()


    def test_drops_extra_lines(self):
        self.create_test_file("line 1\nline 2\nline 3\n")
        self.assertEqual("line 1", base_utils.read_one_line("filename"))
        self.god.check_playback()


    def test_works_on_empty_file(self):
        self.create_test_file("")
        self.assertEqual("", base_utils.read_one_line("filename"))
        self.god.check_playback()


    def test_works_on_file_with_no_newlines(self):
        self.create_test_file("line but no newline")
        self.assertEqual("line but no newline",
                         base_utils.read_one_line("filename"))
        self.god.check_playback()


    def test_preserves_leading_whitespace(self):
        self.create_test_file(" has leading whitespace")
        self.assertEqual(" has leading whitespace",
                         base_utils.read_one_line("filename"))
        # Verify the expected open() call was consumed, like the sibling tests.
        self.god.check_playback()


class test_write_one_line(unittest.TestCase):
    """Tests for base_utils.write_one_line() with open() stubbed out."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(base_utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def get_write_one_line_output(self, content):
        """Run write_one_line("filename", content) and return what it wrote."""
        test_file = mock.SaveDataAfterCloseStringIO()
        base_utils.open.expect_call("filename", "w").and_return(test_file)
        base_utils.write_one_line("filename", content)
        self.god.check_playback()
        return test_file.final_data


    def test_writes_one_line_file(self):
        self.assertEqual("abc\n", self.get_write_one_line_output("abc"))


    def test_preserves_existing_newline(self):
        self.assertEqual("abc\n", self.get_write_one_line_output("abc\n"))


    def test_preserves_leading_whitespace(self):
        self.assertEqual(" abc\n", self.get_write_one_line_output(" abc"))


    def test_preserves_trailing_whitespace(self):
        self.assertEqual("abc \n", self.get_write_one_line_output("abc "))


    def test_handles_empty_input(self):
        self.assertEqual("\n", self.get_write_one_line_output(""))


class test_open_write_close(unittest.TestCase):
    """Tests for base_utils.open_write_close() with open() stubbed out."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(base_utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def test_simple_functionality(self):
        data = "\n\nwhee\n"
        test_file = mock.SaveDataAfterCloseStringIO()
        base_utils.open.expect_call("filename", "w").and_return(test_file)
        base_utils.open_write_close("filename", data)
        self.god.check_playback()
        self.assertEqual(data, test_file.final_data)


class test_read_keyval(unittest.TestCase):
    """Tests for base_utils.read_keyval().

    open(), os.path.isdir() and os.path.exists() are stubbed, so every test
    must declare the filesystem calls it expects.
    """

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(base_utils, "open")
        self.god.stub_function(os.path, "isdir")
        self.god.stub_function(os.path, "exists")


    def tearDown(self):
        self.god.unstub_all()


    def create_test_file(self, filename, contents):
        """Expect exists(filename) -> True, then open(filename) -> contents."""
        test_file = StringIO.StringIO(contents)
        os.path.exists.expect_call(filename).and_return(True)
        base_utils.open.expect_call(filename).and_return(test_file)


    def read_keyval(self, contents):
        """Return read_keyval("file") for a plain file holding `contents`."""
        os.path.isdir.expect_call("file").and_return(False)
        self.create_test_file("file", contents)
        keyval = base_utils.read_keyval("file")
        self.god.check_playback()
        return keyval


    def test_returns_empty_when_file_doesnt_exist(self):
        os.path.isdir.expect_call("file").and_return(False)
        os.path.exists.expect_call("file").and_return(False)
        self.assertEqual({}, base_utils.read_keyval("file"))
        self.god.check_playback()


    def test_accesses_files_directly(self):
        os.path.isdir.expect_call("file").and_return(False)
        self.create_test_file("file", "")
        base_utils.read_keyval("file")
        self.god.check_playback()


    def test_accesses_directories_through_keyval_file(self):
        os.path.isdir.expect_call("dir").and_return(True)
        self.create_test_file("dir/keyval", "")
        base_utils.read_keyval("dir")
        self.god.check_playback()


    def test_values_are_rstripped(self):
        keyval = self.read_keyval("a=b \n")
        self.assertEqual(keyval, {"a": "b"})


    def test_comments_are_ignored(self):
        keyval = self.read_keyval("a=b # a comment\n")
        self.assertEqual(keyval, {"a": "b"})


    def test_integers_become_ints(self):
        keyval = self.read_keyval("a=1\n")
        self.assertEqual(keyval, {"a": 1})
        self.assertEqual(int, type(keyval["a"]))


    def test_float_values_become_floats(self):
        keyval = self.read_keyval("a=1.5\n")
        self.assertEqual(keyval, {"a": 1.5})
        self.assertEqual(float, type(keyval["a"]))


    def test_multiple_lines(self):
        keyval = self.read_keyval("a=one\nb=two\n")
        self.assertEqual(keyval, {"a": "one", "b": "two"})


    def test_the_last_duplicate_line_is_used(self):
        keyval = self.read_keyval("a=one\nb=two\na=three\n")
        self.assertEqual(keyval, {"a": "three", "b": "two"})


    def test_extra_equals_are_included_in_values(self):
        keyval = self.read_keyval("a=b=c\n")
        self.assertEqual(keyval, {"a": "b=c"})


    def test_non_alphanumeric_keynames_are_rejected(self):
        self.assertRaises(ValueError, self.read_keyval, "a$=one\n")


    def test_underscores_are_allowed_in_key_names(self):
        keyval = self.read_keyval("a_b=value\n")
        self.assertEqual(keyval, {"a_b": "value"})


    def test_dashes_are_allowed_in_key_names(self):
        keyval = self.read_keyval("a-b=value\n")
        self.assertEqual(keyval, {"a-b": "value"})

    def test_empty_value_is_allowed(self):
        keyval = self.read_keyval("a=\n")
        self.assertEqual(keyval, {"a": ""})


class test_write_keyval(unittest.TestCase):
    """Tests for base_utils.write_keyval() with open()/isdir() stubbed."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(base_utils, "open")
        self.god.stub_function(os.path, "isdir")


    def tearDown(self):
        self.god.unstub_all()


    def assertHasLines(self, value, lines):
        """Assert `value` consists of exactly `lines`, in any order."""
        self.assertEqual(sorted(value.splitlines()), sorted(lines))


    def write_keyval(self, filename, dictionary, expected_filename=None,
                     type_tag=None):
        """Call base_utils.write_keyval() and return the text it wrote.

        @param filename: path handed to write_keyval().
        @param dictionary: key/value pairs to write.
        @param expected_filename: path open() is expected to receive;
                defaults to `filename`.
        @param type_tag: optional type tag forwarded to write_keyval().
        """
        if expected_filename is None:
            expected_filename = filename
        test_file = StringIO.StringIO()
        # close() is stubbed so the buffer can still be read afterwards.
        self.god.stub_function(test_file, "close")
        base_utils.open.expect_call(expected_filename, "a").and_return(test_file)
        test_file.close.expect_call()
        if type_tag is None:
            base_utils.write_keyval(filename, dictionary)
        else:
            base_utils.write_keyval(filename, dictionary, type_tag)
        return test_file.getvalue()


    def write_keyval_file(self, dictionary, type_tag=None):
        """Write `dictionary` to the plain (non-directory) path "file"."""
        os.path.isdir.expect_call("file").and_return(False)
        return self.write_keyval("file", dictionary, type_tag=type_tag)


    def test_accesses_files_directly(self):
        os.path.isdir.expect_call("file").and_return(False)
        result = self.write_keyval("file", {"a": "1"})
        self.assertEqual(result, "a=1\n")


    def test_accesses_directories_through_keyval_file(self):
        os.path.isdir.expect_call("dir").and_return(True)
        result = self.write_keyval("dir", {"b": "2"}, "dir/keyval")
        self.assertEqual(result, "b=2\n")


    def test_numbers_are_stringified(self):
        result = self.write_keyval_file({"c": 3})
        self.assertEqual(result, "c=3\n")


    def test_type_tags_are_excluded_by_default(self):
        result = self.write_keyval_file({"d": "a string"})
        self.assertEqual(result, "d=a string\n")
        self.assertRaises(ValueError, self.write_keyval_file,
                          {"d{perf}": "a string"})


    def test_perf_tags_are_allowed(self):
        result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2},
                                        type_tag="perf")
        self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"])
        self.assertRaises(ValueError, self.write_keyval_file,
                          {"a": 1, "b": 2}, type_tag="perf")


    def test_non_alphanumeric_keynames_are_rejected(self):
        self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0})


    def test_underscores_are_allowed_in_key_names(self):
        result = self.write_keyval_file({"a_b": "value"})
        self.assertEqual(result, "a_b=value\n")


    def test_dashes_are_allowed_in_key_names(self):
        result = self.write_keyval_file({"a-b": "value"})
        self.assertEqual(result, "a-b=value\n")


class test_is_url(unittest.TestCase):
    """Tests for base_utils.is_url()."""

    def test_accepts_http(self):
        self.assertTrue(base_utils.is_url("http://example.com"))


    def test_accepts_ftp(self):
        self.assertTrue(base_utils.is_url("ftp://ftp.example.com"))


    def test_rejects_local_path(self):
        self.assertFalse(base_utils.is_url("/home/username/file"))


    def test_rejects_local_filename(self):
        self.assertFalse(base_utils.is_url("filename"))


    def test_rejects_relative_local_path(self):
        self.assertFalse(base_utils.is_url("somedir/somesubdir/file"))


    def test_rejects_local_path_containing_url(self):
        self.assertFalse(base_utils.is_url("somedir/http://path/file"))


class test_urlopen(unittest.TestCase):
    """Tests for the socket-timeout handling of base_utils.urlopen()."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)


    def tearDown(self):
        self.god.unstub_all()


    def stub_urlopen_with_timeout_comparison(self, test_func, expected_return,
                                             *expected_args):
        """Replace urllib2.urlopen with a checking fake.

        The fake asserts it was called with `expected_args` (padded with
        None up to (url, data)), passes the socket default timeout in effect
        during the call to `test_func`, and returns `expected_return`.
        """
        expected_args += (None,) * (2 - len(expected_args))
        def urlopen(url, data=None):
            self.assertEqual(expected_args, (url, data))
            test_func(socket.getdefaulttimeout())
            return expected_return
        self.god.stub_with(urllib2, "urlopen", urlopen)


    def stub_urlopen_with_timeout_check(self, expected_timeout,
                                        expected_return, *expected_args):
        """Stub urlopen and require the in-call timeout to be exact."""
        def test_func(timeout):
            self.assertEqual(timeout, expected_timeout)
        self.stub_urlopen_with_timeout_comparison(test_func, expected_return,
                                                  *expected_args)


    def test_timeout_set_during_call(self):
        self.stub_urlopen_with_timeout_check(30, "retval", "url")
        retval = base_utils.urlopen("url", timeout=30)
        self.assertEqual(retval, "retval")


    def test_timeout_reset_after_call(self):
        old_timeout = socket.getdefaulttimeout()
        self.stub_urlopen_with_timeout_check(30, None, "url")
        try:
            socket.setdefaulttimeout(1234)
            base_utils.urlopen("url", timeout=30)
            self.assertEqual(1234, socket.getdefaulttimeout())
        finally:
            # Never leak the test's timeout into the rest of the process.
            socket.setdefaulttimeout(old_timeout)


    def test_timeout_set_by_default(self):
        def test_func(timeout):
            self.assertTrue(timeout is not None)
        self.stub_urlopen_with_timeout_comparison(test_func, None, "url")
        base_utils.urlopen("url")


    def test_args_are_untouched(self):
        self.stub_urlopen_with_timeout_check(30, None, "http://url",
                                             "POST data")
        base_utils.urlopen("http://url", timeout=30, data="POST data")


class test_urlretrieve(unittest.TestCase):
    """Tests for base_utils.urlretrieve()."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)


    def tearDown(self):
        self.god.unstub_all()


    def test_urlopen_passed_arguments(self):
        self.god.stub_function(base_utils, "urlopen")
        self.god.stub_function(base_utils.shutil, "copyfileobj")
        self.god.stub_function(base_utils, "open")

        url = "url"
        dest = "somefile"
        data = object()
        timeout = 10

        src_file = self.god.create_mock_class(file, "file")
        dest_file = self.god.create_mock_class(file, "file")

        (base_utils.urlopen.expect_call(url, data=data, timeout=timeout)
                .and_return(src_file))
        base_utils.open.expect_call(dest, "wb").and_return(dest_file)
        base_utils.shutil.copyfileobj.expect_call(src_file, dest_file)
        dest_file.close.expect_call()
        src_file.close.expect_call()

        base_utils.urlretrieve(url, dest, data=data, timeout=timeout)
        self.god.check_playback()


class test_merge_trees(unittest.TestCase):
    """Tests for base_utils.merge_trees() using two real temp directories."""

    # some path-handling helper functions
    def src(self, *path_segments):
        """Return a path inside the source tree."""
        return os.path.join(self.src_tree.name, *path_segments)


    def dest(self, *path_segments):
        """Return a path inside the destination tree."""
        return os.path.join(self.dest_tree.name, *path_segments)


    def paths(self, *path_segments):
        """Return the (source, destination) pair for the same relative path."""
        return self.src(*path_segments), self.dest(*path_segments)


    def _read_file(self, path):
        """Return the full contents of `path`, closing the file promptly."""
        with open(path) as f:
            return f.read()


    def _write_line(self, path, line):
        """Create/truncate `path` holding `line` plus a trailing newline.

        The file is closed before returning so the data is on disk by the
        time merge_trees() looks at it.
        """
        with open(path, "w") as f:
            f.write(line + "\n")


    def assertFileEqual(self, *path_segments):
        """Assert the file exists in both trees with identical contents."""
        src, dest = self.paths(*path_segments)
        self.assertEqual(True, os.path.isfile(src))
        self.assertEqual(True, os.path.isfile(dest))
        self.assertEqual(os.path.getsize(src), os.path.getsize(dest))
        self.assertEqual(self._read_file(src), self._read_file(dest))


    def assertFileContents(self, contents, *path_segments):
        """Assert the destination file exists and holds exactly `contents`."""
        dest = self.dest(*path_segments)
        self.assertEqual(True, os.path.isfile(dest))
        self.assertEqual(os.path.getsize(dest), len(contents))
        self.assertEqual(contents, self._read_file(dest))


    def setUp(self):
        self.src_tree = autotemp.tempdir(unique_id='utilsrc')
        self.dest_tree = autotemp.tempdir(unique_id='utilsdest')

        # empty subdirs
        os.mkdir(self.src("empty"))
        os.mkdir(self.dest("empty"))


    def tearDown(self):
        self.src_tree.clean()
        self.dest_tree.clean()


    def test_both_dont_exist(self):
        base_utils.merge_trees(*self.paths("empty"))


    def test_file_only_at_src(self):
        self._write_line(self.src("src_only"), "line 1")
        base_utils.merge_trees(*self.paths("src_only"))
        self.assertFileEqual("src_only")


    def test_file_only_at_dest(self):
        self._write_line(self.dest("dest_only"), "line 1")
        base_utils.merge_trees(*self.paths("dest_only"))
        self.assertEqual(False, os.path.exists(self.src("dest_only")))
        self.assertFileContents("line 1\n", "dest_only")


    def test_file_at_both(self):
        self._write_line(self.dest("in_both"), "line 1")
        self._write_line(self.src("in_both"), "line 2")
        base_utils.merge_trees(*self.paths("in_both"))
        self.assertFileContents("line 1\nline 2\n", "in_both")


    def test_directory_with_files_in_both(self):
        self._write_line(self.dest("in_both"), "line 1")
        self._write_line(self.src("in_both"), "line 3")
        base_utils.merge_trees(*self.paths())
        self.assertFileContents("line 1\nline 3\n", "in_both")


    def test_directory_with_mix_of_files(self):
        self._write_line(self.dest("in_dest"), "dest line")
        self._write_line(self.src("in_src"), "src line")
        base_utils.merge_trees(*self.paths())
        self.assertFileContents("dest line\n", "in_dest")
        self.assertFileContents("src line\n", "in_src")


    def test_directory_with_subdirectories(self):
        os.mkdir(self.src("src_subdir"))
        self._write_line(self.src("src_subdir", "subfile"), "subdir line")
        os.mkdir(self.src("both_subdir"))
        os.mkdir(self.dest("both_subdir"))
        self._write_line(self.src("both_subdir", "subfile"), "src line")
        self._write_line(self.dest("both_subdir", "subfile"), "dest line")
        base_utils.merge_trees(*self.paths())
        self.assertFileContents("subdir line\n", "src_subdir", "subfile")
        self.assertFileContents("dest line\nsrc line\n", "both_subdir",
                                "subfile")


class test_get_relative_path(unittest.TestCase):
    """Tests for base_utils.get_relative_path()."""

    def test_not_absolute(self):
        self.assertRaises(AssertionError, base_utils.get_relative_path, "a", "b")

    def test_same_dir(self):
        self.assertEqual(base_utils.get_relative_path("/a/b/c", "/a/b"), "c")

    def test_forward_dir(self):
        self.assertEqual(base_utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d")

    def test_previous_dir(self):
        self.assertEqual(base_utils.get_relative_path("/a/b", "/a/b/c/d"), "../..")

    def test_parallel_dir(self):
        self.assertEqual(base_utils.get_relative_path("/a/c/d", "/a/b/c/d"),
                         "../../../c/d")


class test_sh_escape(unittest.TestCase):
    """Round-trip tests for base_utils.sh_escape() through a real shell."""

    def _test_in_shell(self, text):
        """Assert that echoing the escaped `text` in a shell yields `text`.

        The escaped text is placed inside double quotes in an `echo`
        command; subclasses override this to exercise other quoting helpers
        with the same test cases.
        """
        escaped_text = base_utils.sh_escape(text)
        # Context managers make sure both devnull handles get closed.
        with open(os.devnull, 'r') as null_in:
            with open(os.devnull, 'w') as null_err:
                proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True,
                                        stdin=null_in,
                                        stdout=subprocess.PIPE,
                                        stderr=null_err)
                stdout, _ = proc.communicate()
        self.assertEqual(proc.returncode, 0)
        # Drop the newline that echo appends.
        self.assertEqual(stdout[:-1], text)


    def test_normal_string(self):
        self._test_in_shell('abcd')


    def test_spaced_string(self):
        self._test_in_shell('abcd efgh')


    def test_dollar(self):
        self._test_in_shell('$')


    def test_single_quote(self):
        self._test_in_shell('\'')


    def test_single_quoted_string(self):
        self._test_in_shell('\'efgh\'')


    def test_string_with_single_quote(self):
        self._test_in_shell("a'b")


    def test_string_with_escaped_single_quote(self):
        self._test_in_shell(r"a\'b")


    def test_double_quote(self):
        self._test_in_shell('"')


    def test_double_quoted_string(self):
        self._test_in_shell('"abcd"')


    def test_backtick(self):
        self._test_in_shell('`')


    def test_backticked_string(self):
        self._test_in_shell('`jklm`')


    def test_backslash(self):
        self._test_in_shell('\\')


    def test_backslashed_special_characters(self):
        self._test_in_shell('\\$')
        self._test_in_shell('\\"')
        self._test_in_shell('\\\'')
        self._test_in_shell('\\`')


    def test_backslash_codes(self):
        self._test_in_shell('\\n')
        self._test_in_shell('\\r')
        self._test_in_shell('\\t')
        self._test_in_shell('\\v')
        self._test_in_shell('\\b')
        self._test_in_shell('\\a')
        self._test_in_shell('\\000')

    def test_real_newline(self):
        self._test_in_shell('\n')
        self._test_in_shell('\\\n')
class test_sh_quote_word(test_sh_escape):
    """Run tests on sh_quote_word.

    Inherit from test_sh_escape to get the same tests to run on both.
    """

    def _test_in_shell(self, text):
        """Assert that `echo <quoted text>` prints `text` and a newline."""
        quoted_word = base_utils.sh_quote_word(text)
        echoed_value = subprocess.check_output('echo %s' % quoted_word,
                                               shell=True)
        self.assertEqual(echoed_value, text + '\n')


class test_nested_sh_quote_word(test_sh_quote_word):
    """Run nested tests on sh_quote_word.

    Inherit from test_sh_quote_word to get the same tests to run on both.
    """

    def _test_in_shell(self, text):
        """Quote an already-quoted echo command and run both shell layers.

        The outer command echoes the inner command line; running that
        output must in turn print `text` followed by a newline.
        """
        command = 'echo ' + base_utils.sh_quote_word(text)
        nested_command = 'echo ' + base_utils.sh_quote_word(command)
        produced_command = subprocess.check_output(nested_command, shell=True)
        echoed_value = subprocess.check_output(produced_command, shell=True)
        self.assertEqual(echoed_value, text + '\n')


class test_run(unittest.TestCase):
    """
    Test the base_utils.run() function.

    Note: This test runs simple external commands to test the base_utils.run()
    API without assuming implementation details.
    """
    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(base_utils.logging, 'warning')
        self.god.stub_function(base_utils.logging, 'debug')


    def tearDown(self):
        self.god.unstub_all()


    def __check_result(self, result, command, exit_status=0, stdout='',
                       stderr=''):
        """Assert every field of a command result object.

        @param result: object exposing command, exit_status, stdout, stderr.
        @param command: expected command string.
        @param exit_status: expected exit status (default 0).
        @param stdout: expected captured stdout (default empty).
        @param stderr: expected captured stderr (default empty).
        """
        self.assertEqual(result.command, command)
        self.assertEqual(result.exit_status, exit_status)
        self.assertEqual(result.stdout, stdout)
        self.assertEqual(result.stderr, stderr)


    def test_default_simple(self):
        cmd = 'echo "hello world"'
        # expect some kind of logging.debug() call but don't care about args
        base_utils.logging.debug.expect_any_call()
        self.__check_result(base_utils.run(cmd), cmd, stdout='hello world\n')


    def test_default_failure(self):
        cmd = 'exit 11'
        try:
            base_utils.run(cmd, verbose=False)
        except base_utils.error.CmdError as err:
            self.__check_result(err.result_obj, cmd, exit_status=11)
        else:
            # Without this branch the test would pass when nothing is raised.
            self.fail('base_utils.run(%r) did not raise CmdError' % cmd)


    def test_ignore_status(self):
        cmd = 'echo error >&2 && exit 11'
        self.__check_result(base_utils.run(cmd, ignore_status=True, verbose=False),
                            cmd, exit_status=11, stderr='error\n')


    def test_timeout(self):
        # we expect a logging.warning() message, don't care about the contents
        base_utils.logging.warning.expect_any_call()
        try:
            base_utils.run('echo -n output && sleep 10', timeout=1, verbose=False)
        except base_utils.error.CmdError as err:
            self.assertEqual(err.result_obj.stdout, 'output')
        else:
            # Without this branch the test would pass when nothing is raised.
            self.fail('base_utils.run() did not raise CmdError on timeout')


    def test_stdout_stderr_tee(self):
        cmd = 'echo output && echo error >&2'
        stdout_tee = StringIO.StringIO()
        stderr_tee = StringIO.StringIO()

        self.__check_result(base_utils.run(
                cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee,
                verbose=False), cmd, stdout='output\n', stderr='error\n')
        self.assertEqual(stdout_tee.getvalue(), 'output\n')
        self.assertEqual(stderr_tee.getvalue(), 'error\n')


    def test_stdin_string(self):
        cmd = 'cat'
        self.__check_result(base_utils.run(cmd, verbose=False, stdin='hi!\n'),
                            cmd, stdout='hi!\n')


    def test_safe_args(self):
        # NOTE: The string in expected_quoted_cmd depends on the internal
        # implementation of shell quoting which is used by base_utils.run(),
        # in this case, sh_quote_word().
        expected_quoted_cmd = "echo 'hello \"world' again"
        self.__check_result(base_utils.run(
                'echo', verbose=False, args=('hello "world', 'again')),
                expected_quoted_cmd, stdout='hello "world again\n')


    def test_safe_args_given_string(self):
        self.assertRaises(TypeError, base_utils.run, 'echo', args='hello')


    def test_wait_interrupt(self):
        """Test that we actually select twice if the first one returns EINTR."""
        base_utils.logging.debug.expect_any_call()

        bg_job = base_utils.BgJob('echo "hello world"')
        bg_job.result.exit_status = 0
        self.god.stub_function(base_utils.select, 'select')

        # First select() call is interrupted ...
        base_utils.select.select.expect_any_call().and_raises(
                select.error(errno.EINTR, 'Select interrupted'))
        base_utils.logging.warning.expect_any_call()

        # ... the second reports both pipes of the job as readable.
        base_utils.select.select.expect_any_call().and_return(
                ([bg_job.sp.stdout, bg_job.sp.stderr], [], None))
        base_utils.logging.warning.expect_any_call()

        self.assertFalse(
                base_utils._wait_for_commands([bg_job], time.time(), None))


class test_compare_versions(unittest.TestCase):
    """Tests for base_utils.compare_versions() (-1 / 0 / 1 results)."""

    def test_zerofill(self):
        self.assertEqual(base_utils.compare_versions('1.7', '1.10'), -1)
        self.assertEqual(base_utils.compare_versions('1.222', '1.3'), 1)
        self.assertEqual(base_utils.compare_versions('1.03', '1.3'), 0)


    def test_unequal_len(self):
        self.assertEqual(base_utils.compare_versions('1.3', '1.3.4'), -1)
        self.assertEqual(base_utils.compare_versions('1.3.1', '1.3'), 1)


    def test_dash_delimited(self):
        self.assertEqual(base_utils.compare_versions('1-2-3', '1-5-1'), -1)
        self.assertEqual(base_utils.compare_versions('1-2-1', '1-1-1'), 1)
        self.assertEqual(base_utils.compare_versions('1-2-4', '1-2-4'), 0)


    def test_alphabets(self):
        self.assertEqual(base_utils.compare_versions('m.l.b', 'n.b.a'), -1)
        self.assertEqual(base_utils.compare_versions('n.b.a', 'm.l.b'), 1)
        self.assertEqual(base_utils.compare_versions('abc.e', 'abc.e'), 0)


    def test_mix_symbols(self):
        self.assertEqual(base_utils.compare_versions('k-320.1', 'k-320.3'), -1)
        self.assertEqual(base_utils.compare_versions('k-231.5', 'k-231.1'), 1)
        self.assertEqual(base_utils.compare_versions('k-231.1', 'k-231.1'), 0)

        self.assertEqual(base_utils.compare_versions('k.320-1', 'k.320-3'), -1)
        self.assertEqual(base_utils.compare_versions('k.231-5', 'k.231-1'), 1)
        self.assertEqual(base_utils.compare_versions('k.231-1', 'k.231-1'), 0)


class test_args_to_dict(unittest.TestCase):
    """Tests for base_utils.args_to_dict()."""

    def test_no_args(self):
        result = base_utils.args_to_dict([])
        self.assertEqual({}, result)


    def test_matches(self):
        result = base_utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:',
                                          'F__o0O=', 'B8r:=:=', '_bAZ_=:=:'])
        self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'',
                                  'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'})


    def test_unmatches(self):
        # Temporarily shut warning messages from args_to_dict() when an argument
        # doesn't match its pattern.
        logger = logging.getLogger()
        saved_level = logger.level
        logger.setLevel(logging.ERROR)

        try:
            result = base_utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b',
                                              ':VAL', '=VVV', 'WORD'])
            self.assertEqual({}, result)
        finally:
            # Restore level.
            logger.setLevel(saved_level)


class test_get_random_port(unittest.TestCase):
    """Tests that ports from base_utils.get_unused_port() can be bound."""

    def do_bind(self, port, socket_type, socket_proto):
        """Return a new IPv4 socket bound to `port` on all interfaces.

        The caller owns the returned socket and must close it. If setting
        up or binding fails, the socket is closed before the error
        propagates.
        """
        s = socket.socket(socket.AF_INET, socket_type, socket_proto)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(('', port))
        except socket.error:
            s.close()
            raise
        return s


    def test_get_port(self):
        for _ in xrange(100):
            p = base_utils.get_unused_port()
            tcp_sock = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP)
            try:
                self.assertTrue(tcp_sock.getsockname())
                # The TCP socket stays bound while the UDP one is created.
                udp_sock = self.do_bind(p, socket.SOCK_DGRAM,
                                        socket.IPPROTO_UDP)
                try:
                    self.assertTrue(udp_sock.getsockname())
                finally:
                    udp_sock.close()
            finally:
                tcp_sock.close()


if __name__ == "__main__":
    unittest.main()